This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new ec4934d SLING-12218 - Remove hard queue size limit. Introduce
adaptive throttling (#135)
ec4934d is described below
commit ec4934d2d6eaf068cf40f73bf4018cf8b5d19ba2
Author: Christian Schneider <[email protected]>
AuthorDate: Tue Jan 9 16:35:50 2024 +0100
SLING-12218 - Remove hard queue size limit. Introduce adaptive throttling
(#135)
* SLING-12218 - Remove hard queue size limit. Introduce adaptive throttling
* SLING-12218 - Start delay at queueSizeLimit, may delay at queueSizeLimit
* 2
* SLING-12218 - Remove old log
* SLING-12218 - Add feature toggle, increase default max queue size
* SLING-12218 - Improved logging
---
pom.xml | 1 +
.../impl/publisher/DistributionPublisher.java | 65 ++++++++++++----------
.../impl/publisher/PublisherConfiguration.java | 4 +-
.../journal/shared/DistributionMetricsService.java | 7 ---
.../impl/publisher/DistributionPublisherTest.java | 46 ++++++++-------
5 files changed, 65 insertions(+), 58 deletions(-)
diff --git a/pom.xml b/pom.xml
index 99dc1ab..cbf02d2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -187,6 +187,7 @@
<dependency>
<groupId>org.osgi</groupId>
<artifactId>osgi.core</artifactId>
+ <version>8.0.0</version>
<scope>provided</scope>
</dependency>
<dependency>
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 9dbf99b..0e8a7e5 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -59,10 +59,12 @@ import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicyOption;
+import org.osgi.service.condition.Condition;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;
-
import org.apache.sling.distribution.journal.MessagingProvider;
/**
@@ -79,7 +81,7 @@ public class DistributionPublisher implements
DistributionAgent {
public static final String FACTORY_PID =
"org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory";
@Nonnull
- private final DefaultDistributionLog log;
+ private final DefaultDistributionLog distLog;
private final DistributionPackageBuilder packageBuilder;
@@ -95,17 +97,18 @@ public class DistributionPublisher implements
DistributionAgent {
private final String pkgType;
+ private final boolean limitEnabled;
+
private final long queuedTimeout;
private final int queueSizeLimit;
- private final int nearQueueSizeDelay;
+ private final int maxQueueSizeDelay;
private final Consumer<PackageMessage> sender;
private final DistributionLogEventListener distributionLogEventListener;
-
@Activate
public DistributionPublisher(
@Reference
@@ -124,6 +127,8 @@ public class DistributionPublisher implements
DistributionAgent {
DistributionMetricsService distributionMetricsService,
@Reference
PubQueueProvider pubQueueProvider,
+ @Reference(target = "(osgi.condition.id=toggle.FT_SLING-12218)",
cardinality = ReferenceCardinality.OPTIONAL, policyOption =
ReferencePolicyOption.GREEDY)
+ Condition limitToggle,
PublisherConfiguration config,
BundleContext context) {
@@ -134,23 +139,23 @@ public class DistributionPublisher implements
DistributionAgent {
this.pubQueueProvider = pubQueueProvider;
pubAgentName = requireNotBlank(config.name());
- log = new DefaultDistributionLog(pubAgentName, this.getClass(),
DefaultDistributionLog.LogLevel.INFO);
- distributionLogEventListener = new
DistributionLogEventListener(context, log, pubAgentName);
+ distLog = new DefaultDistributionLog(pubAgentName, this.getClass(),
DefaultDistributionLog.LogLevel.INFO);
+ distributionLogEventListener = new
DistributionLogEventListener(context, distLog, pubAgentName);
+ limitEnabled = limitToggle != null;
queuedTimeout = config.queuedTimeout();
queueSizeLimit = config.queueSizeLimit();
- nearQueueSizeDelay = config.nearQueueSizeDelay();
+ maxQueueSizeDelay = config.maxQueueSizeDelay();
pkgType = packageBuilder.getType();
this.sender = messagingProvider.createSender(topics.getPackageTopic());
-
distributionMetricsService.createGauge(
DistributionMetricsService.PUB_COMPONENT +
".subscriber_count;pub_name=" + pubAgentName,
() ->
discoveryService.getTopologyView().getSubscribedAgentIds(pubAgentName).size()
);
- log.info("Started Publisher agent {} with packageBuilder {},
queuedTimeout {}",
- pubAgentName, pkgType, queuedTimeout);
+ distLog.info("Started Publisher agent={} with packageBuilder={},
limitEnabled={}, queuedTimeout={}, queueSizeLimit={}, maxQueueSizeDelay={}",
+ pubAgentName, pkgType, limitEnabled, queuedTimeout,
queueSizeLimit, maxQueueSizeDelay);
}
@Deactivate
@@ -158,7 +163,7 @@ public class DistributionPublisher implements
DistributionAgent {
IOUtils.closeQuietly(distributionLogEventListener);
String msg = format("Stopped Publisher agent %s with packageBuilder
%s, queuedTimeout %s",
pubAgentName, pkgType, queuedTimeout);
- log.info(msg);
+ distLog.info(msg);
}
/**
@@ -188,7 +193,7 @@ public class DistributionPublisher implements
DistributionAgent {
@Nonnull
@Override
public DistributionLog getLog() {
- return log;
+ return distLog;
}
@Nonnull
@@ -204,26 +209,30 @@ public class DistributionPublisher implements
DistributionAgent {
throws DistributionException {
if (request.getRequestType() == PULL) {
String msg = "Request requestType=PULL not supported by this
agent";
- log.info(msg);
+ distLog.info(msg);
return new
SimpleDistributionResponse(DistributionRequestState.DROPPED, msg);
}
- checkQueueSizeLimit();
+ int queueSize = pubQueueProvider.getMaxQueueSize(pubAgentName);
+ int sleepMs = getSleepTime(queueSize);
+ sleep(sleepMs);
final PackageMessage pkg = buildPackage(resourceResolver, request);
- return send(pkg);
+ return send(pkg, queueSize, sleepMs);
}
- private void checkQueueSizeLimit() throws DistributionException {
- int queueSize = pubQueueProvider.getMaxQueueSize(pubAgentName);
- if (queueSize > queueSizeLimit) {
- distributionMetricsService.getQueueSizeLimitReached().increment();
- String msg = String.format("Too many content distributions in
queue. maxSize=%d, size=%d", queueSizeLimit, queueSize);
- throw new DistributionException(msg);
- } else if (queueSize > queueSizeLimit - 10) {
- sleep(nearQueueSizeDelay);
+ int getSleepTime(int queueSize) {
+ if (!limitEnabled || queueSize <= queueSizeLimit) {
+ return 0;
+ } else if (queueSize >= queueSizeLimit*2) {
+ return maxQueueSizeDelay;
+ } else {
+ return (queueSize-queueSizeLimit) * maxQueueSizeDelay /
queueSizeLimit;
}
}
private void sleep(long sleepMs) throws DistributionException {
+ if (sleepMs <= 0) {
+ return;
+ }
try {
Thread.sleep(sleepMs);
} catch (InterruptedException e) {
@@ -243,24 +252,24 @@ public class DistributionPublisher implements
DistributionAgent {
distributionMetricsService.getDroppedRequests().mark();
String msg = format("Failed to create content package for
requestType=%s, paths=%s. Error=%s",
request.getRequestType(),
Arrays.toString(request.getPaths()), e.getMessage());
- log.error(msg, e);
+ distLog.error(msg, e);
throw new DistributionException(msg, e);
}
}
@Nonnull
- private DistributionResponse send(final PackageMessage pkg) throws
DistributionException {
+ private DistributionResponse send(final PackageMessage pkg, int queueSize,
int delayMS) throws DistributionException {
try {
long offset =
timed(distributionMetricsService.getEnqueuePackageDuration(), () ->
this.sendAndWait(pkg));
distributionMetricsService.getExportedPackageSize().update(pkg.getPkgLength());
distributionMetricsService.getAcceptedRequests().mark();
- String msg = format("Request accepted with distribution package %s
at offset=%s", pkg, offset);
- log.info(msg);
+ String msg = format("Request accepted with distribution package %s
at offset=%d, queueSize=%d, queueSizeDelay=%d", pkg, offset, queueSize,
delayMS);
+ distLog.info(msg);
return new SimpleDistributionResponse(ACCEPTED, msg,
pkg::getPkgId);
} catch (Throwable e) {
distributionMetricsService.getDroppedRequests().mark();
String msg = format("Failed to append distribution package %s to
the journal", pkg);
- log.error(msg, e);
+ distLog.error(msg, e);
if (e instanceof Error) {
throw (Error) e;
} else {
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublisherConfiguration.java
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublisherConfiguration.java
index 7fd5049..2573a9f 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublisherConfiguration.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublisherConfiguration.java
@@ -25,7 +25,7 @@ import
org.osgi.service.metatype.annotations.ObjectClassDefinition;
description = "Apache Sling Content Distribution Pub agent")
public @interface PublisherConfiguration {
- public static final int DEFAULT_QUEUE_SIZE_LIMIT = 300;
+ public static final int DEFAULT_QUEUE_SIZE_LIMIT = 1000;
@AttributeDefinition
String webconsole_configurationFactory_nameHint() default "Agent name:
{name}";
@@ -41,7 +41,7 @@ public @interface PublisherConfiguration {
description = "Timeout in ms to be used when waiting for a package
to be queued")
int queuedTimeout() default 60000;
- int nearQueueSizeDelay() default 20000;
+ int maxQueueSizeDelay() default 20000;
int queueSizeLimit() default DEFAULT_QUEUE_SIZE_LIMIT;
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
index f6edb3f..0786f13 100644
---
a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
+++
b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
@@ -98,8 +98,6 @@ public class DistributionMetricsService {
private final Counter permanentImportErrors;
- private final Counter queueSizeLimitReached;
-
@Activate
public DistributionMetricsService(@Reference MetricsService
metricsService) {
this.metricsService = metricsService;
@@ -111,7 +109,6 @@ public class DistributionMetricsService {
buildPackageDuration = getTimer(getMetricName(PUB_COMPONENT,
"build_package_duration"));
enqueuePackageDuration = getTimer(getMetricName(PUB_COMPONENT,
"enqueue_package_duration"));
queueCacheFetchCount = getCounter(getMetricName(PUB_COMPONENT,
"queue_cache_fetch_count"));
- queueSizeLimitReached = getCounter(getMetricName(PUB_COMPONENT,
"queue_size_limit_reached"));
importedPackageSize = getHistogram(getMetricName(SUB_COMPONENT,
"imported_package_size"));
itemsBufferSize = getCounter(getMetricName(SUB_COMPONENT,
"items_buffer_size"));
importedPackageDuration = getTimer(getMetricName(SUB_COMPONENT,
"imported_package_duration"));
@@ -419,8 +416,4 @@ public class DistributionMetricsService {
return permanentImportErrors;
}
- public Counter getQueueSizeLimitReached() {
- return queueSizeLimitReached;
- }
-
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 31dbd67..37028f4 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -26,6 +26,7 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -68,7 +69,6 @@ import
org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -79,6 +79,7 @@ import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import org.osgi.framework.BundleContext;
+import org.osgi.service.condition.Condition;
import org.osgi.service.event.EventAdmin;
import org.osgi.util.converter.Converters;
@@ -135,13 +136,13 @@ public class DistributionPublisherTest {
distributionMetricsService = new
DistributionMetricsService(metricsService);
when(packageBuilder.getType()).thenReturn("journal");
Map<String, String> props = Map.of("name", PUB1AGENT1,
- "nearQueueSizeDelay", "1000");
+ "maxQueueSizeDelay", "1000");
PublisherConfiguration config =
Converters.standardConverter().convert(props).to(PublisherConfiguration.class);
BundleContext bcontext = context.bundleContext();
when(messagingProvider.<PackageMessage>createSender(Mockito.anyString())).thenReturn(sender);
publisher = new DistributionPublisher(messagingProvider,
packageBuilder, discoveryService, factory,
- eventAdmin, topics, distributionMetricsService,
pubQueueProvider, config, bcontext);
+ eventAdmin, topics, distributionMetricsService,
pubQueueProvider, Condition.INSTANCE, config, bcontext);
when(pubQueueProvider.getQueuedNotifier()).thenReturn(queuedNotifier);
}
@@ -180,29 +181,23 @@ public class DistributionPublisherTest {
}
@Test
- public void testQueueSizeLimitNear() throws IOException,
DistributionException {
- int queueSize = DEFAULT_QUEUE_SIZE_LIMIT - 1;
+ public void testQueueSizeLimitHalf() throws IOException,
DistributionException {
+ int queueSize = DEFAULT_QUEUE_SIZE_LIMIT + DEFAULT_QUEUE_SIZE_LIMIT /
2;
when(pubQueueProvider.getMaxQueueSize(PUB1AGENT1)).thenReturn(queueSize);
DistributionRequest request = new
SimpleDistributionRequest(DistributionRequestType.ADD, "/test");
- StopWatch stopwatch = new StopWatch();
- stopwatch.start();
- executeAndCheck(request);
- stopwatch.stop();
- long time = stopwatch.getTime(TimeUnit.MILLISECONDS);
- assertThat(time, greaterThanOrEqualTo(1000L));
+ long time = distribute(request);
+ assertThat(time, greaterThanOrEqualTo(500L));
+ assertThat(time, lessThanOrEqualTo(550L));
}
@Test
- public void testQueueSizeLimitReached() throws IOException,
DistributionException {
- int queueSize = DEFAULT_QUEUE_SIZE_LIMIT + 1;
+ public void testDoubleQueueSizeLimitReached() throws IOException,
DistributionException {
+ int queueSize = DEFAULT_QUEUE_SIZE_LIMIT * 2;
when(pubQueueProvider.getMaxQueueSize(PUB1AGENT1)).thenReturn(queueSize);
DistributionRequest request = new
SimpleDistributionRequest(DistributionRequestType.ADD, "/test");
- try {
- executeAndCheck(request);
- Assert.fail("Exception expected");
- } catch (DistributionException e) {
- assertThat(e.getMessage(), equalTo("Too many content distributions
in queue. maxSize=" + DEFAULT_QUEUE_SIZE_LIMIT + ", size=" + queueSize));
- }
+ long time = distribute(request);
+ assertThat(time, greaterThanOrEqualTo(1000L));
+ assertThat(time, lessThanOrEqualTo(1050L));
}
@SuppressWarnings("unchecked")
@@ -216,7 +211,7 @@ public class DistributionPublisherTest {
List<String> log = publisher.getLog().getLines();
assertThat(log, contains(
- containsString("Started Publisher agent pub1agent1"),
+ containsString("Started Publisher agent=pub1agent1"),
containsString("not supported by this agent")));
}
@@ -280,6 +275,15 @@ public class DistributionPublisherTest {
publisher.execute(resourceResolver, request);
}
+ private long distribute(DistributionRequest request) throws IOException,
DistributionException {
+ StopWatch stopwatch = new StopWatch();
+ stopwatch.start();
+ executeAndCheck(request);
+ stopwatch.stop();
+ long time = stopwatch.getTime(TimeUnit.MILLISECONDS);
+ return time;
+ }
+
@SuppressWarnings("unchecked")
private void executeAndCheck(DistributionRequest request) throws
IOException, DistributionException {
PackageMessage pkg = mockPackage(request);
@@ -298,7 +302,7 @@ public class DistributionPublisherTest {
List<String> log = publisher.getLog().getLines();
assertThat(log, contains(
- containsString("Started Publisher agent pub1agent1"),
+ containsString("Started Publisher agent=pub1agent1"),
containsString("Request accepted")));
}