This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-9481
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit c1a454010f2f55a317e7d4de5daa73890747b05c
Author: Christian Schneider <[email protected]>
AuthorDate: Thu May 28 17:07:12 2020 +0200

    SLING-9481 - Avoid seeding messages in PackageRepo cleanup
---
 .../journal/impl/publisher/PackageCleaner.java     | 72 +++++++++++++++++++
 .../journal/impl/publisher/PackageCleanupTask.java |  4 +-
 .../journal/impl/publisher/PackageRepo.java        | 81 +++-------------------
 .../journal/impl/publisher/PackageRepoTest.java    | 66 ++++--------------
 4 files changed, 99 insertions(+), 124 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleaner.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleaner.java
new file mode 100644
index 0000000..9326e3b
--- /dev/null
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleaner.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PackageCleaner {
+    private static final Logger LOG = LoggerFactory.getLogger(PackageCleaner.class);
+    private final ResourceResolver resolver;
+    private final long deleteOlderThanTime;
+
+    /**
+     * Creates a cleaner that deletes all packages created before the given time.
+     * @param resolver resolver used to delete the package nodes and to commit the deletions
+     * @param deleteOlderThanTime threshold compared against each package's jcr:created value
+     */
+    public PackageCleaner(ResourceResolver resolver, long deleteOlderThanTime) {
+        this.resolver = resolver;
+        this.deleteOlderThanTime = deleteOlderThanTime;
+    }
+
+    /** Deletes outdated packages under each {@code <type>/data} child of root, commits, returns the count. */
+    public int cleanup(Resource root) throws PersistenceException {
+        int removedCount = 0;
+        for (Resource type : root.getChildren()) {
+            Resource data = type.getChild("data");
+            if (data != null) {
+                for (Resource pkgNode : data.getChildren()) {
+                    removedCount += cleanNode(pkgNode);
+                }
+            }
+        }
+        if (resolver.hasChanges()) {
+            resolver.commit();
+        }
+        return removedCount;
+    }
+
+    /** Deletes pkgNode if it was created before the threshold; returns 1 if deleted, otherwise 0. */
+    private int cleanNode(Resource pkgNode) throws PersistenceException {
+        // Kept boxed on purpose: unboxing a missing jcr:created would throw a NullPointerException.
+        Long createdTime = pkgNode.getValueMap().get("jcr:created", Long.class);
+        if (createdTime == null) {
+            LOG.warn("keeping package={}, it has no jcr:created property", pkgNode.getName());
+        } else if (createdTime < deleteOlderThanTime) {
+            LOG.info("removing package={}, created={} < deleteTime={}", pkgNode.getName(), createdTime, deleteOlderThanTime);
+            resolver.delete(pkgNode);
+            return 1;
+        }
+        return 0;
+    }
+}
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleanupTask.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleanupTask.java
index 0cce80b..554a9aa 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleanupTask.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleanupTask.java
@@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory;
 public class PackageCleanupTask implements Runnable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PackageCleanupTask.class);
+    private static final long PKG_MAX_LIFETIME_MS = 30L * 24 * 60 * 60 * 1000; // 30 days in ms; long literal because the int product overflows
 
     @Reference
     private PackageRepo packageRepo;
