Author: bdelacretaz
Date: Wed Jan 8 11:24:44 2014
New Revision: 1556498
URL: http://svn.apache.org/r1556498
Log:
SLING-3298 - PollingTransportHandler should have a configurable no. of
consecutive polls - contributed by Tommaso Teofili, thanks!
Modified:
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandler.java
Modified:
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandler.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandler.java?rev=1556498&r1=1556497&r2=1556498&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandler.java
(original)
+++
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandler.java
Wed Jan 8 11:24:44 2014
@@ -18,6 +18,7 @@
*/
package org.apache.sling.replication.transport.impl;
+import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
@@ -27,6 +28,7 @@ import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.fluent.Executor;
import org.apache.http.client.fluent.Request;
+import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.replication.communication.ReplicationEndpoint;
import org.apache.sling.replication.communication.ReplicationHeader;
import org.apache.sling.replication.serialization.ReplicationPackage;
@@ -35,24 +37,35 @@ import org.apache.sling.replication.tran
import org.apache.sling.replication.transport.TransportHandler;
import
org.apache.sling.replication.transport.authentication.TransportAuthenticationContext;
import
org.apache.sling.replication.transport.authentication.TransportAuthenticationProvider;
+import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* basic HTTP GET {@link TransportHandler}
*/
-@Component(metatype = false)
+@Component(metatype = true)
@Service(value = TransportHandler.class)
-@Property(name = "name", value = PollingTransportHandler.NAME)
+@Property(name = "name", value = PollingTransportHandler.NAME, propertyPrivate
= true)
public class PollingTransportHandler implements TransportHandler {
public static final String NAME = "poll";
private final Logger log = LoggerFactory.getLogger(getClass());
+ @Property(name = "poll items", description = "number of subsequent poll
requests to make", intValue = -1)
+ private static final String POLL_ITEMS = "poll.items";
+
+ private int pollItems;
+
@Reference
private ReplicationPackageImporter replicationPackageImporter;
+ @Activate
+ protected void activate(ComponentContext context) {
+ pollItems =
PropertiesUtil.toInteger(context.getProperties().get(POLL_ITEMS), -1);
+ }
+
@SuppressWarnings("unchecked")
public void transport(ReplicationPackage replicationPackage,
ReplicationEndpoint replicationEndpoint,
@@ -72,14 +85,18 @@ public class PollingTransportHandler imp
Request req =
Request.Get(replicationEndpoint.getUri()).useExpectContinue();
// TODO : add queue header
+ int polls = pollItems;
+
// continuously requests package streams as long as type header is
received with the response (meaning there's a package of a certain type)
HttpResponse httpResponse;
- while ((httpResponse =
executor.execute(req).returnResponse()).containsHeader(ReplicationHeader.TYPE.toString()))
{
+ while ((httpResponse =
executor.execute(req).returnResponse()).containsHeader(ReplicationHeader.TYPE.toString())
+ && polls != 0) {
HttpEntity entity = httpResponse.getEntity();
Header typeHeader =
httpResponse.getFirstHeader(ReplicationHeader.TYPE.toString());
if (entity.getContentLength() > 0) {
replicationPackageImporter.scheduleImport(entity.getContent(),
typeHeader.getValue());
+ polls--;
if (log.isInfoEnabled()) {
log.info("scheduled import of package stream");
}
@@ -88,6 +105,7 @@ public class PollingTransportHandler imp
if (log.isInfoEnabled()) {
log.info("nothing to fetch");
}
+ break;
}
}
} catch (Exception e) {