This is an automated email from the ASF dual-hosted git repository. tmaret pushed a commit to branch SLING-12704-merge in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit e6508f37fb3bc1ea6ed94b657416edc10f76ec41 Author: tmaret <[email protected]> AuthorDate: Fri Mar 14 15:36:03 2025 +0100 SLING-12704 - Introduce a JournalDistributionPackageBuilder --- pom.xml | 2 +- .../journal/bookkeeper/PackageHandler.java | 65 +++-- .../journal/shared/JournalDistributionPackage.java | 82 +++++++ .../shared/JournalDistributionPackageBuilder.java | 270 +++++++++++++++++++++ .../journal/impl/subscriber/SubscriberTest.java | 25 +- 5 files changed, 389 insertions(+), 55 deletions(-) diff --git a/pom.xml b/pom.xml index c630e9e..627ca83 100644 --- a/pom.xml +++ b/pom.xml @@ -143,7 +143,7 @@ <dependency> <groupId>org.apache.sling</groupId> <artifactId>org.apache.sling.distribution.core</artifactId> - <version>0.4.8</version> + <version>0.7.2</version> <scope>provided</scope> <exclusions> <exclusion> diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageHandler.java index 9fb79da..61a0540 100644 --- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageHandler.java +++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageHandler.java @@ -19,14 +19,16 @@ package org.apache.sling.distribution.journal.bookkeeper; -import org.apache.commons.io.IOUtils; import org.apache.sling.api.resource.PersistenceException; -import org.apache.sling.api.resource.Resource; import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.distribution.common.DistributionException; import org.apache.sling.distribution.journal.BinaryStore; import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; +import org.apache.sling.distribution.journal.shared.JournalDistributionPackage; +import org.apache.sling.distribution.packaging.DistributionPackage; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; +import org.apache.sling.distribution.packaging.DistributionPackageInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +37,9 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; -import static java.lang.String.format; +import static org.apache.commons.io.IOUtils.toByteArray; +import static org.apache.sling.distribution.journal.messages.PackageMessage.ReqType.ADD; +import static org.apache.sling.distribution.packaging.DistributionPackageInfo.*; class PackageHandler { private static final Logger LOG = LoggerFactory.getLogger(PackageHandler.class); @@ -57,56 +61,43 @@ class PackageHandler { public void apply(ResourceResolver resolver, PackageMessage pkgMsg) throws DistributionException, PersistenceException { - PackageMessage.ReqType type = pkgMsg.getReqType(); - switch (type) { - case ADD: - installAddPackage(resolver, pkgMsg); - break; - case DELETE: - installDeletePackage(resolver, pkgMsg); - break; - case TEST: - break; - default: throw new UnsupportedOperationException(format("Unable to process messages with type: %s", type)); + DistributionPackage distributionPackage = toDistributionPackage(pkgMsg); + packageBuilder.installPackage(resolver, distributionPackage); + ReqType type = pkgMsg.getReqType(); + if (type == ADD) { + extractor.handle(resolver, pkgMsg.getPaths()); } } - private void installAddPackage(ResourceResolver resolver, PackageMessage pkgMsg) + private DistributionPackage toDistributionPackage(PackageMessage pkgMsg) throws DistributionException { LOG.debug("Importing paths {}",pkgMsg.getPaths()); - InputStream pkgStream = null; - try { - pkgStream = stream(resolver, pkgMsg, binaryStore); - packageBuilder.installPackage(resolver, pkgStream); - extractor.handle(resolver, pkgMsg.getPaths()); - } finally { - IOUtils.closeQuietly(pkgStream); + final byte[] data; + try (InputStream inputStream = stream(pkgMsg)) { + data = toByteArray(inputStream); + } catch (IOException e) { + throw new DistributionException("Failed to download package from binary store", e); } - + DistributionPackageInfo distributionPackageInfo = new DistributionPackageInfo(pkgMsg.getPkgType()); + distributionPackageInfo.put(PROPERTY_REQUEST_PATHS, pkgMsg.getPaths().toArray()); + distributionPackageInfo.put(PROPERTY_REQUEST_DEEP_PATHS, pkgMsg.getDeepPaths().toArray()); + distributionPackageInfo.put(PROPERTY_REQUEST_TYPE, pkgMsg.getReqType()); + return new JournalDistributionPackage(pkgMsg.getPkgId(), pkgMsg.getPkgType(), data, distributionPackageInfo); } @Nonnull - public static InputStream stream(ResourceResolver resolver, PackageMessage pkgMsg, BinaryStore binaryStore) throws DistributionException { + InputStream stream(PackageMessage pkgMsg) throws DistributionException { if (pkgMsg.getPkgBinary() != null) { return new ByteArrayInputStream(pkgMsg.getPkgBinary()); - } else { - String pkgBinRef = pkgMsg.getPkgBinaryRef(); + } + String pkgBinRef = pkgMsg.getPkgBinaryRef(); + if (pkgBinRef != null) { try { return binaryStore.get(pkgBinRef); } catch (IOException io) { throw new DistributionException(io.getMessage(), io); } } - } - - private void installDeletePackage(ResourceResolver resolver, PackageMessage pkgMsg) - throws PersistenceException { - LOG.info("Deleting paths {}",pkgMsg.getPaths()); - for (String path : pkgMsg.getPaths()) { - Resource resource = resolver.getResource(path); - if (resource != null) { - resolver.delete(resource); - } - } + return new ByteArrayInputStream(new byte[0]); } } diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/JournalDistributionPackage.java b/src/main/java/org/apache/sling/distribution/journal/shared/JournalDistributionPackage.java new file mode 100644 index 0000000..c45ac16 --- /dev/null +++ b/src/main/java/org/apache/sling/distribution/journal/shared/JournalDistributionPackage.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.shared; + +import org.apache.sling.distribution.packaging.DistributionPackage; +import org.apache.sling.distribution.packaging.DistributionPackageInfo; + +import javax.annotation.Nonnull; +import java.io.ByteArrayInputStream; +import java.io.InputStream; + +public class JournalDistributionPackage implements DistributionPackage { + + private final String id; + + private final String type; + + private final long size; + + private final byte[] data; + + private final DistributionPackageInfo info; + + public JournalDistributionPackage(@Nonnull String id, @Nonnull String type, @Nonnull byte[] data, @Nonnull DistributionPackageInfo info) { + this.id = id; + this.type = type; + this.size = data.length; + this.info = info; + this.data = data; + } + + @Nonnull + @Override + public String getId() { + return id; + } + + @Nonnull + @Override + public String getType() { + return type; + } + + @Override + public @Nonnull InputStream createInputStream() { + return new ByteArrayInputStream(data); + } + + @Override + public long getSize() { + return size; + } + + @Override + public void close() { + } + + @Override + public void delete() { + } + + @Override + public @Nonnull DistributionPackageInfo getInfo() { + return info; + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/JournalDistributionPackageBuilder.java b/src/main/java/org/apache/sling/distribution/journal/shared/JournalDistributionPackageBuilder.java new file mode 100644 index 0000000..b9c0c03 --- /dev/null +++ b/src/main/java/org/apache/sling/distribution/journal/shared/JournalDistributionPackageBuilder.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.shared; + +import org.apache.jackrabbit.vault.fs.api.IdConflictPolicy; +import org.apache.jackrabbit.vault.fs.api.ImportMode; +import org.apache.jackrabbit.vault.fs.io.AccessControlHandling; +import org.apache.sling.api.resource.PersistenceException; +import org.apache.sling.api.resource.Resource; +import org.apache.sling.api.resource.ResourceResolver; +import org.apache.sling.distribution.DistributionRequest; +import org.apache.sling.distribution.DistributionRequestType; +import org.apache.sling.distribution.common.DistributionException; +import org.apache.sling.distribution.packaging.DistributionPackage; +import org.apache.sling.distribution.packaging.DistributionPackageBuilder; +import org.apache.sling.distribution.packaging.DistributionPackageInfo; +import org.apache.sling.distribution.serialization.*; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Reference; +import org.osgi.service.metatype.annotations.AttributeDefinition; +import org.osgi.service.metatype.annotations.Designate; +import org.osgi.service.metatype.annotations.ObjectClassDefinition; +import org.slf4j.Logger; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static java.lang.String.format; +import static java.lang.System.currentTimeMillis; +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableMap; +import static java.util.Objects.requireNonNull; +import static java.util.UUID.randomUUID; +import static java.util.stream.Collectors.toMap; +import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS; +import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_PATHS; +import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_TYPE; +import static org.slf4j.LoggerFactory.getLogger; + +@Component(service = DistributionPackageBuilder.class) +@Designate(ocd = JournalDistributionPackageBuilder.Configuration.class, factory = true) +public class JournalDistributionPackageBuilder implements DistributionPackageBuilder { + + private static final Logger LOG = getLogger(JournalDistributionPackageBuilder.class); + + private final String type; + + private final DistributionContentSerializer contentSerializer; + + @Activate + public JournalDistributionPackageBuilder( + Configuration config, + @Reference DistributionContentSerializerProvider serializerProvider) { + type = config.name(); + ExportSettings exportSettings = new ExportSettings( + config.package_roots(), + config.package_filters(), + config.property_filters(), + config.useBinaryReferences(), + pathMappings(config.pathsMapping()) + ); + ImportSettings importSettings = new ImportSettings( + ImportMode.valueOf(config.importMode()), + AccessControlHandling.valueOf(config.aclHandling()), + AccessControlHandling.valueOf(config.cugHandling()), + config.autoSaveThreshold(), + config.strictImport(), + config.overwritePrimaryTypesOfFolders(), + config.idConflictPolicy() + ); + contentSerializer = serializerProvider.build(config.name(), exportSettings, importSettings); + } + + @Override + public String getType() { + return type; + } + + @Override + public @Nonnull DistributionPackage createPackage(@Nonnull ResourceResolver resourceResolver, + @Nonnull DistributionRequest distributionRequest) + throws DistributionException { + + String packageId = format("dstrpck-%s-%s", currentTimeMillis(), randomUUID()); + + final byte[] data; + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + DistributionExportOptions distributionExportOptions = new DistributionExportOptions(distributionRequest, null /* Filters set on the serializer */); + contentSerializer.exportToStream(resourceResolver, distributionExportOptions, outputStream); + data = outputStream.toByteArray(); + } catch (IOException e) { + throw new DistributionException("Failed to create package for paths: " + Arrays.toString(distributionRequest.getPaths()), e); + } + + DistributionPackageInfo distributionPackageInfo = new DistributionPackageInfo(getType()); + distributionPackageInfo.put(PROPERTY_REQUEST_TYPE, distributionRequest.getRequestType()); + distributionPackageInfo.put(PROPERTY_REQUEST_PATHS, distributionRequest.getPaths()); + distributionPackageInfo.put(PROPERTY_REQUEST_DEEP_PATHS, getDeepPaths(distributionRequest)); + + return new JournalDistributionPackage(packageId, getType(), data, distributionPackageInfo); + } + + @Nonnull + @Override + public DistributionPackage readPackage(@Nonnull ResourceResolver resourceResolver, + @Nonnull InputStream inputStream) + throws DistributionException { + throw new DistributionException("Unsupported Operation"); + } + + @Nullable + @Override + public DistributionPackage getPackage(@Nonnull ResourceResolver resourceResolver, + @Nonnull String id) + throws DistributionException { + throw new DistributionException("Unsupported Operation with id: " + id); + } + + @Override + public boolean installPackage(@Nonnull ResourceResolver resourceResolver, + @Nonnull DistributionPackage distributionPackage) + throws DistributionException { + DistributionPackageInfo info = distributionPackage.getInfo(); + DistributionRequestType requestType = requireNonNull(info.getRequestType()); + switch (requestType) { + case ADD: installAddPackage(resourceResolver, distributionPackage); break; + case DELETE: installDeletePackage(resourceResolver, distributionPackage); break; + default: LOG.debug("Skip request type: {}", requestType); + } + return true; + } + + @Override + public @Nonnull DistributionPackageInfo installPackage(@Nonnull ResourceResolver resourceResolver, + @Nonnull InputStream inputStream) + throws DistributionException { + throw new DistributionException("Unsupported Operation"); + } + + protected Map<String, String> pathMappings(String[] pathMappings) { + return unmodifiableMap(Arrays.stream(pathMappings) + .map(mapping -> mapping.split("=", 2)) + .filter(chunks -> chunks.length == 2) + .collect(toMap(chunks -> chunks[0], chunks -> chunks[1], (existing, replacement) -> replacement))); + } + + private void installAddPackage(@Nonnull ResourceResolver resourceResolver, + @Nonnull DistributionPackage distributionPackage) + throws DistributionException { + try (InputStream inputStream = distributionPackage.createInputStream()) { + contentSerializer.importFromStream(resourceResolver, inputStream); + } catch (IOException e) { + throw new DistributionException("Failed to install distribution package with id: " + distributionPackage.getId(), e); + } + } + + private void installDeletePackage(@Nonnull ResourceResolver resourceResolver, + @Nonnull DistributionPackage distributionPackage) + throws DistributionException { + List<String> paths = asList(distributionPackage.getInfo().getPaths()); + LOG.info("Deleting paths {}", paths); + for (String path : paths) { + Resource resource = resourceResolver.getResource(path); + if (resource != null) { + try { + resourceResolver.delete(resource); + } catch (PersistenceException e) { + throw new DistributionException(e); + } + } + } + } + + private String[] getDeepPaths(DistributionRequest request) { + List<String> deepPaths = new ArrayList<>(); + for (String path : request.getPaths()) { + if (request.isDeep(path)) { + deepPaths.add(path); + } + } + return deepPaths.toArray(new String[0]); + } + + @ObjectClassDefinition(name = "Apache Sling Journal based Distribution - Package Builder Configuration", + description = "Apache Sling Content Distribution Package Builder Configuration") + public @interface Configuration { + + @AttributeDefinition + String webconsole_configurationFactory_nameHint() default "Builder name: {name}"; + + @AttributeDefinition(name = "Name", + description = "The name of the package builder.") + String name() default "journal-distribution"; + + @AttributeDefinition(name = "Import Mode", + description = "The vlt import mode for created packages.") + String importMode() default "REPLACE"; + + @AttributeDefinition(name = "Acl Handling", + description = "The vlt acl handling mode for created packages.") + String aclHandling() default "MERGE_PRESERVE"; + + @AttributeDefinition(name = "Cug Handling", + description = "The vlt cug handling mode for created packages.") + String cugHandling() default "OVERWRITE"; + + @AttributeDefinition(name = "Package Roots", + description = "The package roots to be used for created packages. (this is useful for assembling packages with an user that cannot read above the package root)") + String[] package_roots() default {}; + + @AttributeDefinition(name = "Package Node Filters", + description = "The package node path filters. Filter format: path|+include|-exclude") + String[] package_filters() default {}; + + @AttributeDefinition(name = "Package Property Filters", + description = "The package property path filters. Filter format: path|+include|-exclude") + String[] property_filters() default {}; + + @AttributeDefinition(name = "Use Binary References", + description = "If activated, it avoids sending binaries in the distribution package.") + boolean useBinaryReferences() default true; + + @AttributeDefinition(name = "Autosave threshold", + description = "The value after which autosave is triggered for intermediate changes.") + int autoSaveThreshold() default 1000; + + @AttributeDefinition(name = "Paths mapping", + description = "List of paths that require be mapped.The format is {sourcePattern}={destinationPattern}, e.g. /etc/(.*)=/var/$1/some or simply /data=/bak") + String[] pathsMapping() default {}; + + + @AttributeDefinition(name = "Install a content package in a strict mode", + description = "Flag to mark an error response will be thrown, if a content package will incorrectly installed") + boolean strictImport() default true; + + + @AttributeDefinition(name = "Legacy Folder Primary Type Mode", + description = "Whether to overwrite the primary type of folders during imports") + boolean overwritePrimaryTypesOfFolders() default true; + + @AttributeDefinition(name = "ID Conflict Policy", + description = "Node id conflict policy to be used during import") + IdConflictPolicy idConflictPolicy() default IdConflictPolicy.LEGACY; + + } +} \ No newline at end of file diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java index 39b01ca..b16e379 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java @@ -35,7 +35,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.io.ByteArrayInputStream; import java.io.Closeable; import java.io.IOException; import java.net.URI; @@ -62,12 +61,7 @@ import org.apache.sling.distribution.ImportPreProcessor; import org.apache.sling.distribution.agent.DistributionAgentState; import org.apache.sling.distribution.agent.spi.DistributionAgent; import org.apache.sling.distribution.common.DistributionException; -import org.apache.sling.distribution.journal.HandlerAdapter; -import org.apache.sling.distribution.journal.MessageHandler; -import org.apache.sling.distribution.journal.MessageInfo; -import org.apache.sling.distribution.journal.MessageSender; -import org.apache.sling.distribution.journal.MessagingProvider; -import org.apache.sling.distribution.journal.Reset; +import org.apache.sling.distribution.journal.*; import org.apache.sling.distribution.journal.bookkeeper.BookKeeper; import org.apache.sling.distribution.journal.bookkeeper.BookKeeperFactory; import org.apache.sling.distribution.journal.bookkeeper.LocalStore; @@ -84,8 +78,8 @@ import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Statu import org.apache.sling.distribution.journal.messages.PingMessage; import org.apache.sling.distribution.journal.shared.TestMessageInfo; import org.apache.sling.distribution.journal.shared.Topics; +import org.apache.sling.distribution.packaging.DistributionPackage; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; -import org.apache.sling.distribution.packaging.DistributionPackageInfo; import org.apache.sling.settings.SlingSettingsService; import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory; import org.awaitility.Awaitility; @@ -101,7 +95,6 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; -import org.mockito.stubbing.OngoingStubbing; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceRegistration; import org.osgi.service.event.EventAdmin; @@ -151,6 +144,9 @@ public class SubscriberTest { @Mock private SlingSettingsService slingSettings; + @Mock + private BinaryStore binaryStore; + @Spy private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory(); @@ -253,7 +249,7 @@ public class SubscriberTest { packageHandler.handle(info, message); verify(packageBuilder, timeout(1000).times(0)).installPackage(any(ResourceResolver.class), - any(ByteArrayInputStream.class)); + any(DistributionPackage.class)); assertThat(getStoredOffset(), nullValue()); for (int c=0; c < BookKeeper.COMMIT_AFTER_NUM_SKIPPED; c++) { packageHandler.handle(info, message); @@ -324,7 +320,7 @@ public class SubscriberTest { } @Test - public void testReceiveDelete() throws LoginException, PersistenceException { + public void testReceiveDelete() throws LoginException, IOException { assumeNoPrecondition(); initSubscriber(); waitSubscriber(IDLE); @@ -333,7 +329,6 @@ public class SubscriberTest { PackageMessage message = BASIC_DEL_PACKAGE; packageHandler.handle(info, message); waitSubscriber(IDLE); - await().atMost(30, SECONDS).until(() -> getResource("/test") == null); verifyNoStatusMessageSent(); } @@ -341,7 +336,7 @@ public class SubscriberTest { public void testSendFailedStatus() throws DistributionException { assumeNoPrecondition(); initSubscriber(Collections.singletonMap("maxRetries", "1")); - whenInstallPackage() + when(packageBuilder.installPackage(any(ResourceResolver.class), any(DistributionPackage.class))) .thenThrow(new RuntimeException("Expected")); MessageInfo info = createInfo(0l); @@ -416,10 +411,6 @@ public class SubscriberTest { return statusMessage; } - private OngoingStubbing<DistributionPackageInfo> whenInstallPackage() throws DistributionException { - return when(packageBuilder.installPackage(any(ResourceResolver.class), any(ByteArrayInputStream.class))); - } - private TestMessageInfo createInfo(long offset) { return new TestMessageInfo("", 1, offset, 0); }
