Author: tommaso
Date: Fri Feb 14 16:34:56 2014
New Revision: 1568345
URL: http://svn.apache.org/r1568345
Log:
SLING-3395 - event based reverse replication via Server Sent Events
Added:
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java
(with props)
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueEventServlet.java
(with props)
Modified:
sling/trunk/contrib/extensions/replication/pom.xml
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentQueueResource.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProvider.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ScheduleReplicateReplicationRule.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandlerFactory.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerFactory.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerFactory.java
sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json
Modified: sling/trunk/contrib/extensions/replication/pom.xml
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/pom.xml?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/pom.xml (original)
+++ sling/trunk/contrib/extensions/replication/pom.xml Fri Feb 14 16:34:56 2014
@@ -79,6 +79,7 @@
org.apache.sling.replication.rule,
org.apache.sling.replication.serialization
</Export-Package>
+ <Embed-Dependency>httpasyncclient</Embed-Dependency>
</instructions>
</configuration>
</plugin>
@@ -178,12 +179,22 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>fluent-hc</artifactId>
- <version>4.2</version>
+ <version>4.3.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpasyncclient</artifactId>
+ <version>4.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore-osgi</artifactId>
- <version>4.2</version>
+ <version>4.3.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient-osgi</artifactId>
+ <version>4.3.2</version>
</dependency>
<!-- JACKRABBIT -->
<dependency>
Modified:
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentQueueResource.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentQueueResource.java?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentQueueResource.java
(original)
+++
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentQueueResource.java
Fri Feb 14 16:34:56 2014
@@ -32,6 +32,10 @@ public class ReplicationAgentQueueResour
public static final String SUFFIX_PATH = "/queue";
+ public static final String EVENT_RESOURCE_TYPE =
"sling/replication/agent/queue/event";
+
+ public static final String EVENT_SUFFIX_PATH = "/queue/event";
+
private final ReplicationQueue queue;
private final ResourceResolver resourceResolver;
Modified:
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProvider.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProvider.java?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProvider.java
(original)
+++
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProvider.java
Fri Feb 14 16:34:56 2014
@@ -49,11 +49,11 @@ import org.slf4j.LoggerFactory;
@Service(value = ResourceProvider.class)
@Properties({
@Property(name = ResourceProvider.ROOTS,
- value = {
- ReplicationAgentResource.BASE_PATH,
- ReplicationAgentConfigurationResource.BASE_PATH,
- ReplicationAgentResource.IMPORTER_BASE_PATH
- })
+ value = {
+ ReplicationAgentResource.BASE_PATH,
+ ReplicationAgentConfigurationResource.BASE_PATH,
+ ReplicationAgentResource.IMPORTER_BASE_PATH
+ })
})
public class ReplicationAgentResourceProvider implements ResourceProvider {
@@ -82,7 +82,7 @@ public class ReplicationAgentResourcePro
public Resource getResource(ResourceResolver resourceResolver, String
path) {
- if(!isAuthorized(resourceResolver)) return null;
+ if (!isAuthorized(resourceResolver)) return null;
Resource resource = null;
@@ -118,7 +118,7 @@ public class ReplicationAgentResourcePro
} else {
log.warn("could not find a configuration manager service");
}
- } else if(path.startsWith(ReplicationAgentResource.BASE_PATH+"/")) {
+ } else if (path.startsWith(ReplicationAgentResource.BASE_PATH + "/")) {
if (path.endsWith(ReplicationAgentQueueResource.SUFFIX_PATH)) {
String agentPath = path.substring(0, path.lastIndexOf('/'));
@@ -131,10 +131,11 @@ public class ReplicationAgentResourcePro
} catch (ReplicationQueueException e) {
log.warn("could not find a queue for agent {}", agentPath);
}
+ } else if
(path.endsWith(ReplicationAgentQueueResource.EVENT_SUFFIX_PATH)) {
+ resource = new SyntheticResource(resourceResolver, path,
ReplicationAgentQueueResource.EVENT_RESOURCE_TYPE);
} else {
- String agentPath = path;
ReplicationAgent replicationAgent = getAgentAtPath(path);
- log.info("resolving agent with path {}", agentPath);
+ log.info("resolving agent with path {}", path);
resource = replicationAgent != null ? new
ReplicationAgentResource(replicationAgent,
resourceResolver) : null;
@@ -146,22 +147,20 @@ public class ReplicationAgentResourcePro
}
- private boolean isAuthorized(ResourceResolver resourceResolver){
+ private boolean isAuthorized(ResourceResolver resourceResolver) {
boolean isAuthorized = false;
Session session = resourceResolver.adaptTo(Session.class);
- if(session != null) {
- try{
+ if (session != null) {
+ try {
isAuthorized = session.nodeExists(SECURITY_OBJECT);
- }
- catch (Exception ex){
+ } catch (Exception ex) {
}
}
- if(isAuthorized){
+ if (isAuthorized) {
log.debug("granting access to agent resources as user can read
/system/replication/security");
- }
- else {
+ } else {
log.debug("denying access to agent resources as user can't read
/system/replication/security");
}
@@ -169,9 +168,7 @@ public class ReplicationAgentResourcePro
}
private String getAgentNameAtPath(String path) {
- String agentName = path.substring(path.lastIndexOf('/') + 1);
-
- return agentName;
+ return path.substring(path.lastIndexOf('/') + 1);
}
private ReplicationAgent getAgentAtPath(String path) {
Modified:
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
(original)
+++
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
Fri Feb 14 16:34:56 2014
@@ -113,8 +113,6 @@ public class ReplicationAgentServiceFact
private ServiceRegistration agentReg;
- private ServiceRegistration jobReg;
-
@Reference
private ReplicationRuleEngine replicationRuleEngine;
@@ -183,9 +181,7 @@ public class ReplicationAgentServiceFact
replicationAgent.disable();
- if (agentReg != null) {
- agentReg.unregister();
- }
+ agentReg.unregister();
}
}
}
Modified:
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
(original)
+++
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
Fri Feb 14 16:34:56 2014
@@ -208,7 +208,7 @@ public class SimpleReplicationAgent impl
try {
ReplicationPackage replicationPackage =
packageBuilder.getPackage(itemInfo.getId());
- if (replicationPackage == null || transportHandler == null) {
+ if (replicationPackage == null || isPassive()) {
log.info("agent {} processing skipped", name);
return false;
} else {
@@ -254,22 +254,26 @@ public class SimpleReplicationAgent impl
public void enable() {
+ log.info("enabling agent");
// apply rules if any
if (rules.length > 0) {
ruleEngine.applyRules(this, rules);
}
- if (!isPassive())
+ if (!isPassive()) {
queueProvider.enableQueueProcessing(this, this);
+ }
}
public void disable() {
+ log.info("disabling agent");
if (rules != null) {
ruleEngine.unapplyRules(this, rules);
}
- if (!isPassive())
+ if (!isPassive()) {
queueProvider.disableQueueProcessing(this);
+ }
}
}
Modified:
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
(original)
+++
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
Fri Feb 14 16:34:56 2014
@@ -23,55 +23,62 @@ import java.util.Dictionary;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.felix.scr.annotations.*;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.QueueConfiguration;
import org.apache.sling.event.jobs.consumer.JobConsumer;
-import org.apache.sling.replication.queue.ReplicationQueueProcessor;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.cm.Configuration;
-import org.osgi.service.cm.ConfigurationAdmin;
-
import org.apache.sling.replication.agent.ReplicationAgent;
import org.apache.sling.replication.queue.ReplicationQueue;
import org.apache.sling.replication.queue.ReplicationQueueException;
+import org.apache.sling.replication.queue.ReplicationQueueProcessor;
import org.apache.sling.replication.queue.ReplicationQueueProvider;
import
org.apache.sling.replication.queue.impl.AbstractReplicationQueueProvider;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Component(metatype = false)
@Service(value = ReplicationQueueProvider.class)
@Property(name = "name", value = JobHandlingReplicationQueueProvider.NAME)
public class JobHandlingReplicationQueueProvider extends
AbstractReplicationQueueProvider
- implements ReplicationQueueProvider {
+ implements ReplicationQueueProvider {
public static final String NAME = "sjh";
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
@Reference
private JobManager jobManager;
@Reference
private ConfigurationAdmin configAdmin;
- private Map<String, ServiceRegistration> jobs = new
ConcurrentHashMap<String, ServiceRegistration>();
+ private final Map<String, ServiceRegistration> jobs = new
ConcurrentHashMap<String, ServiceRegistration>();
private BundleContext context;
@Override
protected ReplicationQueue getOrCreateQueue(ReplicationAgent agent, String
queueName)
- throws ReplicationQueueException {
+ throws ReplicationQueueException {
try {
String name = agent.getName() + queueName;
String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC
+ '/' + name;
if (jobManager.getQueue(name) == null) {
Configuration config = configAdmin.createFactoryConfiguration(
- QueueConfiguration.class.getName(), null);
+ QueueConfiguration.class.getName(), null);
Dictionary<String, Object> props = new Hashtable<String,
Object>();
props.put(ConfigurationConstants.PROP_NAME, name);
props.put(ConfigurationConstants.PROP_TYPE,
QueueConfiguration.Type.ORDERED.name());
- props.put(ConfigurationConstants.PROP_TOPICS, new String[] {
topic });
+ props.put(ConfigurationConstants.PROP_TOPICS, new
String[]{topic});
props.put(ConfigurationConstants.PROP_RETRIES, -1);
props.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
props.put(ConfigurationConstants.PROP_KEEP_JOBS, true);
@@ -96,26 +103,34 @@ public class JobHandlingReplicationQueue
String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC +
'/' + agent.getName();
String childTopic = topic + "/*";
jobProps.put(JobConsumer.PROPERTY_TOPICS, new String[]{topic,
childTopic});
- ServiceRegistration jobReg =
context.registerService(JobConsumer.class.getName(),
- new ReplicationAgentJobConsumer(agent, queueProcessor),
jobProps);
- jobs.put(agent.getName(), jobReg);
+ synchronized (jobs) {
+ log.info("registering job consumer for agent {}", agent.getName());
+ ServiceRegistration jobReg =
context.registerService(JobConsumer.class.getName(),
+ new ReplicationAgentJobConsumer(agent, queueProcessor),
jobProps);
+ jobs.put(agent.getName(), jobReg);
+ log.info("job consumer for agent {} registered", agent.getName());
+ }
}
public void disableQueueProcessing(ReplicationAgent agent) {
- ServiceRegistration jobReg = jobs.remove(agent.getName());
- if (jobReg != null) {
- jobReg.unregister();
+ synchronized (jobs) {
+ log.info("unregistering job consumer for agent {}",
agent.getName());
+ ServiceRegistration jobReg = jobs.remove(agent.getName());
+ if (jobReg != null) {
+ jobReg.unregister();
+ log.info("job consumer for agent {} unregistered",
agent.getName());
+ }
}
}
@Activate
- private void activate(BundleContext context){
+ private void activate(BundleContext context) {
this.context = context;
}
@Deactivate
- private void deactivate(BundleContext context){
- for (ServiceRegistration jobReg : jobs.values()){
+ private void deactivate(BundleContext context) {
+ for (ServiceRegistration jobReg : jobs.values()) {
jobReg.unregister();
}
}
Added:
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java?rev=1568345&view=auto
==============================================================================
---
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java
(added)
+++
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java
Fri Feb 14 16:34:56 2014
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.rule.impl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIUtils;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.nio.ContentDecoder;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
+import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
+import org.apache.sling.commons.scheduler.ScheduleOptions;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.replication.agent.AgentReplicationException;
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.agent.ReplicationAgentConfiguration;
+import org.apache.sling.replication.agent.ReplicationAgentConfigurationManager;
+import org.apache.sling.replication.agent.impl.ReplicationAgentQueueResource;
+import org.apache.sling.replication.agent.impl.ReplicationAgentResource;
+import org.apache.sling.replication.communication.ReplicationActionType;
+import org.apache.sling.replication.communication.ReplicationRequest;
+import org.apache.sling.replication.rule.ReplicationRule;
+import org.apache.sling.replication.transport.TransportHandler;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
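+ * {@link org.apache.sling.replication.rule.ReplicationRule} to trigger a replication request on an agent whenever an event is received from the remote queue event endpoint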
+ */
+@Component(immediate = true)
+@Service(value = ReplicationRule.class)
+public class ReplicateOnQueueEventRule implements ReplicationRule {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final String SIGNATURE = "queue event based {action} [on
${path}]";
+
+ private static final String SIGNATURE_REGEX =
"(queue\\sevent\\sbased)\\s(add|delete|poll)(\\s(on)\\s(\\/\\w+)+)?";
+
+ private static final Pattern signaturePattern =
Pattern.compile(SIGNATURE_REGEX);
+
+ @Reference
+ private ReplicationAgentConfigurationManager
replicationAgentConfigurationManager;
+
+ @Reference
+ private Scheduler scheduler;
+
+ private BundleContext context;
+
+ private Map<String, Future<HttpResponse>> requests;
+
+ @Activate
+ protected void activate(BundleContext context) {
+ this.context = context;
+ this.requests = new ConcurrentHashMap<String, Future<HttpResponse>>();
+ }
+
+ public String getSignature() {
+ return SIGNATURE;
+ }
+
+ public boolean signatureMatches(String ruleString) {
+ return ruleString.matches(SIGNATURE_REGEX);
+ }
+
+ public void apply(String ruleString, ReplicationAgent agent) {
+ Matcher matcher = signaturePattern.matcher(ruleString);
+ if (matcher.find()) {
+ String action = matcher.group(2);
+ ReplicationActionType actionType =
ReplicationActionType.fromName(action.toUpperCase());
+ String path = matcher.group(5); // can be null
+ try {
+ log.info("applying queue event replication rule");
+ // get configuration
+ ReplicationAgentConfiguration configuration =
replicationAgentConfigurationManager.getConfiguration(agent.getName());
+
+ // get URI of the event queue
+ String targetTransport =
configuration.getTargetTransportHandler();
+
+ log.info("found target transport {}", targetTransport);
+
+ ScheduleOptions options = scheduler.NOW();
+ options.name(agent.getName() + " " + ruleString);
+ scheduler.schedule(new EventBasedReplication(agent,
actionType, path, targetTransport), options);
+
+ } catch (Exception e) {
+ log.error("{}", e);
+ log.error("cannot apply rule {} to agent {}", ruleString,
agent);
+ }
+
+ }
+ }
+
+ public void undo(String ruleString, ReplicationAgent agent) {
+ Future<HttpResponse> httpResponseFuture =
requests.remove(agent.getName());
+ if (httpResponseFuture != null) {
+ httpResponseFuture.cancel(true);
+ }
+ }
+
+ private class SSEResponseConsumer extends BasicAsyncResponseConsumer {
+
+ private final ReplicationAgent agent;
+ private final ReplicationActionType action;
+ private final String path;
+
+ private SSEResponseConsumer(ReplicationAgent agent,
ReplicationActionType action, String path) {
+ this.agent = agent;
+ this.action = action;
+ this.path = path == null ? "/" : path;
+ }
+
+ @Override
+ protected void onContentReceived(ContentDecoder decoder, IOControl
ioctrl) throws IOException {
+// log.info("complete ? ", decoder.isCompleted());
+// ByteBuffer buffer = ByteBuffer.allocate(1024);
+// decoder.read(buffer);
+// log.info("content {} received {},{}", new Object[]{buffer,
decoder, ioctrl});
+ log.info("event received");
+
+ try {
+ asyncReplicate(agent, action, path);
+ log.info("replication request to agent {} sent ({} on {})",
new Object[]{agent.getName(), action, path});
+ } catch (AgentReplicationException e) {
+ log.error("cannot replicate to agent {}, {}", agent.getName(),
e);
+ }
+
+ super.onContentReceived(decoder, ioctrl);
+ }
+
+ @Override
+ protected void onResponseReceived(HttpResponse response) throws
IOException {
+ log.info("response received {}", response);
+ super.onResponseReceived(response);
+ }
+ }
+
+ private void asyncReplicate(ReplicationAgent agent, ReplicationActionType
action, String path) throws AgentReplicationException {
+ agent.send(new ReplicationRequest(System.currentTimeMillis(), action,
path));
+ }
+
+ private class EventBasedReplication implements Runnable {
+ private final ReplicationAgent agent;
+ private final ReplicationActionType actionType;
+ private final String targetTransport;
+ private final String path;
+
+ public EventBasedReplication(ReplicationAgent agent,
ReplicationActionType actionType, String path, String targetTransport) {
+ this.agent = agent;
+ this.actionType = actionType;
+ this.targetTransport = targetTransport;
+ this.path = path;
+ }
+
+ public void run() {
+ try {
+ ServiceReference[] serviceReferences =
context.getServiceReferences(TransportHandler.class.getName(), targetTransport);
+
+ log.info("reference transport for {} found {}",
targetTransport, serviceReferences != null);
+
+ if (serviceReferences != null && serviceReferences.length ==
1) {
+
+ Object endpointsProperty =
serviceReferences[0].getProperty("endpoints");
+ Object authenticationPropertiesProperty =
serviceReferences[0].getProperty("authentication.properties");
+
+ log.info("endpoint prop: {} authentication properties
prop: {}", endpointsProperty, authenticationPropertiesProperty);
+
+ String[] endpoints = (String[]) endpointsProperty;
+ Map<String, String> authenticationProperties =
(Map<String, String>) authenticationPropertiesProperty;
+
+ log.info("endpoint {} props {}", endpoints,
authenticationProperties);
+ // only works with HTTP
+ if (endpoints.length == 1 &&
endpoints[0].startsWith("http") &&
endpoints[0].contains(ReplicationAgentResource.BASE_PATH) &&
authenticationProperties != null) {
+ log.info("getting event queue URI");
+ URI eventEndpoint = URI.create(endpoints[0] +
ReplicationAgentQueueResource.EVENT_SUFFIX_PATH);
+ String userName = authenticationProperties.get("user");
+ String password =
authenticationProperties.get("password");
+
+ log.info("preparing request");
+ CredentialsProvider credentialsProvider = new
BasicCredentialsProvider();
+ credentialsProvider.setCredentials(
+ new AuthScope(eventEndpoint.getHost(),
eventEndpoint.getPort()),
+ new UsernamePasswordCredentials(userName,
password));
+ CloseableHttpAsyncClient httpClient =
HttpAsyncClients.custom()
+
.setDefaultCredentialsProvider(credentialsProvider)
+ .build();
+ try {
+ HttpGet get = new HttpGet(eventEndpoint);
+ HttpHost target =
URIUtils.extractHost(get.getURI());
+ BasicAsyncRequestProducer
basicAsyncRequestProducer = new BasicAsyncRequestProducer(target, get);
+ httpClient.start();
+ log.info("sending request");
+ Future<HttpResponse> futureResponse =
httpClient.execute(
+ basicAsyncRequestProducer,
+ new SSEResponseConsumer(agent, actionType,
path), null);
+ requests.put(agent.getName(), futureResponse);
+ futureResponse.get();
+
+ } finally {
+ httpClient.close();
+ }
+ log.info("request finished");
+ }
+ }
+ } catch (Exception e) {
+ log.error("cannot execute event based replication");
+ }
+ }
+ }
+}
Propchange:
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ScheduleReplicateReplicationRule.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ScheduleReplicateReplicationRule.java?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ScheduleReplicateReplicationRule.java
(original)
+++
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ScheduleReplicateReplicationRule.java
Fri Feb 14 16:34:56 2014
@@ -61,14 +61,15 @@ public class ScheduleReplicateReplicatio
public void apply(String ruleString, ReplicationAgent agent) {
if (signatureMatches(ruleString)) {
Matcher matcher = signaturePattern.matcher(ruleString);
- matcher.find();
- String action = matcher.group(2);
- ReplicationActionType actionType =
ReplicationActionType.fromName(action.toUpperCase());
- String path = matcher.group(5); // can be null
- int seconds = Integer.parseInt(matcher.group(7));
- ScheduleOptions options = scheduler.NOW(-1, seconds);
- options.name(agent.getName() + " " + ruleString);
- scheduler.schedule(new ScheduledReplication(agent, actionType,
path), options);
+ if (matcher.find()) {
+ String action = matcher.group(2);
+ ReplicationActionType actionType =
ReplicationActionType.fromName(action.toUpperCase());
+ String path = matcher.group(5); // can be null
+ int seconds = Integer.parseInt(matcher.group(7));
+ ScheduleOptions options = scheduler.NOW(-1, seconds);
+ options.name(agent.getName() + " " + ruleString);
+ scheduler.schedule(new ScheduledReplication(agent, actionType,
path), options);
+ }
}
}
Added:
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueEventServlet.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueEventServlet.java?rev=1568345&view=auto
==============================================================================
---
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueEventServlet.java
(added)
+++
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueEventServlet.java
Fri Feb 14 16:34:56 2014
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.servlet;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.servlet.Servlet;
+import javax.servlet.ServletException;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.api.SlingHttpServletRequest;
+import org.apache.sling.api.SlingHttpServletResponse;
+import org.apache.sling.api.servlets.SlingAllMethodsServlet;
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.agent.ReplicationAgentsManager;
+import org.apache.sling.replication.agent.impl.ReplicationAgentQueueResource;
+import org.apache.sling.replication.event.ReplicationEvent;
+import org.apache.sling.replication.event.ReplicationEventType;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Queue Server Sent Events servlet
+ */
+@SuppressWarnings("serial")
+@Component(metatype = false)
+@Service(value = Servlet.class)
+@Properties({
+ @Property(name = "sling.servlet.resourceTypes", value =
ReplicationAgentQueueResource.EVENT_RESOURCE_TYPE),
+ @Property(name = "sling.servlet.methods", value = "GET")
+})
+public class ReplicationAgentQueueEventServlet extends SlingAllMethodsServlet {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final Map<String, Collection<String>> cachedEvents = new
ConcurrentHashMap<String, Collection<String>>();
+
+ @Reference
+ private ReplicationAgentsManager replicationAgentsManager;
+
+ private ServiceRegistration registration;
+
+ @Activate
+ protected void activate(BundleContext context) {
+ log.info("activating SSE");
+ Dictionary<String, Object> properties = new Hashtable<String,
Object>();
+ properties.put(EventConstants.EVENT_TOPIC,
ReplicationEvent.getTopic(ReplicationEventType.PACKAGE_QUEUED));
+ registration = context.registerService(EventHandler.class.getName(),
new SSEListener(), properties);
+ if (log.isInfoEnabled()) {
+ log.info("SSE activated : {}", registration != null);
+ }
+ }
+
+ @Deactivate
+ protected void deactivate() throws Exception {
+ log.info("deactivating SSE");
+ if (registration != null) {
+ registration.unregister();
+ }
+ synchronized (cachedEvents) {
+ cachedEvents.clear();
+ cachedEvents.notifyAll();
+ }
+ }
+
+
+ @Override
+ protected void doGet(SlingHttpServletRequest request,
SlingHttpServletResponse response)
+ throws ServletException, IOException {
+
+ // setup SSE headers
+ response.setContentType("text/event-stream");
+ response.setCharacterEncoding("UTF-8");
+ response.setHeader("Cache-Control", "no-cache");
+ response.setHeader("Connection", "keep-alive");
+ // needed to allow the JavaScript EventSource API to make a call from
author to this server and listen for the events
+ response.setHeader("Access-Control-Allow-Origin",
request.getHeader("Origin"));
+ response.setHeader("Access-Control-Allow-Credentials", "true");
+
+ String agentName =
request.getResource().getParent().getParent().adaptTo(ReplicationAgent.class).getName();
+ PrintWriter writer = response.getWriter();
+ while (true) {
+ try {
+ synchronized (cachedEvents) {
+ cachedEvents.wait();
+ Collection<String> eventsForAgent =
cachedEvents.get(agentName);
+ if (eventsForAgent != null) {
+ for (String event : eventsForAgent) {
+ writeEvent(writer, agentName + "-queue-event",
event);
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ if (log.isErrorEnabled()) {
+ log.error("error during SSE", e);
+ }
+ throw new ServletException(e);
+ }
+ }
+
+ }
+
+ /* Write a single server-sent event to the response stream for the given
event and message */
+ private void writeEvent(PrintWriter writer, String event, String message)
+ throws IOException {
+
+ // write the event type (make sure to include the double newline)
+ writer.write("id: " + System.nanoTime() + "\n");
+
+ // write the actual data
+ // this could be simple text or could be JSON-encoded text that the
+ // client then decodes
+ writer.write("data: " + message + "\n\n");
+
+ // flush the buffers to make sure the container sends the bytes
+ writer.flush();
+ if (log.isInfoEnabled()) {
+ log.info("SSE event {}: {}", event, message);
+ }
+ }
+
+ private class SSEListener implements EventHandler {
+ public void handleEvent(Event event) {
+ if (log.isInfoEnabled()) {
+ log.info("SSE listener running on event {}", event);
+ }
+ Object pathProperty =
event.getProperty("replication.package.paths");
+ Object agentNameProperty =
event.getProperty("replication.agent.name");
+ if (log.isInfoEnabled()) {
+ log.info("cached events {}", cachedEvents.size());
+ }
+ if (pathProperty != null && agentNameProperty != null) {
+ String agentName = String.valueOf(agentNameProperty);
+ String[] paths = (String[]) pathProperty;
+ synchronized (cachedEvents) {
+ if (log.isInfoEnabled()) {
+ log.info("queue event for agent {} on paths {}",
agentName, Arrays.toString(paths));
+ }
+ Collection<String> eventsForAgent =
cachedEvents.get(agentName);
+ if (eventsForAgent == null) {
+ eventsForAgent = new LinkedList<String>();
+ }
+ eventsForAgent.add(Arrays.toString(paths));
+ cachedEvents.put(agentName, eventsForAgent);
+ cachedEvents.notifyAll();
+ }
+ }
+ }
+ }
+}
Propchange:
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueEventServlet.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandlerFactory.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandlerFactory.java?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandlerFactory.java
(original)
+++
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandlerFactory.java
Fri Feb 14 16:34:56 2014
@@ -18,6 +18,12 @@
*/
package org.apache.sling.replication.transport.impl;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.replication.agent.ReplicationAgentConfiguration;
import org.apache.sling.replication.communication.ReplicationEndpoint;
@@ -27,8 +33,6 @@ import org.apache.sling.replication.tran
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
-import java.util.*;
-
public abstract class AbstractTransportHandlerFactory {
private ServiceRegistration serviceRegistration;
@@ -44,7 +48,6 @@ public abstract class AbstractTransportH
.toString(config.get(ReplicationAgentConfiguration.NAME),
String.valueOf(new Random().nextInt(1000)));
props.put(ReplicationAgentConfiguration.NAME, name);
-
Map<String, String> authenticationProperties =
PropertiesUtil.toMap(config.get(ReplicationAgentConfiguration.AUTHENTICATION_PROPERTIES),
new String[0]);
props.put(ReplicationAgentConfiguration.AUTHENTICATION_PROPERTIES,
authenticationProperties);
@@ -65,8 +68,8 @@ public abstract class AbstractTransportH
List<ReplicationEndpoint> replicationEndpoints = new
ArrayList<ReplicationEndpoint>();
- for(String endpoint : endpoints){
- if(endpoint != null && endpoint.length() > 0){
+ for (String endpoint : endpoints) {
+ if (endpoint != null && endpoint.length() > 0) {
replicationEndpoints.add(new
ReplicationEndpoint(endpoint));
}
}
@@ -88,7 +91,7 @@ public abstract class AbstractTransportH
}
protected void deactivate() {
- if(serviceRegistration != null){
+ if (serviceRegistration != null) {
serviceRegistration.unregister();
serviceRegistration = null;
}
Modified:
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerFactory.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerFactory.java?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerFactory.java
(original)
+++
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerFactory.java
Fri Feb 14 16:34:56 2014
@@ -44,7 +44,6 @@ public class HttpTransportHandlerFactory
private static final String DEFAULT_AUTHENTICATION_FACTORY = "(name=" +
UserCredentialsTransportAuthenticationProviderFactory.TYPE + ")";
-
@Property(boolValue = true)
private static final String ENABLED = "enabled";
@@ -54,7 +53,6 @@ public class HttpTransportHandlerFactory
@Property(cardinality = 1000)
private static final String ENDPOINT =
ReplicationAgentConfiguration.ENDPOINT;
-
@Property(options = {
@PropertyOption(name = "All",
value = "all endpoints"
@@ -76,7 +74,6 @@ public class HttpTransportHandlerFactory
@Property
private static final String AUTHENTICATION_PROPERTIES =
ReplicationAgentConfiguration.AUTHENTICATION_PROPERTIES;
-
@Property(boolValue = false)
private static final String USE_CUSTOM_HEADERS = "useCustomHeaders";
Modified:
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerFactory.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerFactory.java?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerFactory.java
(original)
+++
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerFactory.java
Fri Feb 14 16:34:56 2014
@@ -43,17 +43,14 @@ import java.util.Map;
public class PollingTransportHandlerFactory extends
AbstractTransportHandlerFactory {
static final String SERVICE_PID =
"org.apache.sling.replication.transport.impl.PollingTransportHandlerFactory";
-
private static final String DEFAULT_AUTHENTICATION_FACTORY = "(name=" +
UserCredentialsTransportAuthenticationProviderFactory.TYPE + ")";
-
@Property(boolValue = true)
private static final String ENABLED = "enabled";
@Property
private static final String NAME = "name";
-
@Property(cardinality = 1000)
private static final String ENDPOINT =
ReplicationAgentConfiguration.ENDPOINT;
@@ -71,7 +68,6 @@ public class PollingTransportHandlerFact
private ReplicationPackageImporter replicationPackageImporter;
-
protected TransportHandler createTransportHandler(Map<String, ?> config,
Dictionary<String,
Object> props,
TransportAuthenticationProvider transportAuthenticationProvider,
@@ -92,8 +88,6 @@ public class PollingTransportHandlerFact
return transportAuthenticationProviderFactory;
}
-
-
@Activate
protected void activate(BundleContext context, Map<String, ?> config)
throws Exception {
super.activate(context, config);
Modified:
sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json
(original)
+++
sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json
Fri Feb 14 16:34:56 2014
@@ -4,6 +4,6 @@
"TransportHandler.target" : "(name=http-publish-poll)",
"ReplicationPackageBuilder.target" : "(name=void)",
"ReplicationQueueProvider.target" : "(name=sjh)",
- "ReplicationQueueDistributionStrategy.target" : "(name=single)",
+ "ReplicationQueueDistributionStrategy.target" : "(name=error)",
"rules" : ["scheduled poll every 30 sec"]
}