@@ -65,7 +66,8 @@ public class PackageCleanupTask implements Runnable {
     @Override
     public void run() {
         LOG.info("Starting Package Cleanup Task");
-        packageRepo.cleanup();
+        long deleteOlderThanTime = System.currentTimeMillis() - 
PKG_MAX_LIFETIME_MS;
+        packageRepo.cleanup(deleteOlderThanTime);
         LOG.info("Finished Package Cleanup Task");
     }
 
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
index aee0c52..2591270 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
@@ -26,11 +26,9 @@ import javax.jcr.Property;
 import javax.jcr.nodetype.NodeType;
 
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.jackrabbit.api.ReferenceBinary;
 import org.apache.jackrabbit.commons.JcrUtils;
 import org.apache.sling.api.resource.LoginException;
-import org.apache.sling.api.resource.ModifiableValueMap;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
@@ -45,9 +43,6 @@ import org.osgi.service.component.annotations.Reference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-
 import static java.util.Collections.singletonMap;
 import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
 
@@ -65,12 +60,6 @@ public class PackageRepo {
     private ResourceResolverFactory resolverFactory;
     
     @Reference
-    private Topics topics;
-    
-    @Reference
-    private MessagingProvider messagingProvider;
-    
-    @Reference
     private ServiceUserMapped mapped;
 
     @Reference
@@ -80,6 +69,7 @@ public class PackageRepo {
     static final String PACKAGES_ROOT_PATH = 
"/var/sling/distribution/journal/packages";
     private static final String PACKAGE_PATH_PATTERN = PACKAGES_ROOT_PATH + 
"/%s/data/%s"; // packageType x packageId
 
+
     @Nonnull
     public String store(ResourceResolver resolver, DistributionPackage disPkg)
             throws DistributionException {
@@ -103,36 +93,18 @@ public class PackageRepo {
     }
 
     /**
-     * The cleanup algorithm is based on the head and tail offsets
-     * fetched from the package topic.
-     * 
-     * The implementation is robust and fits any topic retention policy 
setting,
-     * without requiring any explicit configuration matching the
-     * actual retention policy. It is also robust against potential
-     * instance clock misconfiguration/synchronisation issues.
-     * 
-     * This comes at the expense of saving offsets in the repository.
-     * It should be an acceptable cost, given that the vast majority
-     * of packages are not saved in the repository on the first place
-     * and thus don't need cleanup.
-     *
-     * The cleanup does this:
-     *
-     * * The packages with an offset smaller than the head
-     *   offset can be removed because they are no longer referenced
-     *   by the package topic. 
-     * 
-     * * The packages with no offset are set the tail offset. This relies
-     *   on the session being shielded from new packages that might be created
-     *   in parallel.
+     * Delete all packages that are older than specified unix time
+     * @param deleteOlderThanTime 
      */
-    public void cleanup() {
+    public void cleanup(long deleteOlderThanTime) {
         Timer.Context context = 
distributionMetricsService.getCleanupPackageDuration().time();
         // Auto-refresh policy is disabled for service resource resolver
         try (ResourceResolver resolver = 
resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, 
"bookkeeper"))) {
-            long headOffset = 
messagingProvider.retrieveOffset(topics.getPackageTopic(), Reset.earliest);
-            long tailOffset = 
messagingProvider.retrieveOffset(topics.getPackageTopic(), Reset.latest);
-            cleanup(resolver, headOffset, tailOffset);
+            
+            PackageCleaner packageCleaner = new PackageCleaner(resolver, 
deleteOlderThanTime);
+            Resource root = getRoot(resolver);
+            int removedCount = packageCleaner.cleanup(root);
+            
distributionMetricsService.getCleanupPackageRemovedCount().increment(removedCount);
         } catch (LoginException | PersistenceException e) {
             throw new RuntimeException(e.getMessage(), e);
         } finally {
@@ -140,41 +112,6 @@ public class PackageRepo {
         }
     }
 
-    private void cleanup(ResourceResolver resolver, long headOffset, long 
tailOffset)
-            throws PersistenceException {
-        LOG.info("Cleanup headOffset {} tailOffset {}", headOffset, 
tailOffset);
-        Resource root = getRoot(resolver);
-        int removedCount = 0;
-        for (Resource type : root.getChildren()) {
-            Resource data = type.getChild("data");
-            if (data != null) {
-                for (Resource pkg : data.getChildren()) {
-                    removedCount += cleanNode(resolver, headOffset, 
tailOffset, pkg);
-                }
-            }
-        }
-        if (resolver.hasChanges()) {
-            resolver.commit();
-            
distributionMetricsService.getCleanupPackageRemovedCount().increment(removedCount);
-        }
-    }
-
-    private int cleanNode(ResourceResolver resolver, long headOffset, long 
tailOffset, Resource pkg)
-            throws PersistenceException {
-        long offset = pkg.getValueMap().get("offset", -1);
-        if (offset < 0) {
-            LOG.info("keep package {}, setting tail offset {}", pkg.getName(), 
tailOffset);
-            pkg.adaptTo(ModifiableValueMap.class).put("offset", tailOffset);
-        } else if (offset < headOffset) {
-            LOG.info("remove package {}, offset smaller than head offset {} < 
{}", pkg.getName(), offset, headOffset);
-            resolver.delete(pkg);
-            return 1;
-        } else {
-            LOG.debug("keep package {}, offset bigger or equal to head offset 
{} >= {}", pkg.getName(), offset, headOffset);
-        }
-        return 0;
-    }
-
     @Nonnull
     private Resource getRoot(ResourceResolver resolver)
             throws PersistenceException {
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
index 161f454..43ed4f7 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
@@ -28,10 +28,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import java.util.function.Function;
 
-import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
@@ -39,11 +36,14 @@ import 
org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.metrics.Counter;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.packaging.DistributionPackage;
 import org.apache.sling.testing.mock.osgi.MockOsgi;
 import org.apache.sling.testing.mock.sling.MockSling;
 import org.apache.sling.testing.mock.sling.ResourceResolverType;
-import org.hamcrest.Matcher;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -51,15 +51,10 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.mockito.Spy;
 import org.osgi.framework.BundleContext;
 
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-
 public class PackageRepoTest {
     
     @Spy
@@ -103,45 +98,27 @@ public class PackageRepoTest {
     }
     
     @Test
-    public void testStoreClean() throws DistributionException, IOException, 
LoginException {
-        when(messagingProvider.retrieveOffset(Mockito.anyString(), 
Mockito.eq(Reset.earliest)))
-            .thenReturn(100L, 201L, 201L, 203L);
-        when(messagingProvider.retrieveOffset(Mockito.anyString(), 
Mockito.eq(Reset.latest)))
-            .thenReturn(200L, 202L, 202L, 203L);
+    public void testStoreClean() throws DistributionException, IOException, 
LoginException, InterruptedException {
         when(timer.time())
                 .thenReturn(context);
         when(distributionMetricsService.getCleanupPackageDuration())
                 .thenReturn(timer);
         when(distributionMetricsService.getCleanupPackageRemovedCount())
                 .thenReturn(counter);
-        
-        try (ResourceResolver resolver = 
resolverFactory.getServiceResourceResolver(null)) {
-            packageRepo.store(resolver, mockPackage());
-        }
-        
-        assertNumNodes(1);
-        
-        // In a first pass we get current tail offset on all stored packages
-        packageRepo.cleanup();
-        assertEachNode(this::getOffset, equalTo(200));
-        
-        try (ResourceResolver resolver = 
resolverFactory.getServiceResourceResolver(null)) {
-            packageRepo.store(resolver, mockPackage());
-        }
-        assertNumNodes(2);
-        
-        // We delete the old package and add the offset to the new one
-        packageRepo.cleanup();
+
+        long createTime = System.currentTimeMillis();
+        store(mockPackage());
         assertNumNodes(1);
-        
-        // As the new head offset is the same the node is not deleted
-        packageRepo.cleanup();
+        packageRepo.cleanup(createTime - 1000);
         assertNumNodes(1);
-        
-        // Now the head offset is increased again and the node is deleted
-        packageRepo.cleanup();
+        packageRepo.cleanup(createTime + 1000);
         assertNumNodes(0);
+    }
 
+    private void store(DistributionPackage pkg) throws DistributionException, 
IOException, LoginException {
+        try (ResourceResolver resolver = 
resolverFactory.getServiceResourceResolver(null)) {
+            packageRepo.store(resolver, pkg);
+        }
     }
 
     private void assertNumNodes(int num) throws LoginException {
@@ -150,10 +127,6 @@ public class PackageRepoTest {
         }
     }
 
-    private Integer getOffset(Resource pkg) {
-        return pkg.getValueMap().get("offset", Integer.class);
-    }
-
     private List<Resource> getPackageNodes(ResourceResolver resolver) throws 
LoginException {
         List<Resource> result = new ArrayList<>();
         Resource root = resolver.getResource(PackageRepo.PACKAGES_ROOT_PATH);
@@ -168,15 +141,6 @@ public class PackageRepoTest {
         return result;
     }
     
-    public <T> void assertEachNode(Function<Resource, T> func, Matcher<T> 
matcher) throws LoginException {
-        try (ResourceResolver resolver = 
resolverFactory.getServiceResourceResolver(null)) {
-            List<Resource> nodes = getPackageNodes(resolver);
-            for (Resource pkg : nodes) {
-                assertThat(func.apply(pkg), matcher);
-            }
-        }
-    }
-
     private DistributionPackage mockPackage() throws IOException {
         DistributionPackage pkg = mock(DistributionPackage.class);
         when(pkg.getId()).thenReturn(UUID.randomUUID().toString());

Reply via email to