This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-9593 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit ea470c9063e9e9db61b1977dc1f685e273288b5e Author: Christian Schneider <[email protected]> AuthorDate: Wed Jul 29 15:58:20 2020 +0200 GRANITE-30450 - Move binary store, require config --- .../jcr/JcrBinaryStore.java} | 97 +++++++++++++------ .../publisher => binary/jcr}/PackageCleaner.java | 5 +- .../jcr}/PackageCleanupTask.java | 6 +- .../impl/publisher/PackageMessageFactory.java | 5 +- .../journal/shared/JcrBinaryStore.java | 103 --------------------- .../jcr/JcrBinaryStoreTest.java} | 9 +- 6 files changed, 82 insertions(+), 143 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java b/src/main/java/org/apache/sling/distribution/journal/binary/jcr/JcrBinaryStore.java similarity index 68% rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java rename to src/main/java/org/apache/sling/distribution/journal/binary/jcr/JcrBinaryStore.java index 6f2791b..532d083 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java +++ b/src/main/java/org/apache/sling/distribution/journal/binary/jcr/JcrBinaryStore.java @@ -16,17 +16,26 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sling.distribution.journal.impl.publisher; +package org.apache.sling.distribution.journal.binary.jcr; + +import static java.util.Collections.singletonMap; +import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; import javax.annotation.Nonnull; -import javax.annotation.ParametersAreNonnullByDefault; import javax.jcr.Binary; import javax.jcr.Node; import javax.jcr.Property; +import javax.jcr.Session; +import javax.jcr.ValueFactory; import javax.jcr.nodetype.NodeType; import org.apache.jackrabbit.api.ReferenceBinary; import org.apache.jackrabbit.commons.JcrUtils; +import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary; import org.apache.sling.api.resource.LoginException; import org.apache.sling.api.resource.PersistenceException; import org.apache.sling.api.resource.Resource; @@ -35,41 +44,77 @@ import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.api.resource.ResourceUtil; import org.apache.sling.commons.metrics.Timer; import org.apache.sling.distribution.common.DistributionException; +import org.apache.sling.distribution.journal.BinaryStore; import org.apache.sling.distribution.journal.shared.DistributionMetricsService; import org.apache.sling.serviceusermapping.ServiceUserMapped; import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.ConfigurationPolicy; import org.osgi.service.component.annotations.Reference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static java.util.Collections.singletonMap; -import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE; - -import java.io.InputStream; -import java.util.Objects; - -/** - * Manages the binary content of DistributionPackages. If they are too big to fit in a journal message then they - * are written to the blob store. It also offers cleanup functionality to remove the data when it is not needed anymore. - */ -@Component(service = PackageRepo.class) -@ParametersAreNonnullByDefault -public class PackageRepo { - +@Component( + property = { + "type=jcr" + }, + configurationPolicy = ConfigurationPolicy.REQUIRE +) +public class JcrBinaryStore implements BinaryStore { + private static final long MAX_INLINE_PKG_BINARY_SIZE = 800L * 1024; private static final String SLING_FOLDER = "sling:Folder"; + static final String PACKAGES_ROOT_PATH = "/var/sling/distribution/journal/packagebinaries"; + + private static final Logger LOG = LoggerFactory.getLogger(JcrBinaryStore.class); - @Reference - private ResourceResolverFactory resolverFactory; - @Reference private ServiceUserMapped mapped; @Reference private DistributionMetricsService distributionMetricsService; - private static final Logger LOG = LoggerFactory.getLogger(PackageRepo.class); - static final String PACKAGES_ROOT_PATH = "/var/sling/distribution/journal/packagebinaries"; + @Reference + private ResourceResolverFactory resolverFactory; + + @Override public InputStream get(String reference) throws IOException { + try (ResourceResolver resolver = createResourceResolver()) { + Session session = resolver.adaptTo(Session.class); + if (session == null) { + throw new IOException("Unable to get Oak session"); + } + ValueFactory factory = session.getValueFactory(); + Binary binary = factory.createValue(new SimpleReferenceBinary(reference)).getBinary(); + return binary.getStream(); + } catch (Exception e) { + throw new IOException(e.getMessage(), e); + } + } + + @Override + public String put(String id, InputStream stream, long length) throws IOException { + if (length > MAX_INLINE_PKG_BINARY_SIZE) { + + /* + * Rather than pro-actively (and somewhat arbitrarily) + * decide to avoid sending a package inline based on + * its size, we could simply try to send packages of + * any size and only avoiding to inline as a fallback. + * However, this approach requires the messaging + * implementation to offer a mean to distinguish + * size issues when sending messages, which is not + * always the case. + */ + + LOG.info("Package {} too large ({}B) to be sent inline", id, length); + try { + return store(id, stream); + } catch (DistributionException e) { + throw new IOException(e.getMessage(), e); + } + } + return null; + } + @Nonnull public String store(String id, InputStream binaryStream)throws DistributionException { try (ResourceResolver resolver = createResourceResolver()) { @@ -110,14 +155,14 @@ public class PackageRepo { context.stop(); } } - - private ResourceResolver createResourceResolver() throws LoginException { - return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, "bookkeeper")); - } - + @Nonnull private Resource getRoot(ResourceResolver resolver) throws PersistenceException { return ResourceUtil.getOrCreateResource(resolver, PACKAGES_ROOT_PATH, SLING_FOLDER, SLING_FOLDER, true); } + + private ResourceResolver createResourceResolver() throws LoginException { + return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, "bookkeeper")); + } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleaner.java b/src/main/java/org/apache/sling/distribution/journal/binary/jcr/PackageCleaner.java similarity index 96% rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleaner.java rename to src/main/java/org/apache/sling/distribution/journal/binary/jcr/PackageCleaner.java index d272904..c706f8c 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleaner.java +++ b/src/main/java/org/apache/sling/distribution/journal/binary/jcr/PackageCleaner.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sling.distribution.journal.impl.publisher; +package org.apache.sling.distribution.journal.binary.jcr; import org.apache.sling.api.resource.PersistenceException; import org.apache.sling.api.resource.Resource; @@ -25,7 +25,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PackageCleaner { - private static final Logger LOG = LoggerFactory.getLogger(PackageRepo.class); + private static final Logger LOG = LoggerFactory.getLogger(PackageCleaner.class); + private ResourceResolver resolver; private long deleteOlderThanTime; diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleanupTask.java b/src/main/java/org/apache/sling/distribution/journal/binary/jcr/PackageCleanupTask.java similarity index 95% rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleanupTask.java rename to src/main/java/org/apache/sling/distribution/journal/binary/jcr/PackageCleanupTask.java index b534195..56c5b1f 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleanupTask.java +++ b/src/main/java/org/apache/sling/distribution/journal/binary/jcr/PackageCleanupTask.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sling.distribution.journal.impl.publisher; +package org.apache.sling.distribution.journal.binary.jcr; import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT; import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_IMMEDIATE; @@ -55,7 +55,7 @@ public class PackageCleanupTask implements Runnable { private static final long PKG_MAX_LIFETIME_MS = 30 * 24 * 60 * 60 * 1000; @Reference - private PackageRepo packageRepo; + private JcrBinaryStore binaryStore; /** * The task runs only when at least one DistributionSubscriber agent is configured. @@ -67,7 +67,7 @@ public class PackageCleanupTask implements Runnable { public void run() { LOG.info("Starting Package Cleanup Task"); long deleteOlderThanTime = System.currentTimeMillis() - PKG_MAX_LIFETIME_MS; - packageRepo.cleanup(deleteOlderThanTime); + binaryStore.cleanup(deleteOlderThanTime); LOG.info("Finished Package Cleanup Task"); } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java index ce69e3c..0ee6e65 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java @@ -46,7 +46,6 @@ import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Reference; -import org.osgi.service.component.annotations.ReferencePolicyOption; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,12 +55,10 @@ public class PackageMessageFactory { private static final Logger LOG = LoggerFactory.getLogger(PackageMessageFactory.class); - private static final long MAX_INLINE_PKG_BINARY_SIZE = 800L * 1024; - @Reference private SlingSettingsService slingSettings; - @Reference(policyOption = ReferencePolicyOption.GREEDY) + @Reference private BinaryStore binaryStore; private String pubSlingId; diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/JcrBinaryStore.java b/src/main/java/org/apache/sling/distribution/journal/shared/JcrBinaryStore.java deleted file mode 100644 index 20c3d86..0000000 --- a/src/main/java/org/apache/sling/distribution/journal/shared/JcrBinaryStore.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sling.distribution.journal.shared; - -import java.io.IOException; -import java.io.InputStream; - -import javax.jcr.Binary; -import javax.jcr.Session; -import javax.jcr.ValueFactory; - -import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary; -import org.apache.sling.api.resource.LoginException; -import org.apache.sling.api.resource.ResourceResolver; -import org.apache.sling.api.resource.ResourceResolverFactory; -import org.apache.sling.distribution.common.DistributionException; -import org.apache.sling.distribution.journal.BinaryStore; -import org.apache.sling.distribution.journal.impl.publisher.PackageRepo; -import org.osgi.service.component.annotations.Component; -import org.osgi.service.component.annotations.Reference; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.util.Collections.singletonMap; -import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE; - -@Component( - property = { - "type=jcr", - "service.ranking:Integer=10" - } -) -public class JcrBinaryStore implements BinaryStore { - - private static final Logger LOG = LoggerFactory.getLogger(JcrBinaryStore.class); - - private static final long MAX_INLINE_PKG_BINARY_SIZE = 800L * 1024; - - @Reference - private PackageRepo packageRepo; - - @Reference - private ResourceResolverFactory resolverFactory; - - @Override public InputStream get(String reference) throws IOException { - try (ResourceResolver resolver = createResourceResolver()) { - Session session = resolver.adaptTo(Session.class); - if (session == null) { - throw new IOException("Unable to get Oak session"); - } - ValueFactory factory = session.getValueFactory(); - Binary binary = factory.createValue(new SimpleReferenceBinary(reference)).getBinary(); - return binary.getStream(); - } catch (Exception e) { - throw new IOException(e.getMessage(), e); - } - } - - @Override - public String put(String id, InputStream stream, long length) throws IOException { - if (length > MAX_INLINE_PKG_BINARY_SIZE) { - - /* - * Rather than pro-actively (and somewhat arbitrarily) - * decide to avoid sending a package inline based on - * its size, we could simply try to send packages of - * any size and only avoiding to inline as a fallback. - * However, this approach requires the messaging - * implementation to offer a mean to distinguish - * size issues when sending messages, which is not - * always the case. - */ - - LOG.info("Package {} too large ({}B) to be sent inline", id, length); - try { - return packageRepo.store(id, stream); - } catch (DistributionException e) { - throw new IOException(e.getMessage(), e); - } - } - return null; - } - - private ResourceResolver createResourceResolver() throws LoginException { - return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, "bookkeeper")); - } -} diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java b/src/test/java/org/apache/sling/distribution/journal/binary/jcr/JcrBinaryStoreTest.java similarity index 95% rename from src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java rename to src/test/java/org/apache/sling/distribution/journal/binary/jcr/JcrBinaryStoreTest.java index c6200c8..4e83c32 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/binary/jcr/JcrBinaryStoreTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sling.distribution.journal.impl.publisher; +package org.apache.sling.distribution.journal.binary.jcr; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; @@ -54,7 +54,7 @@ import org.mockito.MockitoAnnotations; import org.mockito.Spy; import org.osgi.framework.BundleContext; -public class PackageRepoTest { +public class JcrBinaryStoreTest { @Spy private BundleContext bundleContext = MockOsgi.newBundleContext(); @@ -84,8 +84,7 @@ public class PackageRepoTest { private Topics topics = new Topics(); @InjectMocks - private PackageRepo packageRepo; - + private JcrBinaryStore packageRepo; @Before public void before() { @@ -125,7 +124,7 @@ public class PackageRepoTest { private List<Resource> getPackageNodes(ResourceResolver resolver) throws LoginException { List<Resource> result = new ArrayList<>(); - Resource root = resolver.getResource(PackageRepo.PACKAGES_ROOT_PATH); + Resource root = resolver.getResource(JcrBinaryStore.PACKAGES_ROOT_PATH); for (Resource pkg : root.getChildren()) { result.add(pkg); }
