This is an automated email from the ASF dual-hosted git repository. rombert pushed a commit to annotated tag org.apache.sling.event.dea-1.0.0 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-event-dea.git
commit d715c2e84394f60948f81e8db14d483f519048e2 Author: Carsten Ziegeler <[email protected]> AuthorDate: Tue Aug 26 08:24:57 2014 +0000 SLING-3882 : Move the distributed event admin into a separate project git-svn-id: https://svn.apache.org/repos/asf/sling/trunk/bundles/extensions/dea@1620527 13f79535-47bb-0310-9956-ffa450edef68 --- pom.xml | 148 +++++++ .../org/apache/sling/event/dea/DEAConstants.java | 47 +++ .../event/dea/impl/DistributedEventAdminImpl.java | 102 +++++ .../event/dea/impl/DistributedEventReceiver.java | 438 +++++++++++++++++++++ .../event/dea/impl/DistributedEventSender.java | 243 ++++++++++++ .../sling/event/dea/impl/ResourceHelper.java | 96 +++++ .../org/apache/sling/event/dea/package-info.java | 24 ++ .../dea/impl/DistributingEventHandlerTest.java | 182 +++++++++ 8 files changed, 1280 insertions(+) diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..afeacbe --- /dev/null +++ b/pom.xml @@ -0,0 +1,148 @@ +<?xml version="1.0" encoding="ISO-8859-1"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.sling</groupId> + <artifactId>sling</artifactId> + <version>19</version> + <relativePath>../../../parent/pom.xml</relativePath> + </parent> + + <artifactId>org.apache.sling.event.dea</artifactId> + <packaging>bundle</packaging> + <version>0.0.1-SNAPSHOT</version> + + <name>Apache Sling Distributed Event Admin</name> + <description> + Support distributing events through the OSGi event admin. + </description> + + <scm> + <connection>scm:svn:http://svn.apache.org/repos/asf/sling/trunk/bundles/extensions/dea</connection> + <developerConnection>scm:svn:https://svn.apache.org/repos/asf/sling/trunk/bundles/extensions/dea</developerConnection> + <url>http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/dea</url> + </scm> + + <properties> + <sling.java.version>6</sling.java.version> + </properties> + + + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-scr-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <extensions>true</extensions> + </plugin> + </plugins> + </build> + <reporting> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <configuration> + <excludePackageNames> + org.apache.sling.event.dea.impl + </excludePackageNames> + </configuration> + </plugin> + </plugins> + </reporting> + <dependencies> + <dependency> + <groupId>org.apache.felix</groupId> + <artifactId>org.apache.felix.scr.annotations</artifactId> + </dependency> + <dependency> + <groupId>org.apache.sling</groupId> + <artifactId>org.apache.sling.discovery.api</artifactId> + <version>1.0.0</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.core</artifactId> + <version>5.0.0</version> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.compendium</artifactId> + <version>5.0.0</version> + </dependency> + <dependency> + <groupId>org.apache.sling</groupId> + <artifactId>org.apache.sling.settings</artifactId> + <version>1.0.0</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.sling</groupId> + <artifactId>org.apache.sling.commons.osgi</artifactId> + <version>2.2.0</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.sling</groupId> + <artifactId>org.apache.sling.api</artifactId> + <version>2.7.1-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + <!-- Testing --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + <dependency> + <groupId>junit-addons</groupId> + <artifactId>junit-addons</artifactId> + <version>1.4</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.9.5</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + </dependency> + <dependency> + <groupId>org.apache.sling</groupId> + <artifactId>org.apache.sling.testing.resourceresolver-mock</artifactId> + <version>0.2.0</version> + <scope>test</scope> + </dependency> + + </dependencies> +</project> diff --git a/src/main/java/org/apache/sling/event/dea/DEAConstants.java b/src/main/java/org/apache/sling/event/dea/DEAConstants.java new file mode 100644 index 0000000..6ea8718 --- /dev/null +++ b/src/main/java/org/apache/sling/event/dea/DEAConstants.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.dea; + + +/** + * The <code>DEAConstants</code> provides some constants for + * handling distributed OSGi events. + * <p> + * If an event should be sent to other instances, the event + * property {@link #PROPERTY_DISTRIBUTE} should be set to + * an empty string. + * <p> + * An event, regardless if distributed or not, should never be + * created with the property {@link #PROPERTY_APPLICATION}. In + * addition properties starting with "event.dea." are reserved + * attributes of this implementation and must not be used + * by custom events. + * <p> + * If the event is a local event, the {@link #PROPERTY_APPLICATION} + * is not available. If it is available, it contains the application + * (Sling ID) of the instance where the event originated. + */ +public abstract class DEAConstants { + + /** This event property indicates, if the event should be distributed in the cluster. */ + public static final String PROPERTY_DISTRIBUTE = "event.distribute"; + + /** This event property specifies the application node. */ + public static final String PROPERTY_APPLICATION = "event.application"; +} \ No newline at end of file diff --git a/src/main/java/org/apache/sling/event/dea/impl/DistributedEventAdminImpl.java b/src/main/java/org/apache/sling/event/dea/impl/DistributedEventAdminImpl.java new file mode 100644 index 0000000..df128b4 --- /dev/null +++ b/src/main/java/org/apache/sling/event/dea/impl/DistributedEventAdminImpl.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.dea.impl; + +import java.util.Map; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Property; +import org.apache.felix.scr.annotations.Reference; +import org.apache.sling.api.resource.ResourceResolverFactory; +import org.apache.sling.commons.osgi.PropertiesUtil; +import org.apache.sling.settings.SlingSettingsService; +import org.osgi.framework.BundleContext; +import org.osgi.service.event.EventAdmin; + +/** + * This service wraps the configuration of the distributed event admin + * and starts the different parts. + */ +@Component(name="org.apache.sling.event.impl.DistributingEventHandler") +public class DistributedEventAdminImpl { + + public static final String RESOURCE_TYPE_FOLDER = "sling:Folder"; + + public static final String RESOURCE_TYPE_EVENT = "sling/distributed/event"; + + @Reference + private SlingSettingsService settings; + + @Reference + private ResourceResolverFactory resourceResolverFactory; + + @Reference + private EventAdmin eventAdmin; + + /** Default repository path. */ + public static final String DEFAULT_REPOSITORY_PATH = "/var/eventing/distribution"; + + /** The path where all jobs are stored. */ + @Property(value=DEFAULT_REPOSITORY_PATH) + private static final String CONFIG_PROPERTY_REPOSITORY_PATH = "repository.path"; + + /** Default clean up time is 15 minutes. */ + private static final int DEFAULT_CLEANUP_PERIOD = 15; + + @Property(intValue=DEFAULT_CLEANUP_PERIOD) + private static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period"; + + /** The local receiver of distributed events .*/ + private DistributedEventReceiver receiver; + + /** The local sender for distributed events. */ + private DistributedEventSender sender; + + @Activate + protected void activate(final BundleContext bundleContext, final Map<String, Object> props) { + final int cleanupPeriod = PropertiesUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD), DEFAULT_CLEANUP_PERIOD); + final String rootPath = PropertiesUtil.toString(props.get( + CONFIG_PROPERTY_REPOSITORY_PATH), DEFAULT_REPOSITORY_PATH); + final String ownRootPath = rootPath.concat("/").concat(settings.getSlingId()); + + this.receiver = new DistributedEventReceiver(bundleContext, + rootPath, + ownRootPath, + cleanupPeriod, + this.resourceResolverFactory, this.settings); + this.sender = new DistributedEventSender(bundleContext, + rootPath, + ownRootPath, + this.resourceResolverFactory, this.eventAdmin); + } + + @Deactivate + protected void deactivate() { + if ( this.receiver != null ) { + this.receiver.stop(); + this.receiver = null; + } + if ( this.sender != null ) { + this.sender.stop(); + this.sender = null; + } + } +} diff --git a/src/main/java/org/apache/sling/event/dea/impl/DistributedEventReceiver.java b/src/main/java/org/apache/sling/event/dea/impl/DistributedEventReceiver.java new file mode 100644 index 0000000..58ed940 --- /dev/null +++ b/src/main/java/org/apache/sling/event/dea/impl/DistributedEventReceiver.java @@ -0,0 +1,438 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.dea.impl; + +import java.util.Calendar; +import java.util.Dictionary; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.sling.api.resource.LoginException; +import org.apache.sling.api.resource.PersistenceException; +import org.apache.sling.api.resource.Resource; +import org.apache.sling.api.resource.ResourceResolver; +import org.apache.sling.api.resource.ResourceResolverFactory; +import org.apache.sling.api.resource.ResourceUtil; +import org.apache.sling.discovery.InstanceDescription; +import org.apache.sling.discovery.TopologyEvent; +import org.apache.sling.discovery.TopologyEvent.Type; +import org.apache.sling.discovery.TopologyEventListener; +import org.apache.sling.event.dea.DEAConstants; +import org.apache.sling.settings.SlingSettingsService; +import org.osgi.framework.BundleContext; +import org.osgi.framework.Constants; +import org.osgi.framework.ServiceRegistration; +import org.osgi.service.event.Event; +import org.osgi.service.event.EventConstants; +import org.osgi.service.event.EventHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is the distributed event receiver. + * It listens for all distributable events and stores them in the + * repository for other cluster instance to pick them up. + * <p> + * This component is scheduled to run some clean up tasks in the + * background periodically. + * <p> + */ +public class DistributedEventReceiver + implements EventHandler, Runnable, TopologyEventListener { + + /** Special topic to stop the queue. */ + private static final String TOPIC_STOPPED = "org/apache/sling/event/dea/impl/STOPPED"; + + /** Logger */ + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + /** A local queue for writing received events into the repository. */ + private final BlockingQueue<Event> writeQueue = new LinkedBlockingQueue<Event>(); + + /** The resource resolver factory. */ + private final ResourceResolverFactory resourceResolverFactory; + + /** The current instance id. */ + private final String slingId; + + /** The root path for events . */ + private final String rootPath; + + /** The root path for events written by this instance. */ + private final String ownRootPath; + + /** The cleanup period. */ + private final int cleanupPeriod; + + /** Resolver used for writing. */ + private volatile ResourceResolver writerResolver; + + /** Is the background task still running? */ + private volatile boolean running; + + /** The current instances if this is the leader. */ + private volatile Set<String> instances; + + /** The service registration. */ + private volatile ServiceRegistration<?> serviceRegistration; + + public DistributedEventReceiver(final BundleContext bundleContext, + final String rootPath, + final String ownRootPath, + final int cleanupPeriod, + final ResourceResolverFactory rrFactory, + final SlingSettingsService settings) { + this.rootPath = rootPath; + this.ownRootPath = ownRootPath; + this.resourceResolverFactory = rrFactory; + this.slingId = settings.getSlingId(); + this.cleanupPeriod = cleanupPeriod; + + this.running = true; + // start writer thread + final Thread writerThread = new Thread(new Runnable() { + @Override + public void run() { + // create service registration properties + final Dictionary<String, Object> props = new Hashtable<String, Object>(); + props.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation"); + + // listen for all OSGi events with the distributable flag + props.put(EventConstants.EVENT_TOPIC, "*"); + props.put(EventConstants.EVENT_FILTER, "(" + DEAConstants.PROPERTY_DISTRIBUTE + "=*)"); + // schedule this service every 30 minutes + props.put("scheduler.period", 1800L); + props.put("scheduler.concurrent", Boolean.FALSE); + + final ServiceRegistration<?> reg = + bundleContext.registerService(new String[] {EventHandler.class.getName(), + Runnable.class.getName(), + TopologyEventListener.class.getName()}, + DistributedEventReceiver.this, props); + + DistributedEventReceiver.this.serviceRegistration = reg; + + try { + writerResolver = resourceResolverFactory.getAdministrativeResourceResolver(null); + ResourceUtil.getOrCreateResource(writerResolver, + ownRootPath, + DistributedEventAdminImpl.RESOURCE_TYPE_FOLDER, + DistributedEventAdminImpl.RESOURCE_TYPE_FOLDER, + true); + } catch (final Exception e) { + // there is nothing we can do except log! + logger.error("Error during resource resolver creation.", e); + running = false; + } + try { + processWriteQueue(); + } catch (final Throwable t) { //NOSONAR + logger.error("Writer thread stopped with exception: " + t.getMessage(), t); + running = false; + } + if ( writerResolver != null ) { + writerResolver.close(); + writerResolver = null; + } + } + }); + writerThread.start(); + } + + /** + * Deactivate this component. + */ + public void stop() { + if ( this.serviceRegistration != null ) { + this.serviceRegistration.unregister(); + this.serviceRegistration = null; + } + // stop background threads by putting empty objects into the queue + this.running = false; + try { + this.writeQueue.put(new Event(TOPIC_STOPPED, (Dictionary<String, Object>)null)); + } catch (final InterruptedException e) { + this.ignoreException(e); + Thread.currentThread().interrupt(); + } + } + + /** + * Background thread writing events into the queue. + */ + private void processWriteQueue() { + while ( this.running ) { + // so let's wait/get the next event from the queue + Event event = null; + try { + event = this.writeQueue.take(); + } catch (final InterruptedException e) { + this.ignoreException(e); + Thread.currentThread().interrupt(); + this.running = false; + } + if ( event != null && this.running ) { + try { + this.writeEvent(event); + } catch (final Exception e) { + this.logger.error("Exception during writing the event to the resource tree.", e); + } + } + } + } + + /** Counter for events. */ + private final AtomicLong eventCounter = new AtomicLong(0); + + /** + * Write an event to the resource tree. + * @param event The event + * @throws PersistenceException + */ + private void writeEvent(final Event event) + throws PersistenceException { + final Calendar now = Calendar.getInstance(); + + final StringBuilder sb = new StringBuilder(this.ownRootPath); + sb.append('/'); + sb.append(now.get(Calendar.YEAR)); + sb.append('/'); + sb.append(now.get(Calendar.MONTH) + 1); + sb.append('/'); + sb.append(now.get(Calendar.DAY_OF_MONTH)); + sb.append('/'); + sb.append(now.get(Calendar.HOUR_OF_DAY)); + sb.append('/'); + sb.append(now.get(Calendar.MINUTE)); + sb.append('/'); + sb.append("event-"); + sb.append(String.valueOf(eventCounter.getAndIncrement())); + + // create properties + final Map<String, Object> properties = new HashMap<String, Object>(); + + final String[] propNames = event.getPropertyNames(); + if ( propNames != null && propNames.length > 0 ) { + for(final String propName : propNames) { + properties.put(propName, event.getProperty(propName)); + } + } + + properties.remove(DEAConstants.PROPERTY_DISTRIBUTE); + properties.put(EventConstants.EVENT_TOPIC, event.getTopic()); + properties.put(DEAConstants.PROPERTY_APPLICATION, this.slingId); + final Object oldRT = properties.get(ResourceResolver.PROPERTY_RESOURCE_TYPE); + if ( oldRT != null ) { + properties.put("event.dea." + ResourceResolver.PROPERTY_RESOURCE_TYPE, oldRT); + } + properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, DistributedEventAdminImpl.RESOURCE_TYPE_EVENT); + ResourceUtil.getOrCreateResource(this.writerResolver, + sb.toString(), + properties, + DistributedEventAdminImpl.RESOURCE_TYPE_FOLDER, + true); + } + + /** + * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event) + */ + @Override + public void handleEvent(final Event event) { + try { + this.writeQueue.put(event); + } catch (final InterruptedException ex) { + this.ignoreException(ex); + Thread.currentThread().interrupt(); + } + } + + /** + * Helper method which just logs the exception in debug mode. + * @param e + */ + private void ignoreException(final Exception e) { + if ( this.logger.isDebugEnabled() ) { + this.logger.debug("Ignored exception " + e.getMessage(), e); + } + } + + /** + * This method is invoked periodically. + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + this.cleanUpObsoleteInstances(); + this.cleanUpObsoleteEvents(); + } + + private void cleanUpObsoleteInstances() { + final Set<String> slingIds = this.instances; + if ( slingIds != null ) { + this.instances = null; + this.logger.debug("Checking for old instance trees for distributed events."); + ResourceResolver resolver = null; + try { + resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null); + + final Resource baseResource = resolver.getResource(this.rootPath); + // sanity check - should never be null + if ( baseResource != null ) { + final ResourceUtil.BatchResourceRemover brr = ResourceUtil.getBatchResourceResourceRemover(50); + final Iterator<Resource> iter = baseResource.listChildren(); + while ( iter.hasNext() ) { + final Resource rootResource = iter.next(); + if ( !slingIds.contains(rootResource.getName()) ) { + brr.delete(rootResource); + } + } + // final commit for outstanding deletes + resolver.commit(); + } + + } catch (final PersistenceException pe) { + // in the case of an error, we just log this as a warning + this.logger.warn("Exception during job resource tree cleanup.", pe); + } catch (final LoginException ignore) { + this.ignoreException(ignore); + } finally { + if ( resolver != null ) { + resolver.close(); + } + } + } + } + + private void cleanUpObsoleteEvents() { + if ( this.cleanupPeriod > 0 ) { + this.logger.debug("Cleaning up distributed events, removing all entries older than {} minutes.", this.cleanupPeriod); + + ResourceResolver resolver = null; + try { + resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null); + final ResourceUtil.BatchResourceRemover brr = ResourceUtil.getBatchResourceResourceRemover(50); + + final Resource baseResource = resolver.getResource(this.ownRootPath); + // sanity check - should never be null + if ( baseResource != null ) { + final Calendar oldDate = Calendar.getInstance(); + oldDate.add(Calendar.MINUTE, -1 * this.cleanupPeriod); + + // check years + final int oldYear = oldDate.get(Calendar.YEAR); + final Iterator<Resource> yearIter = baseResource.listChildren(); + while ( yearIter.hasNext() ) { + final Resource yearResource = yearIter.next(); + final int year = Integer.valueOf(yearResource.getName()); + if ( year < oldYear ) { + brr.delete(yearResource); + } else if ( year == oldYear ) { + + // same year - check months + final int oldMonth = oldDate.get(Calendar.MONTH) + 1; + final Iterator<Resource> monthIter = yearResource.listChildren(); + while ( monthIter.hasNext() ) { + final Resource monthResource = monthIter.next(); + final int month = Integer.valueOf(monthResource.getName()); + if ( month < oldMonth ) { + brr.delete(monthResource); + } else if ( month == oldMonth ) { + + // same month - check days + final int oldDay = oldDate.get(Calendar.DAY_OF_MONTH); + final Iterator<Resource> dayIter = monthResource.listChildren(); + while ( dayIter.hasNext() ) { + final Resource dayResource = dayIter.next(); + final int day = Integer.valueOf(dayResource.getName()); + if ( day < oldDay ) { + brr.delete(dayResource); + } else if ( day == oldDay ) { + + // same day - check hours + final int oldHour = oldDate.get(Calendar.HOUR_OF_DAY); + final Iterator<Resource> hourIter = dayResource.listChildren(); + while ( hourIter.hasNext() ) { + final Resource hourResource = hourIter.next(); + final int hour = Integer.valueOf(hourResource.getName()); + if ( hour < oldHour ) { + brr.delete(hourResource); + } else if ( hour == oldHour ) { + + // same hour - check minutes + final int oldMinute = oldDate.get(Calendar.MINUTE); + final Iterator<Resource> minuteIter = hourResource.listChildren(); + while ( minuteIter.hasNext() ) { + final Resource minuteResource = minuteIter.next(); + + final int minute = Integer.valueOf(minuteResource.getName()); + if ( minute < oldMinute ) { + brr.delete(minuteResource); + } + } + } + } + } + } + } + } + } + } + } + // final commit for outstanding resources + resolver.commit(); + + } catch (final PersistenceException pe) { + // in the case of an error, we just log this as a warning + this.logger.warn("Exception during job resource tree cleanup.", pe); + } catch (final LoginException ignore) { + this.ignoreException(ignore); + } finally { + if ( resolver != null ) { + resolver.close(); + } + } + } + } + + /** + * @see org.apache.sling.discovery.TopologyEventListener#handleTopologyEvent(org.apache.sling.discovery.TopologyEvent) + */ + @Override + public void handleTopologyEvent(final TopologyEvent event) { + if ( event.getType() == Type.TOPOLOGY_CHANGING ) { + this.instances = null; + } else if ( event.getType() == Type.TOPOLOGY_CHANGED || event.getType() == Type.TOPOLOGY_INIT ) { + if ( event.getNewView().getLocalInstance().isLeader() ) { + final Set<String> set = new HashSet<String>(); + for(final InstanceDescription desc : event.getNewView().getInstances() ) { + set.add(desc.getSlingId()); + } + this.instances = set; + } + } + } +} + diff --git a/src/main/java/org/apache/sling/event/dea/impl/DistributedEventSender.java b/src/main/java/org/apache/sling/event/dea/impl/DistributedEventSender.java new file mode 100644 index 0000000..95aa021 --- /dev/null +++ b/src/main/java/org/apache/sling/event/dea/impl/DistributedEventSender.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.dea.impl; + +import java.util.Dictionary; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.sling.api.SlingConstants; +import org.apache.sling.api.resource.LoginException; +import org.apache.sling.api.resource.Resource; +import org.apache.sling.api.resource.ResourceResolver; +import org.apache.sling.api.resource.ResourceResolverFactory; +import org.apache.sling.api.resource.ValueMap; +import org.apache.sling.event.dea.DEAConstants; +import org.osgi.framework.BundleContext; +import org.osgi.framework.Constants; +import org.osgi.framework.ServiceRegistration; +import org.osgi.service.event.Event; +import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; +import org.osgi.service.event.EventHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This event handler distributes events from other application nodes in + * the cluster on the current instance. + * <p> + * It's listening for resource added events in the resource tree storing the + * distributed events. If a new resource is added, the resource is read, + * converted to an event and then send using the local event admin. + * <p> + */ +public class DistributedEventSender + implements EventHandler { + + /** Default logger. */ + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + /** Is the background task still running? */ + private volatile boolean running; + + /** A local queue for serializing the event processing. */ + private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(); + + private final ResourceResolverFactory resourceResolverFactory; + + private final EventAdmin eventAdmin; + + private final String ownRootPathWithSlash; + + private volatile ServiceRegistration<?> serviceRegistration; + + public DistributedEventSender(final BundleContext bundleContext, + final String rootPath, + final String ownRootPath, + final ResourceResolverFactory rrFactory, + final EventAdmin eventAdmin) { + this.eventAdmin = eventAdmin; + this.resourceResolverFactory = rrFactory; + this.ownRootPathWithSlash = ownRootPath + "/"; + + this.running = true; + final Thread backgroundThread = new Thread(new Runnable() { + @Override + public void run() { + // create service registration properties + final Dictionary<String, Object> props = new Hashtable<String, Object>(); + props.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation"); + + // listen for all resource added OSGi events in the DEA area + props.put(EventConstants.EVENT_TOPIC, SlingConstants.TOPIC_RESOURCE_ADDED); + props.put(EventConstants.EVENT_FILTER, "(path=" + rootPath + "/*)"); + + final ServiceRegistration<?> reg = + bundleContext.registerService(new String[] {EventHandler.class.getName()}, + DistributedEventSender.this, props); + + DistributedEventSender.this.serviceRegistration = reg; + + try { + runInBackground(); + } catch (Throwable t) { //NOSONAR + logger.error("Background thread stopped with exception: " + t.getMessage(), t); + running = false; + } + } + }); + backgroundThread.start(); + } + + /** + * Deactivate this component. + */ + public void stop() { + if ( this.serviceRegistration != null ) { + this.serviceRegistration.unregister(); + this.serviceRegistration = null; + } + // stop background threads by putting empty objects into the queue + this.running = false; + try { + this.queue.put(""); + } catch (final InterruptedException e) { + this.ignoreException(e); + Thread.currentThread().interrupt(); + } + } + + /** + * Read an event from the resource + * @return The event object or <code>null</code> + */ + private Event readEvent(final Resource eventResource) { + try { + final ValueMap vm = ResourceHelper.getValueMap(eventResource); + final String topic = vm.get(EventConstants.EVENT_TOPIC, String.class); + if ( topic == null ) { + // no topic should never happen as we check the resource type before + logger.error("Unable to read distributed event from " + eventResource.getPath() + " : no topic property available."); + } else { + final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm); + // only send event if there are no read errors, otherwise discard it + @SuppressWarnings("unchecked") + final List<Exception> readErrorList = (List<Exception>) properties.remove(ResourceHelper.PROPERTY_MARKER_READ_ERROR_LIST); + if ( readErrorList == null ) { + properties.remove(EventConstants.EVENT_TOPIC); + properties.remove(DEAConstants.PROPERTY_DISTRIBUTE); + final Object oldRT = properties.remove("event.dea." + ResourceResolver.PROPERTY_RESOURCE_TYPE); + if ( oldRT != null ) { + properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, oldRT); + } else { + properties.remove(ResourceResolver.PROPERTY_RESOURCE_TYPE); + } + try { + final Event event = new Event(topic, properties); + return event; + } catch (final IllegalArgumentException iae) { + // this exception occurs if the topic is not correct (it should never happen, + // but you never know) + logger.error("Unable to read event: " + iae.getMessage(), iae); + } + } else { + for(final Exception e : readErrorList) { + logger.warn("Unable to read distributed event from " + eventResource.getPath(), e); + } + } + } + } catch (final InstantiationException ie) { + // something happened with the resource in the meantime + this.ignoreException(ie); + } + return null; + } + + /** + * Background thread + */ + private void runInBackground() { + while ( this.running ) { + // so let's wait/get the next event from the queue + String path = null; + try { + path = this.queue.take(); + } catch (final InterruptedException e) { + this.ignoreException(e); + Thread.currentThread().interrupt(); + this.running = false; + } + if ( path != null && path.length() > 0 && this.running ) { + ResourceResolver resolver = null; + try { + resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null); + final Resource eventResource = resolver.getResource(path); + if ( DistributedEventAdminImpl.RESOURCE_TYPE_EVENT.equals(eventResource.getResourceType())) { + final Event e = this.readEvent(eventResource); + if ( e != null ) { + // we check event admin as processing is async + final EventAdmin localEA = this.eventAdmin; + if ( localEA != null ) { + localEA.postEvent(e); + } else { + this.logger.error("Unable to post event as no event admin is available."); + } + } + } + } catch (final LoginException ex) { + this.logger.error("Exception during creation of resource resolver.", ex); + } finally { + if ( resolver != null ) { + resolver.close(); + } + } + } + } + } + + /** + * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event) + */ + @Override + public void handleEvent(final Event event) { + final String path = (String) event.getProperty(SlingConstants.PROPERTY_PATH); + if ( !path.startsWith(this.ownRootPathWithSlash) ) { + try { + this.queue.put(path); + } catch (final InterruptedException ex) { + this.ignoreException(ex); + Thread.currentThread().interrupt(); + } + } + } + + /** + * Helper method which just logs the exception in debug mode. + * @param e + */ + private void ignoreException(final Exception e) { + if ( this.logger.isDebugEnabled() ) { + this.logger.debug("Ignored exception " + e.getMessage(), e); + } + } +} diff --git a/src/main/java/org/apache/sling/event/dea/impl/ResourceHelper.java b/src/main/java/org/apache/sling/event/dea/impl/ResourceHelper.java new file mode 100644 index 0000000..0694b74 --- /dev/null +++ b/src/main/java/org/apache/sling/event/dea/impl/ResourceHelper.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.dea.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.sling.api.resource.Resource; +import org.apache.sling.api.resource.ValueMap; + +public abstract class ResourceHelper { + + public static final String PROPERTY_MARKER_READ_ERROR_LIST = ResourceHelper.class.getName() + "/ReadErrorList"; + + public static Map<String, Object> cloneValueMap(final ValueMap vm) throws InstantiationException { + List<Exception> hasReadError = null; + try { + final Map<String, Object> result = new HashMap<String, Object>(vm); + for(final Map.Entry<String, Object> entry : result.entrySet()) { + if ( entry.getValue() instanceof InputStream ) { + final Object value = vm.get(entry.getKey(), Serializable.class); + if ( value != null ) { + entry.setValue(value); + } else { + if ( hasReadError == null ) { + hasReadError = new ArrayList<Exception>(); + } + final int count = hasReadError.size(); + // let's find out which class might be missing + ObjectInputStream ois = null; + try { + ois = new ObjectInputStream((InputStream)entry.getValue()); + ois.readObject(); + } catch (final ClassNotFoundException cnfe) { + hasReadError.add(new Exception("Unable to deserialize property '" + entry.getKey() + "'", cnfe)); + } catch (final IOException ioe) { + hasReadError.add(new Exception("Unable to deserialize property '" + entry.getKey() + "'", ioe)); + } finally { + if ( ois != null ) { + try { + ois.close(); + } catch (IOException ignore) { + // ignore + } + } + } + if ( hasReadError.size() == count ) { + hasReadError.add(new Exception("Unable to deserialize property '" + entry.getKey() + "'")); + } + } + } + } + if ( hasReadError != null ) { + result.put(PROPERTY_MARKER_READ_ERROR_LIST, hasReadError); + } + return result; + } catch ( final IllegalArgumentException iae) { + // the JCR implementation might throw an IAE if something goes wrong + throw (InstantiationException)new InstantiationException(iae.getMessage()).initCause(iae); + } + } + + public static ValueMap getValueMap(final Resource resource) throws InstantiationException { + final ValueMap vm = resource.getValueMap(); + // trigger full loading + try { + vm.size(); + } catch ( final IllegalArgumentException iae) { + // the JCR implementation might throw an IAE if something goes wrong + throw (InstantiationException)new InstantiationException(iae.getMessage()).initCause(iae); + } + return vm; + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/sling/event/dea/package-info.java b/src/main/java/org/apache/sling/event/dea/package-info.java new file mode 100644 index 0000000..9495421 --- /dev/null +++ b/src/main/java/org/apache/sling/event/dea/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +@Version("1.0.0") +package org.apache.sling.event.dea; + +import aQute.bnd.annotation.Version; + diff --git a/src/test/java/org/apache/sling/event/dea/impl/DistributingEventHandlerTest.java b/src/test/java/org/apache/sling/event/dea/impl/DistributingEventHandlerTest.java new file mode 100644 index 0000000..3547300 --- /dev/null +++ b/src/test/java/org/apache/sling/event/dea/impl/DistributingEventHandlerTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.dea.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Dictionary; +import java.util.Hashtable; +import java.util.List; + +import org.apache.sling.api.SlingConstants; +import org.apache.sling.api.resource.ResourceResolver; +import org.apache.sling.api.resource.ResourceResolverFactory; +import org.apache.sling.event.dea.DEAConstants; +import org.apache.sling.settings.SlingSettingsService; +import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory; +import org.apache.sling.testing.resourceresolver.MockResourceResolverFactoryOptions; +import org.junit.After; +import org.junit.Before; +import org.mockito.Mockito; +import org.osgi.framework.BundleContext; +import org.osgi.service.event.Event; +import org.osgi.service.event.EventAdmin; + +public class DistributingEventHandlerTest { + + private static final String TOPIC_PREFIX = "write/"; + + private DistributedEventReceiver receiver; + + private DistributedEventSender sender; + + private static final String MY_APP_ID = "1234"; + + private static final String OTHER_APP_ID = "5678"; + + private final List<Event> events = Collections.synchronizedList(new ArrayList<Event>()); + + @SuppressWarnings("unchecked") + @Before + public void setup() throws Exception { + final BundleContext bc = Mockito.mock(BundleContext.class); + Mockito.when(bc.registerService(Mockito.any(String[].class), + Mockito.any(), + Mockito.any(Dictionary.class))).thenReturn(null); + + final SlingSettingsService otherSettings = Mockito.mock(SlingSettingsService.class); + Mockito.when(otherSettings.getSlingId()).thenReturn(OTHER_APP_ID); + + final EventAdmin ea = new EventAdmin() { + + @Override + public void sendEvent(final Event event) { + this.postEvent(event); + } + + @Override + public void postEvent(final Event event) { + final String topic = event.getTopic(); + if ( topic.equals(SlingConstants.TOPIC_RESOURCE_ADDED) ) { + sender.handleEvent(event); + } else if ( topic.startsWith(TOPIC_PREFIX) ) { + events.add(event); + } + } + }; + final MockResourceResolverFactoryOptions opts = new MockResourceResolverFactoryOptions(); + opts.setEventAdmin(ea); + final ResourceResolverFactory factory = new MockResourceResolverFactory(opts); + + this.sender = new DistributedEventSender(bc, DistributedEventAdminImpl.DEFAULT_REPOSITORY_PATH, + DistributedEventAdminImpl.DEFAULT_REPOSITORY_PATH + "/" + MY_APP_ID, factory, ea); + + this.receiver = new DistributedEventReceiver(bc, DistributedEventAdminImpl.DEFAULT_REPOSITORY_PATH, + DistributedEventAdminImpl.DEFAULT_REPOSITORY_PATH + "/" + OTHER_APP_ID, 15, factory, otherSettings); + } + + @After + public void cleanup() { + if ( this.sender != null ) { + this.sender.stop(); + this.sender = null; + } + if ( this.receiver != null ) { + this.receiver.stop(); + this.receiver = null; + } + } + + @org.junit.Test(timeout=5000) public void testSendEvent() throws Exception { + this.events.clear(); + + final String VALUE = "some value"; + final String topic = TOPIC_PREFIX + "event/test"; + final Dictionary<String, Object> props = new Hashtable<String, Object>(); + props.put("a property", VALUE); + final Event e = new Event(topic, props); + this.receiver.handleEvent(e); + + while ( this.events.size() == 0 ) { + Thread.sleep(5); + } + final Event receivedEvent = this.events.get(0); + + assertEquals(topic, receivedEvent.getTopic()); + assertEquals(OTHER_APP_ID, receivedEvent.getProperty(DEAConstants.PROPERTY_APPLICATION)); + assertEquals(VALUE, receivedEvent.getProperty("a property")); + assertNull(receivedEvent.getProperty(ResourceResolver.PROPERTY_RESOURCE_TYPE)); + + this.events.clear(); + } + + @org.junit.Test(timeout=5000) public void testSendEventPlusAppId() throws Exception { + this.events.clear(); + + final String VALUE = "some value"; + final String topic = TOPIC_PREFIX + "event/test"; + final Dictionary<String, Object> props = new Hashtable<String, Object>(); + props.put("a property", "some value"); + // now we check if the application id is handled correctly + props.put(DEAConstants.PROPERTY_APPLICATION, "foo"); + + final Event e = new Event(topic, props); + this.receiver.handleEvent(e); + + while ( this.events.size() == 0 ) { + Thread.sleep(5); + } + final Event receivedEvent = this.events.get(0); + + assertEquals(topic, receivedEvent.getTopic()); + assertEquals(OTHER_APP_ID, receivedEvent.getProperty(DEAConstants.PROPERTY_APPLICATION)); + assertEquals(VALUE, receivedEvent.getProperty("a property")); + assertNull(receivedEvent.getProperty(ResourceResolver.PROPERTY_RESOURCE_TYPE)); + + this.events.clear(); + } + + @org.junit.Test(timeout=5000) public void testSendEventWithResourceType() throws Exception { + this.events.clear(); + + final String topic = TOPIC_PREFIX + "event/test"; + final String RT = "my:resourceType"; + + final Dictionary<String, Object> props = new Hashtable<String, Object>(); + props.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, RT); + + final Event e = new Event(topic, props); + this.receiver.handleEvent(e); + + while ( this.events.size() == 0 ) { + Thread.sleep(5); + } + final Event receivedEvent = this.events.get(0); + + assertEquals(topic, receivedEvent.getTopic()); + assertEquals(OTHER_APP_ID, receivedEvent.getProperty(DEAConstants.PROPERTY_APPLICATION)); + assertEquals(RT, receivedEvent.getProperty(ResourceResolver.PROPERTY_RESOURCE_TYPE)); + assertNull(receivedEvent.getProperty("event.dea." + ResourceResolver.PROPERTY_RESOURCE_TYPE)); + + this.events.clear(); + } +} -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
