This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-9481 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit c1a454010f2f55a317e7d4de5daa73890747b05c Author: Christian Schneider <[email protected]> AuthorDate: Thu May 28 17:07:12 2020 +0200 SLING-9481 - Avoid seeding messages in PackageRepo cleanup --- .../journal/impl/publisher/PackageCleaner.java | 72 +++++++++++++++++++ .../journal/impl/publisher/PackageCleanupTask.java | 4 +- .../journal/impl/publisher/PackageRepo.java | 81 +++------------------- .../journal/impl/publisher/PackageRepoTest.java | 66 ++++-------------- 4 files changed, 99 insertions(+), 124 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleaner.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleaner.java new file mode 100644 index 0000000..9326e3b --- /dev/null +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleaner.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.sling.distribution.journal.impl.publisher; + +import org.apache.sling.api.resource.PersistenceException; +import org.apache.sling.api.resource.Resource; +import org.apache.sling.api.resource.ResourceResolver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PackageCleaner { + private static final Logger LOG = LoggerFactory.getLogger(PackageCleaner.class); + private ResourceResolver resolver; + private long deleteOlderThanTime; + + /** + * Delete all packages older than specified time + * + * @param resolver resolver used to delete the package resources and to commit the deletions + * @param deleteOlderThanTime unix time in milliseconds; packages whose jcr:created is before it are deleted + */ + public PackageCleaner(ResourceResolver resolver, long deleteOlderThanTime) { + this.resolver = resolver; + this.deleteOlderThanTime = deleteOlderThanTime; + } + + public int cleanup(Resource root) + throws PersistenceException { + int removedCount = 0; + for (Resource type : root.getChildren()) { + Resource data = type.getChild("data"); + if (data != null) { + for (Resource pkgNode : data.getChildren()) { + removedCount += cleanNode(pkgNode); + } + } + } + if (resolver.hasChanges()) { + resolver.commit(); + } + return removedCount; + } + + private int cleanNode(Resource pkgNode) + throws PersistenceException { + long createdTime = pkgNode.getValueMap().get("jcr:created", Long.MAX_VALUE); /* no jcr:created: treat as newest and keep, instead of failing on null unboxing */ + if (createdTime < deleteOlderThanTime) { + LOG.info("removing package={}, created={} < deleteTime={}", pkgNode.getName(), createdTime, deleteOlderThanTime); + resolver.delete(pkgNode); + return 1; + } else { + return 0; + } + } + +} diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleanupTask.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleanupTask.java index 0cce80b..554a9aa 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleanupTask.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleanupTask.java @@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory;
public class PackageCleanupTask implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(PackageCleanupTask.class); + private static final long PKG_MAX_LIFETIME_MS = 30L * 24 * 60 * 60 * 1000; /* 30 days in ms; the long operand keeps the product from overflowing int */ @Reference private PackageRepo packageRepo; @@ -65,7 +66,8 @@ public class PackageCleanupTask implements Runnable { @Override public void run() { LOG.info("Starting Package Cleanup Task"); - packageRepo.cleanup(); + long deleteOlderThanTime = System.currentTimeMillis() - PKG_MAX_LIFETIME_MS; + packageRepo.cleanup(deleteOlderThanTime); LOG.info("Finished Package Cleanup Task"); } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java index aee0c52..2591270 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java @@ -26,11 +26,9 @@ import javax.jcr.Property; import javax.jcr.nodetype.NodeType; import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService; -import org.apache.sling.distribution.journal.impl.shared.Topics; import org.apache.jackrabbit.api.ReferenceBinary; import org.apache.jackrabbit.commons.JcrUtils; import org.apache.sling.api.resource.LoginException; -import org.apache.sling.api.resource.ModifiableValueMap; import org.apache.sling.api.resource.PersistenceException; import org.apache.sling.api.resource.Resource; import org.apache.sling.api.resource.ResourceResolver; @@ -45,9 +43,6 @@ import org.osgi.service.component.annotations.Reference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.sling.distribution.journal.MessagingProvider; -import org.apache.sling.distribution.journal.Reset; - import static java.util.Collections.singletonMap; import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE; @@ -65,12 +60,6 @@ public
class PackageRepo { private ResourceResolverFactory resolverFactory; @Reference - private Topics topics; - - @Reference - private MessagingProvider messagingProvider; - - @Reference private ServiceUserMapped mapped; @Reference @@ -80,6 +69,7 @@ public class PackageRepo { static final String PACKAGES_ROOT_PATH = "/var/sling/distribution/journal/packages"; private static final String PACKAGE_PATH_PATTERN = PACKAGES_ROOT_PATH + "/%s/data/%s"; // packageType x packageId + @Nonnull public String store(ResourceResolver resolver, DistributionPackage disPkg) throws DistributionException { @@ -103,36 +93,18 @@ public class PackageRepo { } /** - * The cleanup algorithm is based on the head and tail offsets - * fetched from the package topic. - * - * The implementation is robust and fits any topic retention policy setting, - * without requiring any explicit configuration matching the - * actual retention policy. It is also robust against potential - * instance clock misconfiguration/synchronisation issues. - * - * This comes at the expense of saving offsets in the repository. - * It should be an acceptable cost, given that the vast majority - * of packages are not saved in the repository on the first place - * and thus don't need cleanup. - * - * The cleanup does this: - * - * * The packages with an offset smaller than the head - * offset can be removed because they are no longer referenced - * by the package topic. - * - * * The packages with no offset are set the tail offset. This relies - * on the session being shielded from new packages that might be created - * in parallel. 
+ * Delete all packages that are older than specified unix time + * @param deleteOlderThanTime */ - public void cleanup() { + public void cleanup(long deleteOlderThanTime) { Timer.Context context = distributionMetricsService.getCleanupPackageDuration().time(); // Auto-refresh policy is disabled for service resource resolver try (ResourceResolver resolver = resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, "bookkeeper"))) { - long headOffset = messagingProvider.retrieveOffset(topics.getPackageTopic(), Reset.earliest); - long tailOffset = messagingProvider.retrieveOffset(topics.getPackageTopic(), Reset.latest); - cleanup(resolver, headOffset, tailOffset); + + PackageCleaner packageCleaner = new PackageCleaner(resolver, deleteOlderThanTime); + Resource root = getRoot(resolver); + int removedCount = packageCleaner.cleanup(root); + distributionMetricsService.getCleanupPackageRemovedCount().increment(removedCount); } catch (LoginException | PersistenceException e) { throw new RuntimeException(e.getMessage(), e); } finally { @@ -140,41 +112,6 @@ public class PackageRepo { } } - private void cleanup(ResourceResolver resolver, long headOffset, long tailOffset) - throws PersistenceException { - LOG.info("Cleanup headOffset {} tailOffset {}", headOffset, tailOffset); - Resource root = getRoot(resolver); - int removedCount = 0; - for (Resource type : root.getChildren()) { - Resource data = type.getChild("data"); - if (data != null) { - for (Resource pkg : data.getChildren()) { - removedCount += cleanNode(resolver, headOffset, tailOffset, pkg); - } - } - } - if (resolver.hasChanges()) { - resolver.commit(); - distributionMetricsService.getCleanupPackageRemovedCount().increment(removedCount); - } - } - - private int cleanNode(ResourceResolver resolver, long headOffset, long tailOffset, Resource pkg) - throws PersistenceException { - long offset = pkg.getValueMap().get("offset", -1); - if (offset < 0) { - LOG.info("keep package {}, setting tail offset {}", 
pkg.getName(), tailOffset); - pkg.adaptTo(ModifiableValueMap.class).put("offset", tailOffset); - } else if (offset < headOffset) { - LOG.info("remove package {}, offset smaller than head offset {} < {}", pkg.getName(), offset, headOffset); - resolver.delete(pkg); - return 1; - } else { - LOG.debug("keep package {}, offset bigger or equal to head offset {} >= {}", pkg.getName(), offset, headOffset); - } - return 0; - } - @Nonnull private Resource getRoot(ResourceResolver resolver) throws PersistenceException { diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java index 161f454..43ed4f7 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java @@ -28,10 +28,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.UUID; -import java.util.function.Function; -import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService; -import org.apache.sling.distribution.journal.impl.shared.Topics; import org.apache.sling.api.resource.LoginException; import org.apache.sling.api.resource.Resource; import org.apache.sling.api.resource.ResourceResolver; @@ -39,11 +36,14 @@ import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.commons.metrics.Counter; import org.apache.sling.commons.metrics.Timer; import org.apache.sling.distribution.common.DistributionException; +import org.apache.sling.distribution.journal.MessagingProvider; +import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService; +import org.apache.sling.distribution.journal.impl.shared.Topics; +import org.apache.sling.distribution.journal.messages.Messages; import org.apache.sling.distribution.packaging.DistributionPackage; import 
org.apache.sling.testing.mock.osgi.MockOsgi; import org.apache.sling.testing.mock.sling.MockSling; import org.apache.sling.testing.mock.sling.ResourceResolverType; -import org.hamcrest.Matcher; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -51,15 +51,10 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.InjectMocks; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.mockito.Spy; import org.osgi.framework.BundleContext; -import org.apache.sling.distribution.journal.messages.Messages; -import org.apache.sling.distribution.journal.MessagingProvider; -import org.apache.sling.distribution.journal.Reset; - public class PackageRepoTest { @Spy @@ -103,45 +98,27 @@ public class PackageRepoTest { } @Test - public void testStoreClean() throws DistributionException, IOException, LoginException { - when(messagingProvider.retrieveOffset(Mockito.anyString(), Mockito.eq(Reset.earliest))) - .thenReturn(100L, 201L, 201L, 203L); - when(messagingProvider.retrieveOffset(Mockito.anyString(), Mockito.eq(Reset.latest))) - .thenReturn(200L, 202L, 202L, 203L); + public void testStoreClean() throws DistributionException, IOException, LoginException, InterruptedException { when(timer.time()) .thenReturn(context); when(distributionMetricsService.getCleanupPackageDuration()) .thenReturn(timer); when(distributionMetricsService.getCleanupPackageRemovedCount()) .thenReturn(counter); - - try (ResourceResolver resolver = resolverFactory.getServiceResourceResolver(null)) { - packageRepo.store(resolver, mockPackage()); - } - - assertNumNodes(1); - - // In a first pass we get current tail offset on all stored packages - packageRepo.cleanup(); - assertEachNode(this::getOffset, equalTo(200)); - - try (ResourceResolver resolver = resolverFactory.getServiceResourceResolver(null)) { - packageRepo.store(resolver, mockPackage()); - } - assertNumNodes(2); - - // We delete the old package and add 
the offset to the new one - packageRepo.cleanup(); + + long createTime = System.currentTimeMillis(); + store(mockPackage()); assertNumNodes(1); - - // As the new head offset is the same the node is not deleted - packageRepo.cleanup(); + packageRepo.cleanup(createTime - 1000); assertNumNodes(1); - - // Now the head offset is increased again and the node is deleted - packageRepo.cleanup(); + packageRepo.cleanup(createTime + 1000); assertNumNodes(0); + } + private void store(DistributionPackage pkg) throws DistributionException, IOException, LoginException { + try (ResourceResolver resolver = resolverFactory.getServiceResourceResolver(null)) { + packageRepo.store(resolver, pkg); + } } private void assertNumNodes(int num) throws LoginException { @@ -150,10 +127,6 @@ public class PackageRepoTest { } } - private Integer getOffset(Resource pkg) { - return pkg.getValueMap().get("offset", Integer.class); - } - private List<Resource> getPackageNodes(ResourceResolver resolver) throws LoginException { List<Resource> result = new ArrayList<>(); Resource root = resolver.getResource(PackageRepo.PACKAGES_ROOT_PATH); @@ -168,15 +141,6 @@ public class PackageRepoTest { return result; } - public <T> void assertEachNode(Function<Resource, T> func, Matcher<T> matcher) throws LoginException { - try (ResourceResolver resolver = resolverFactory.getServiceResourceResolver(null)) { - List<Resource> nodes = getPackageNodes(resolver); - for (Resource pkg : nodes) { - assertThat(func.apply(pkg), matcher); - } - } - } - private DistributionPackage mockPackage() throws IOException { DistributionPackage pkg = mock(DistributionPackage.class); when(pkg.getId()).thenReturn(UUID.randomUUID().toString());
