This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new ec4934d  SLING-12218 - Remove hard queue size limit. Introduce 
adaptive thrott… (#135)
ec4934d is described below

commit ec4934d2d6eaf068cf40f73bf4018cf8b5d19ba2
Author: Christian Schneider <[email protected]>
AuthorDate: Tue Jan 9 16:35:50 2024 +0100

    SLING-12218 - Remove hard queue size limit. Introduce adaptive thrott… 
(#135)
    
    * SLING-12218 - Remove hard queue size limit. Introduce adaptive throttling
    
    * SLING-12218 - Start delay at queueSizeLimit, max delay at queueSizeLimit * 2
    
    * SLING-12218 - Remove old log
    
    * SLING-12218 - Add feature toggle, increase default max queue size
    
    * SLING-12218 - Improved logging+
---
 pom.xml                                            |  1 +
 .../impl/publisher/DistributionPublisher.java      | 65 ++++++++++++----------
 .../impl/publisher/PublisherConfiguration.java     |  4 +-
 .../journal/shared/DistributionMetricsService.java |  7 ---
 .../impl/publisher/DistributionPublisherTest.java  | 46 ++++++++-------
 5 files changed, 65 insertions(+), 58 deletions(-)

diff --git a/pom.xml b/pom.xml
index 99dc1ab..cbf02d2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -187,6 +187,7 @@
         <dependency>
             <groupId>org.osgi</groupId>
             <artifactId>osgi.core</artifactId>
+            <version>8.0.0</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 9dbf99b..0e8a7e5 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -59,10 +59,12 @@ import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicyOption;
+import org.osgi.service.condition.Condition;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.service.metatype.annotations.Designate;
-
 import org.apache.sling.distribution.journal.MessagingProvider;
 
 /**
@@ -79,7 +81,7 @@ public class DistributionPublisher implements 
DistributionAgent {
     public static final String FACTORY_PID = 
"org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory";
 
     @Nonnull
-    private final DefaultDistributionLog log;
+    private final DefaultDistributionLog distLog;
 
     private final DistributionPackageBuilder packageBuilder;
 
@@ -95,17 +97,18 @@ public class DistributionPublisher implements 
DistributionAgent {
 
     private final String pkgType;
 
+    private final boolean limitEnabled;
+
     private final long queuedTimeout;
 
     private final int queueSizeLimit;
 
-    private final int nearQueueSizeDelay;
+    private final int maxQueueSizeDelay;
 
     private final Consumer<PackageMessage> sender;
 
     private final DistributionLogEventListener distributionLogEventListener;
 
-
     @Activate
     public DistributionPublisher(
             @Reference
@@ -124,6 +127,8 @@ public class DistributionPublisher implements 
DistributionAgent {
             DistributionMetricsService distributionMetricsService,
             @Reference
             PubQueueProvider pubQueueProvider,
+            @Reference(target = "(osgi.condition.id=toggle.FT_SLING-12218)", 
cardinality = ReferenceCardinality.OPTIONAL, policyOption = 
ReferencePolicyOption.GREEDY)
+            Condition limitToggle,
             PublisherConfiguration config,
             BundleContext context) {
 
@@ -134,23 +139,23 @@ public class DistributionPublisher implements 
DistributionAgent {
         this.pubQueueProvider = pubQueueProvider;
 
         pubAgentName = requireNotBlank(config.name());
-        log = new DefaultDistributionLog(pubAgentName, this.getClass(), 
DefaultDistributionLog.LogLevel.INFO);
-        distributionLogEventListener = new 
DistributionLogEventListener(context, log, pubAgentName);
+        distLog = new DefaultDistributionLog(pubAgentName, this.getClass(), 
DefaultDistributionLog.LogLevel.INFO);
+        distributionLogEventListener = new 
DistributionLogEventListener(context, distLog, pubAgentName);
 
+        limitEnabled = limitToggle != null;
         queuedTimeout = config.queuedTimeout();
         queueSizeLimit = config.queueSizeLimit();
-        nearQueueSizeDelay = config.nearQueueSizeDelay();
+        maxQueueSizeDelay = config.maxQueueSizeDelay();
         pkgType = packageBuilder.getType();
 
         this.sender = messagingProvider.createSender(topics.getPackageTopic());
-        
         distributionMetricsService.createGauge(
                 DistributionMetricsService.PUB_COMPONENT + 
".subscriber_count;pub_name=" + pubAgentName,
                 () -> 
discoveryService.getTopologyView().getSubscribedAgentIds(pubAgentName).size()
         );
         
-        log.info("Started Publisher agent {} with packageBuilder {}, 
queuedTimeout {}",
-                pubAgentName, pkgType, queuedTimeout);
+        distLog.info("Started Publisher agent={} with packageBuilder={}, 
limitEnabled={}, queuedTimeout={}, queueSizeLimit={}, maxQueueSizeDelay={}",
+                pubAgentName, pkgType, limitEnabled, queuedTimeout, 
queueSizeLimit, maxQueueSizeDelay);
     }
 
     @Deactivate
@@ -158,7 +163,7 @@ public class DistributionPublisher implements 
DistributionAgent {
         IOUtils.closeQuietly(distributionLogEventListener);
         String msg = format("Stopped Publisher agent %s with packageBuilder 
%s, queuedTimeout %s",
                 pubAgentName, pkgType, queuedTimeout);
-        log.info(msg);
+        distLog.info(msg);
     }
     
     /**
@@ -188,7 +193,7 @@ public class DistributionPublisher implements 
DistributionAgent {
     @Nonnull
     @Override
     public DistributionLog getLog() {
-        return log;
+        return distLog;
     }
 
     @Nonnull
@@ -204,26 +209,30 @@ public class DistributionPublisher implements 
DistributionAgent {
             throws DistributionException {
         if (request.getRequestType() == PULL) {
             String msg = "Request requestType=PULL not supported by this 
agent";
-            log.info(msg);
+            distLog.info(msg);
             return new 
SimpleDistributionResponse(DistributionRequestState.DROPPED, msg);
         }
-        checkQueueSizeLimit();
+        int queueSize = pubQueueProvider.getMaxQueueSize(pubAgentName);
+        int sleepMs = getSleepTime(queueSize);
+        sleep(sleepMs);
         final PackageMessage pkg = buildPackage(resourceResolver, request);
-        return send(pkg);
+        return send(pkg, queueSize, sleepMs);
     }
 
-    private void checkQueueSizeLimit() throws DistributionException {
-        int queueSize = pubQueueProvider.getMaxQueueSize(pubAgentName);
-        if (queueSize > queueSizeLimit) {
-            distributionMetricsService.getQueueSizeLimitReached().increment();
-            String msg = String.format("Too many content distributions in 
queue. maxSize=%d, size=%d", queueSizeLimit, queueSize);
-            throw new DistributionException(msg);
-        } else if (queueSize > queueSizeLimit - 10) {
-            sleep(nearQueueSizeDelay);
+    int getSleepTime(int queueSize) {
+        if (!limitEnabled || queueSize <= queueSizeLimit) {
+            return 0;
+        } else if (queueSize >= queueSizeLimit*2) {
+            return maxQueueSizeDelay;
+        } else {
+            return (queueSize-queueSizeLimit) * maxQueueSizeDelay / 
queueSizeLimit;
         }
     }
 
     private void sleep(long sleepMs) throws DistributionException {
+        if (sleepMs <= 0) {
+            return;
+        }
         try {
             Thread.sleep(sleepMs);
         } catch (InterruptedException e) {
@@ -243,24 +252,24 @@ public class DistributionPublisher implements 
DistributionAgent {
             distributionMetricsService.getDroppedRequests().mark();
             String msg = format("Failed to create content package for 
requestType=%s, paths=%s. Error=%s",
                     request.getRequestType(), 
Arrays.toString(request.getPaths()), e.getMessage());
-            log.error(msg, e);
+            distLog.error(msg, e);
             throw new DistributionException(msg, e);
         }
     }
     
     @Nonnull
-    private DistributionResponse send(final PackageMessage pkg) throws 
DistributionException {
+    private DistributionResponse send(final PackageMessage pkg, int queueSize, 
int delayMS) throws DistributionException {
         try {
             long offset = 
timed(distributionMetricsService.getEnqueuePackageDuration(), () -> 
this.sendAndWait(pkg));
             
distributionMetricsService.getExportedPackageSize().update(pkg.getPkgLength());
             distributionMetricsService.getAcceptedRequests().mark();
-            String msg = format("Request accepted with distribution package %s 
at offset=%s", pkg, offset);
-            log.info(msg);
+            String msg = format("Request accepted with distribution package %s 
at offset=%d, queueSize=%d, queueSizeDelay=%d", pkg, offset, queueSize, 
delayMS);
+            distLog.info(msg);
             return new SimpleDistributionResponse(ACCEPTED, msg, 
pkg::getPkgId);
         } catch (Throwable e) {
             distributionMetricsService.getDroppedRequests().mark();
             String msg = format("Failed to append distribution package %s to 
the journal", pkg);
-            log.error(msg, e);
+            distLog.error(msg, e);
             if (e instanceof Error) {
                 throw (Error) e;
             } else {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublisherConfiguration.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublisherConfiguration.java
index 7fd5049..2573a9f 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublisherConfiguration.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublisherConfiguration.java
@@ -25,7 +25,7 @@ import 
org.osgi.service.metatype.annotations.ObjectClassDefinition;
         description = "Apache Sling Content Distribution Pub agent")
 public @interface PublisherConfiguration {
 
-    public static final int DEFAULT_QUEUE_SIZE_LIMIT = 300;
+    public static final int DEFAULT_QUEUE_SIZE_LIMIT = 1000;
 
     @AttributeDefinition
     String webconsole_configurationFactory_nameHint() default "Agent name: 
{name}";
@@ -41,7 +41,7 @@ public @interface PublisherConfiguration {
             description = "Timeout in ms to be used when waiting for a package 
to be queued")
     int queuedTimeout() default 60000;
 
-    int nearQueueSizeDelay() default 20000;
+    int maxQueueSizeDelay() default 20000;
     
     int queueSizeLimit() default DEFAULT_QUEUE_SIZE_LIMIT;
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
 
b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
index f6edb3f..0786f13 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
@@ -98,8 +98,6 @@ public class DistributionMetricsService {
 
     private final  Counter permanentImportErrors;
 
-    private final Counter queueSizeLimitReached;
-
     @Activate
     public DistributionMetricsService(@Reference MetricsService 
metricsService) {
         this.metricsService = metricsService;
@@ -111,7 +109,6 @@ public class DistributionMetricsService {
         buildPackageDuration = getTimer(getMetricName(PUB_COMPONENT, 
"build_package_duration"));
         enqueuePackageDuration = getTimer(getMetricName(PUB_COMPONENT, 
"enqueue_package_duration"));
         queueCacheFetchCount = getCounter(getMetricName(PUB_COMPONENT, 
"queue_cache_fetch_count"));
-        queueSizeLimitReached = getCounter(getMetricName(PUB_COMPONENT, 
"queue_size_limit_reached"));
         importedPackageSize = getHistogram(getMetricName(SUB_COMPONENT, 
"imported_package_size"));
         itemsBufferSize = getCounter(getMetricName(SUB_COMPONENT, 
"items_buffer_size"));
         importedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, 
"imported_package_duration"));
@@ -419,8 +416,4 @@ public class DistributionMetricsService {
         return permanentImportErrors;
     }
 
-    public Counter getQueueSizeLimitReached() {
-        return queueSizeLimitReached;
-    }
-
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 31dbd67..37028f4 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -26,6 +26,7 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -68,7 +69,6 @@ import 
org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -79,6 +79,7 @@ import org.mockito.Mockito;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.osgi.framework.BundleContext;
+import org.osgi.service.condition.Condition;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.util.converter.Converters;
 
@@ -135,13 +136,13 @@ public class DistributionPublisherTest {
         distributionMetricsService = new 
DistributionMetricsService(metricsService);
         when(packageBuilder.getType()).thenReturn("journal");
         Map<String, String> props = Map.of("name", PUB1AGENT1,
-                "nearQueueSizeDelay", "1000");
+                "maxQueueSizeDelay", "1000");
         PublisherConfiguration config = 
Converters.standardConverter().convert(props).to(PublisherConfiguration.class);
 
         BundleContext bcontext = context.bundleContext();
         
when(messagingProvider.<PackageMessage>createSender(Mockito.anyString())).thenReturn(sender);
         publisher = new DistributionPublisher(messagingProvider, 
packageBuilder, discoveryService, factory,
-                eventAdmin, topics, distributionMetricsService, 
pubQueueProvider, config, bcontext);
+                eventAdmin, topics, distributionMetricsService, 
pubQueueProvider, Condition.INSTANCE, config, bcontext);
         when(pubQueueProvider.getQueuedNotifier()).thenReturn(queuedNotifier);
     }
     
@@ -180,29 +181,23 @@ public class DistributionPublisherTest {
     }
     
     @Test
-    public void testQueueSizeLimitNear() throws IOException, 
DistributionException {
-        int queueSize = DEFAULT_QUEUE_SIZE_LIMIT - 1;
+    public void testQueueSizeLimitHalf() throws IOException, 
DistributionException {
+        int queueSize = DEFAULT_QUEUE_SIZE_LIMIT + DEFAULT_QUEUE_SIZE_LIMIT / 
2;
         
when(pubQueueProvider.getMaxQueueSize(PUB1AGENT1)).thenReturn(queueSize);
         DistributionRequest request = new 
SimpleDistributionRequest(DistributionRequestType.ADD, "/test");
-        StopWatch stopwatch = new StopWatch();
-        stopwatch.start();
-        executeAndCheck(request);
-        stopwatch.stop();
-        long time = stopwatch.getTime(TimeUnit.MILLISECONDS);
-        assertThat(time, greaterThanOrEqualTo(1000L));
+        long time = distribute(request);
+        assertThat(time, greaterThanOrEqualTo(500L));
+        assertThat(time, lessThanOrEqualTo(550L));
     }
 
     @Test
-    public void testQueueSizeLimitReached() throws IOException, 
DistributionException {
-        int queueSize = DEFAULT_QUEUE_SIZE_LIMIT + 1;
+    public void testDoubleQueueSizeLimitReached() throws IOException, 
DistributionException {
+        int queueSize = DEFAULT_QUEUE_SIZE_LIMIT * 2;
         
when(pubQueueProvider.getMaxQueueSize(PUB1AGENT1)).thenReturn(queueSize);
         DistributionRequest request = new 
SimpleDistributionRequest(DistributionRequestType.ADD, "/test");
-        try {
-            executeAndCheck(request);
-            Assert.fail("Exception expected");
-        } catch (DistributionException e) {
-            assertThat(e.getMessage(), equalTo("Too many content distributions 
in queue. maxSize=" + DEFAULT_QUEUE_SIZE_LIMIT + ", size=" + queueSize));
-        }
+        long time = distribute(request);
+        assertThat(time, greaterThanOrEqualTo(1000L));
+        assertThat(time, lessThanOrEqualTo(1050L));
     }
     
     @SuppressWarnings("unchecked")
@@ -216,7 +211,7 @@ public class DistributionPublisherTest {
 
         List<String> log = publisher.getLog().getLines();
         assertThat(log, contains(
-                containsString("Started Publisher agent pub1agent1"),
+                containsString("Started Publisher agent=pub1agent1"),
                 containsString("not supported by this agent")));
     }
     
@@ -280,6 +275,15 @@ public class DistributionPublisherTest {
         publisher.execute(resourceResolver, request);
     }
 
+    private long distribute(DistributionRequest request) throws IOException, 
DistributionException {
+        StopWatch stopwatch = new StopWatch();
+        stopwatch.start();
+        executeAndCheck(request);
+        stopwatch.stop();
+        long time = stopwatch.getTime(TimeUnit.MILLISECONDS);
+        return time;
+    }
+
     @SuppressWarnings("unchecked")
     private void executeAndCheck(DistributionRequest request) throws 
IOException, DistributionException {
         PackageMessage pkg = mockPackage(request);
@@ -298,7 +302,7 @@ public class DistributionPublisherTest {
         
         List<String> log = publisher.getLog().getLines();
         assertThat(log, contains(
-                containsString("Started Publisher agent pub1agent1"),
+                containsString("Started Publisher agent=pub1agent1"),
                 containsString("Request accepted")));
     }
 

Reply via email to