This is an automated email from the ASF dual-hosted git repository. joerghoh pushed a commit to branch SLING-13021-2 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit f9043ae52a907199b65d98a1d5adf23fb98ea37e Author: Joerg Hoh <[email protected]> AuthorDate: Tue Feb 10 18:04:37 2026 +0100 SLING-13021 add a dedicated unit test for concurrent package import --- .../DistributionSubscriberConcurrentTest.java | 294 +++++++++++++++++++++ 1 file changed, 294 insertions(+) diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriberConcurrentTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriberConcurrentTest.java new file mode 100644 index 0000000..5075749 --- /dev/null +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriberConcurrentTest.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.impl.subscriber; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; +import static org.apache.sling.distribution.agent.DistributionAgentState.IDLE; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import org.apache.sling.api.resource.ResourceResolver; +import org.apache.sling.api.resource.ResourceResolverFactory; +import org.apache.sling.commons.metrics.MetricsService; +import org.apache.sling.distribution.ImportPostProcessor; +import org.apache.sling.distribution.ImportPreProcessor; +import org.apache.sling.distribution.journal.BinaryStore; +import org.apache.sling.distribution.journal.HandlerAdapter; +import org.apache.sling.distribution.journal.MessageHandler; +import org.apache.sling.distribution.journal.MessageInfo; +import org.apache.sling.distribution.journal.MessagingProvider; +import org.apache.sling.distribution.journal.Reset; +import org.apache.sling.distribution.journal.bookkeeper.BookKeeper; +import org.apache.sling.distribution.journal.bookkeeper.BookKeeperFactory; +import org.apache.sling.distribution.agent.DistributionAgentState; +import org.apache.sling.distribution.journal.bookkeeper.LocalStore; +import org.apache.sling.distribution.journal.impl.precondition.Precondition; +import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision; +import org.apache.sling.distribution.journal.messages.DiscoveryMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage; +import org.apache.sling.distribution.journal.shared.NoOpImportPostProcessor; +import org.apache.sling.distribution.journal.shared.NoOpImportPreProcessor; +import org.apache.sling.distribution.journal.shared.OnlyOnLeader; +import org.apache.sling.distribution.journal.shared.TestMessageInfo; +import org.apache.sling.distribution.journal.shared.Topics; +import org.apache.sling.distribution.packaging.DistributionPackage; +import org.apache.sling.distribution.packaging.DistributionPackageBuilder; +import org.apache.sling.settings.SlingSettingsService; +import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import org.osgi.framework.BundleContext; +import org.osgi.service.event.EventAdmin; +import org.osgi.util.converter.Converters; + +/** + * Tests concurrent import of packages by DistributionSubscriber with multiple importer threads. + * Uses explicit synchronization (CountDownLatch) so the test does not depend on timing. + */ +@RunWith(MockitoJUnitRunner.class) +public class DistributionSubscriberConcurrentTest { + + private static final String SUB_SLING_ID = "subsling"; + private static final String SUB_AGENT_NAME = "subagent"; + private static final String PUB_AGENT_NAME = "pubagent"; + private static final String STORE_PACKAGE_NODE_NAME = "myserver.apache.org_somepath_package"; + private static final int NUM_MESSAGES = 4; + /** Use at least 2 and enough threads so all messages can enter the handler (block in mock) at once. */ + private static final int IMPORT_THREADS = 4; + private static final long AWAIT_SECONDS = 30; + + @Mock + private BundleContext context; + @Mock + private DistributionPackageBuilder packageBuilder; + @Mock + private Precondition precondition; + @Mock + private SlingSettingsService slingSettings; + @Mock + private BinaryStore binaryStore; + @Spy + private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory(); + @Mock + private MessagingProvider clientProvider; + @Mock + private EventAdmin eventAdmin; + @Mock + private org.apache.sling.distribution.journal.MessageSender<DiscoveryMessage> discoverySender; + @Mock + private org.apache.sling.distribution.journal.MessageSender<PackageStatusMessage> statusSender; + @Spy + private MetricsService metricsService = MetricsService.NOOP; + @Spy + private ImportPreProcessor importPreProcessor = new NoOpImportPreProcessor(); + @Spy + private ImportPostProcessor importPostProcessor = new NoOpImportPostProcessor(); + @Spy + private SubscriberReadyStore subscriberReadyStore = new SubscriberReadyStore(); + @InjectMocks + private BookKeeperFactory bookKeeperFactory; + + private DistributionSubscriber subscriber; + private MessageHandler<PackageMessage> packageHandler; + + @Captor + private ArgumentCaptor<HandlerAdapter<PackageMessage>> packageCaptor; + + @Before + public void before() throws URISyntaxException { + when(packageBuilder.getType()).thenReturn("journal"); + when(slingSettings.getSlingId()).thenReturn(SUB_SLING_ID); + URI serverURI = new URI("http://myserver.apache.org:1234/somepath"); + when(clientProvider.getServerUri()).thenReturn(serverURI); + when(clientProvider.<PackageStatusMessage>createSender(Topics.STATUS_TOPIC)).thenReturn(statusSender); + when(clientProvider.<DiscoveryMessage>createSender(Topics.DISCOVERY_TOPIC)).thenReturn(discoverySender); + } + + @After + public void after() throws IOException { + if (subscriber != null) { + subscriber.deactivate(); + } + } + + /** + * Submits multiple package messages (ADD, ADD, DELETE, ADD) with at least 2 importer threads. + * Uses latches so that: (1) we know when all messages have entered the handler, + * (2) we release them in a chosen order (2, 0, 3, 1) to validate that the stored offset + * only advances when all lower offsets have completed (no timing assumptions). + */ + @Test + public void testConcurrentImportMultiplePackageTypes() throws Exception { + assumePreconditionAccept(); + initSubscriberWithConcurrentThreads(IMPORT_THREADS); + + CountDownLatch allEntered = new CountDownLatch(NUM_MESSAGES); + CountDownLatch[] startLatches = new CountDownLatch[NUM_MESSAGES]; + CountDownLatch[] doneLatches = new CountDownLatch[NUM_MESSAGES]; + for (int i = 0; i < NUM_MESSAGES; i++) { + startLatches[i] = new CountDownLatch(1); + doneLatches[i] = new CountDownLatch(1); + } + + doAnswer((Answer<Void>) invocation -> { + DistributionPackage pkg = invocation.getArgument(1); + String id = pkg.getId(); + int idx = Integer.parseInt(id.replace("pkg-", "")); + allEntered.countDown(); + startLatches[idx].await(AWAIT_SECONDS, SECONDS); + doneLatches[idx].countDown(); + return null; + }).when(packageBuilder).installPackage(any(ResourceResolver.class), any(DistributionPackage.class)); + + PackageMessage add0 = packageMessage("pkg-0", ReqType.ADD); + PackageMessage add1 = packageMessage("pkg-1", ReqType.ADD); + PackageMessage del2 = packageMessage("pkg-2", ReqType.DELETE); + PackageMessage add3 = packageMessage("pkg-3", ReqType.ADD); + + MessageInfo info0 = createInfo(0); + MessageInfo info1 = createInfo(1); + MessageInfo info2 = createInfo(2); + MessageInfo info3 = createInfo(3); + + // packageHandler is the handler the subscriber registered with the poller; calling it + // invokes the subscriber's delegatePackageMessageToExecutor -> import executor -> handlePackageMessage path + packageHandler.handle(info0, add0); + packageHandler.handle(info1, add1); + packageHandler.handle(info2, del2); + packageHandler.handle(info3, add3); + + assertThat("All messages should enter installPackage", allEntered.await(AWAIT_SECONDS, SECONDS)); + + assertThat(getStoredOffset(), equalTo(-1L)); + + startLatches[2].countDown(); + assertThat("Offset 2 done", doneLatches[2].await(AWAIT_SECONDS, SECONDS)); + assertThat("Offset 2 completed first; stored offset must not advance (0,1 still in flight)", getStoredOffset(), equalTo(-1L)); + + startLatches[0].countDown(); + assertThat("Offset 0 done", doneLatches[0].await(AWAIT_SECONDS, SECONDS)); + assertThat("Offset 0 completed; now safe to store 0", getStoredOffset(), equalTo(0L)); + + startLatches[3].countDown(); + assertThat("Offset 3 done", doneLatches[3].await(AWAIT_SECONDS, SECONDS)); + assertThat("Offset 3 completed; 1 still in flight, stored offset stays 0", getStoredOffset(), equalTo(0L)); + + startLatches[1].countDown(); + assertThat("Offset 1 done", doneLatches[1].await(AWAIT_SECONDS, SECONDS)); + await().atMost(AWAIT_SECONDS, SECONDS).until(() -> getStoredOffset() == 1L); + assertThat("Offset 1 completed last; stored offset advances to 1", getStoredOffset(), equalTo(1L)); + + waitSubscriberIdle(); + assertThat("Subscriber should be IDLE after all concurrent imports complete", subscriber.getState(), equalTo(IDLE)); + + verify(packageBuilder, times(NUM_MESSAGES)).installPackage(any(ResourceResolver.class), any(DistributionPackage.class)); + } + + private void assumePreconditionAccept() { + try { + when(precondition.canProcess(Mockito.eq(SUB_AGENT_NAME), Mockito.anyLong())).thenReturn(Decision.ACCEPT); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void initSubscriberWithConcurrentThreads(int threads) { + Map<String, Object> props = new HashMap<>(); + props.put("name", SUB_AGENT_NAME); + props.put("agentNames", PUB_AGENT_NAME); + props.put("idleMillies", 1000); + props.put("subscriberIdleCheck", true); + props.put("concurrentImporterThreads", threads); + + SubscriberConfiguration config = Converters.standardConverter().convert(props).to(SubscriberConfiguration.class); + OnlyOnLeader onlyOnLeader = new OnlyOnLeader(context); + + subscriber = new DistributionSubscriber( + packageBuilder, + slingSettings, + clientProvider, + precondition, + metricsService, + bookKeeperFactory, + subscriberReadyStore, + onlyOnLeader, + config, + context, + props); + + verify(clientProvider).createPoller( + Mockito.eq(Topics.PACKAGE_TOPIC), + Mockito.eq(Reset.latest), + Mockito.nullable(String.class), + packageCaptor.capture(), + Mockito.any()); + packageHandler = packageCaptor.getValue().getHandler(); + } + + private void waitSubscriberIdle() { + await().atMost(AWAIT_SECONDS, SECONDS).until(() -> subscriber.getState() == IDLE); + } + + private static PackageMessage packageMessage(String pkgId, ReqType reqType) { + return PackageMessage.builder() + .pkgId(pkgId) + .pubAgentName(PUB_AGENT_NAME) + .reqType(reqType) + .pkgType("journal") + .paths(Arrays.asList("/test")) + .pkgBinary(new byte[1]) + .build(); + } + + private static MessageInfo createInfo(long offset) { + return new TestMessageInfo("", 0, offset, System.currentTimeMillis()); + } + + private long getStoredOffset() { + LocalStore store = new LocalStore(resolverFactory, STORE_PACKAGE_NODE_NAME, SUB_AGENT_NAME); + Long value = store.load(BookKeeper.KEY_OFFSET, Long.class); + return value != null ? value : -1L; + } +}
