This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-9504 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit e98649265825b5dffa50347e4f2162ef8ebd2c7b Author: Christian Schneider <[email protected]> AuthorDate: Wed Jun 17 10:51:34 2020 +0200 SLING-9504 - Migrate to json --- pom.xml | 4 +- .../journal/impl/event/DistributionEvent.java | 12 ++--- .../impl/precondition/PackageStatusWatcher.java | 9 ++-- .../impl/precondition/StagingPrecondition.java | 2 +- .../journal/impl/publisher/DiscoveryService.java | 17 +++--- .../impl/publisher/DistributionPublisher.java | 16 ++---- .../impl/publisher/PackageDistributedNotifier.java | 8 +-- .../impl/publisher/PackageMessageFactory.java | 61 +++++++++++----------- .../journal/impl/queue/QueueItemFactory.java | 10 ++-- .../journal/impl/queue/impl/PubQueueCache.java | 7 +-- .../impl/queue/impl/PubQueueProviderImpl.java | 40 +++++++------- .../journal/impl/queue/impl/QueueCacheSeeder.java | 18 +++---- .../journal/impl/queue/impl/RangePoller.java | 8 +-- .../journal/impl/shared/LimitPoller.java | 8 +-- .../journal/impl/shared/PackageBrowser.java | 6 +-- .../journal/impl/shared/PackageViewerPlugin.java | 6 +-- .../journal/impl/subscriber/Announcer.java | 42 ++++++++------- .../journal/impl/subscriber/BookKeeper.java | 31 +++++------ .../journal/impl/subscriber/CommandPoller.java | 13 ++--- .../impl/subscriber/DistributionSubscriber.java | 20 +++---- .../journal/impl/subscriber/PackageHandler.java | 10 ++-- .../precondition/PackageStatusWatcherTest.java | 17 +++--- .../impl/precondition/StagingPreconditionTest.java | 14 ++--- .../impl/publisher/DiscoveryServiceTest.java | 28 +++++----- .../publisher/DistributionPackageFactoryTest.java | 23 ++++---- .../impl/publisher/DistributionPublisherTest.java | 33 ++++++------ .../impl/publisher/PackageQueuedNotifierTest.java | 18 ++++--- .../journal/impl/publisher/PackageRepoTest.java | 4 +- .../journal/impl/queue/QueueItemFactoryTest.java | 20 +++---- .../journal/impl/queue/impl/PubQueueCacheTest.java | 35 ++++++------- .../impl/queue/impl/PubQueueProviderTest.java | 46 ++++++++-------- .../impl/queue/impl/QueueCacheSeederTest.java | 12 ++--- .../journal/impl/queue/impl/RangePollerTest.java | 24 ++++----- .../journal/impl/shared/LimitPollerTest.java | 25 +++++---- .../journal/impl/shared/PackageBrowserTest.java | 17 +++--- .../impl/shared/PackageViewerPluginTest.java | 21 ++++---- .../journal/impl/shared/TestMessageInfo.java | 7 +++ .../journal/impl/subscriber/AnnouncerTest.java | 11 ++-- .../journal/impl/subscriber/BookKeeperTest.java | 2 +- .../journal/impl/subscriber/CommandPollerTest.java | 31 ++++------- .../journal/impl/subscriber/SubscriberTest.java | 54 +++++++++---------- 41 files changed, 380 insertions(+), 410 deletions(-) diff --git a/pom.xml b/pom.xml index 8642120..5ff6d7c 100644 --- a/pom.xml +++ b/pom.xml @@ -34,7 +34,7 @@ <!-- P R O J E C T --> <!-- ======================================================================= --> <artifactId>org.apache.sling.distribution.journal</artifactId> - <version>0.1.15-SNAPSHOT</version> + <version>0.2.0-JSON-SNAPSHOT</version> <name>Apache Sling Journal based Content Distribution - Core bundle</name> <description>Implementation of Apache Sling Content Distribution components on top of an append-only persisted log</description> @@ -140,7 +140,7 @@ <dependency> <groupId>org.apache.sling</groupId> <artifactId>org.apache.sling.distribution.journal.messages</artifactId> - <version>0.1.2</version> + <version>0.2.0-JSON-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.felix</groupId> diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java b/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java index c153ad3..a22e724 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java @@ -35,12 +35,10 @@ import java.util.Map; import javax.annotation.ParametersAreNonnullByDefault; -import org.apache.sling.distribution.journal.messages.Messages; +import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.queue.DistributionQueueItem; import org.osgi.service.event.Event; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; - @ParametersAreNonnullByDefault public class DistributionEvent { @@ -51,11 +49,11 @@ public class DistributionEvent { private DistributionEvent() { } - public static Event eventImporterImported(Messages.PackageMessage pkgMsg, String agentName) { + public static Event eventImporterImported(PackageMessage pkgMsg, String agentName) { return buildEvent(IMPORTER_PACKAGE_IMPORTED, KIND_IMPORTER, agentName, pkgMsg); } - public static Event eventPackageCreated(Messages.PackageMessage pkgMsg, String agentName) { + public static Event eventPackageCreated(PackageMessage pkgMsg, String agentName) { return buildEvent(AGENT_PACKAGE_CREATED, KIND_AGENT, agentName, pkgMsg); } @@ -66,12 +64,12 @@ public class DistributionEvent { queueItem.getPackageId()); } - public static Event eventPackageQueued(Messages.PackageMessage pkgMsg, String agentName) { + public static Event eventPackageQueued(PackageMessage pkgMsg, String agentName) { return buildEvent(AGENT_PACKAGE_QUEUED, KIND_AGENT, agentName, pkgMsg); } private static Event buildEvent(String topic, String kind, String agentName, PackageMessage pkgMsg) { - List<String> pathsList = pkgMsg.getPathsList(); + List<String> pathsList = pkgMsg.getPaths(); return buildEvent(topic, kind, agentName, pkgMsg.getReqType().name(), pathsList.toArray(new String[0]), diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java index 0408e0d..d359f44 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java @@ -30,9 +30,8 @@ import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.impl.shared.Topics; -import org.apache.sling.distribution.journal.messages.Messages; -import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status; public class PackageStatusWatcher implements Closeable { private final Closeable poller; @@ -46,7 +45,7 @@ public class PackageStatusWatcher implements Closeable { poller = messagingProvider.createPoller( topicName, Reset.earliest, - create(Messages.PackageStatusMessage.class, this::handle) + create(PackageStatusMessage.class, this::handle) ); } @@ -73,7 +72,7 @@ public class PackageStatusWatcher implements Closeable { poller.close(); } - private void handle(MessageInfo info, Messages.PackageStatusMessage pkgStatusMsg) { + private void handle(MessageInfo info, PackageStatusMessage pkgStatusMsg) { // TODO: check revision Map<Long, Status> agentStatus = getAgentStatus(pkgStatusMsg.getSubAgentName()); agentStatus.put(pkgStatusMsg.getOffset(), pkgStatusMsg.getStatus()); diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java index d523cdf..9b55848 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java @@ -26,7 +26,7 @@ import java.util.concurrent.TimeoutException; import org.apache.commons.io.IOUtils; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.impl.shared.Topics; -import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Deactivate; diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java index 496ff81..d2c4972 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java @@ -19,9 +19,9 @@ package org.apache.sling.distribution.journal.impl.publisher; import static java.lang.String.format; -import static org.apache.sling.distribution.journal.HandlerAdapter.create; import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT; import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD; +import static org.apache.sling.distribution.journal.HandlerAdapter.create; import java.io.Closeable; import java.util.Dictionary; @@ -31,8 +31,9 @@ import javax.annotation.ParametersAreNonnullByDefault; import org.apache.sling.distribution.journal.impl.shared.PublisherConfigurationAvailable; import org.apache.sling.distribution.journal.impl.shared.Topics; -import org.apache.sling.distribution.journal.messages.Messages; -import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration; +import org.apache.sling.distribution.journal.messages.DiscoveryMessage; +import org.apache.sling.distribution.journal.messages.SubscriberConfig; +import org.apache.sling.distribution.journal.messages.SubscriberState; import org.apache.commons.io.IOUtils; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceRegistration; @@ -41,7 +42,6 @@ import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Reference; -import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage; import org.apache.sling.distribution.journal.MessageHandler; import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.MessagingProvider; @@ -105,7 +105,8 @@ public class DiscoveryService implements Runnable { poller = messagingProvider.createPoller( topics.getDiscoveryTopic(), Reset.latest, - create(Messages.DiscoveryMessage.class, new DiscoveryMessageHandler())); + create(DiscoveryMessage.class, new DiscoveryMessageHandler()) + ); startTopologyViewUpdaterTask(context); LOG.info("Discovery service started"); } @@ -158,9 +159,9 @@ public class DiscoveryService implements Runnable { long now = System.currentTimeMillis(); AgentId subAgentId = new AgentId(disMsg.getSubSlingId(), disMsg.getSubAgentName()); - for (Messages.SubscriberState subStateMsg : disMsg.getSubscriberStateList()) { - SubscriberConfiguration subConfig = disMsg.getSubscriberConfiguration(); - State subState = new State(subStateMsg.getPubAgentName(), subAgentId.getAgentId(), now, subStateMsg.getOffset(), subStateMsg.getRetries(), subConfig.getMaxRetries(), subConfig.getEditable()); + for (SubscriberState subStateMsg : disMsg.getSubscriberStates()) { + SubscriberConfig subConfig = disMsg.getSubscriberConfiguration(); + State subState = new State(subStateMsg.getPubAgentName(), subAgentId.getAgentId(), now, subStateMsg.getOffset(), subStateMsg.getRetries(), subConfig.getMaxRetries(), subConfig.isEditable()); viewManager.refreshState(subState); } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java index d366ab3..6ac1e4b 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java @@ -52,6 +52,7 @@ import org.apache.sling.distribution.journal.impl.shared.DefaultDistributionLog; import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService; import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse; import org.apache.sling.distribution.journal.impl.shared.Topics; +import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.commons.lang3.StringUtils; import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.distribution.DistributionRequest; @@ -75,8 +76,6 @@ import org.osgi.service.event.EventAdmin; import org.osgi.service.metatype.annotations.Designate; import org.apache.sling.distribution.journal.impl.shared.JMXRegistration; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; -import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.JournalAvailable; @@ -136,7 +135,7 @@ public class DistributionPublisher implements DistributionAgent { private ServiceRegistration<DistributionAgent> componentReg; - private MessageSender<PackageMessage> sender; + private Consumer<PackageMessage> sender; private JMXRegistration reg; @@ -146,7 +145,7 @@ public class DistributionPublisher implements DistributionAgent { log = new DefaultDistributionLog(pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO); REQ_TYPES.put(ADD, this::sendAndWait); REQ_TYPES.put(DELETE, this::sendAndWait); - REQ_TYPES.put(TEST, this::send); + REQ_TYPES.put(TEST, this.sender); } @Activate @@ -159,7 +158,7 @@ public class DistributionPublisher implements DistributionAgent { pkgType = packageBuilder.getType(); - this.sender = messagingProvider.createSender(); + this.sender = messagingProvider.createSender(topics.getPackageTopic()); Dictionary<String, Object> props = createServiceProps(config); componentReg = requireNonNull(context.registerService(DistributionAgent.class, this, props)); @@ -320,7 +319,7 @@ public class DistributionPublisher implements DistributionAgent { CompletableFuture<Void> received = queuedNotifier.registerWait(pkg.getPkgId()); Event createdEvent = DistributionEvent.eventPackageCreated(pkg, pubAgentName); eventAdmin.postEvent(createdEvent); - send(pkg); + sender.accept(pkg); received.get(queuedTimeout, TimeUnit.MILLISECONDS); } catch (Exception e) { queuedNotifier.unRegisterWait(pkg.getPkgId()); @@ -328,11 +327,6 @@ public class DistributionPublisher implements DistributionAgent { } } - private void send(PackageMessage pipePackage) { - final String topicName = topics.getPackageTopic(); - sender.send(topicName, pipePackage); - } - @Nonnull private DistributionResponse executeUnsupported(DistributionRequest request) { String msg = String.format("Request type %s is not supported by this agent, expected one of %s", diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java index 75326e8..c579089 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java @@ -20,6 +20,7 @@ package org.apache.sling.distribution.journal.impl.publisher; import java.util.Objects; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.LongStream; @@ -31,7 +32,6 @@ import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory; import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService; import org.apache.sling.distribution.journal.impl.shared.Topics; import org.apache.sling.distribution.journal.messages.PackageDistributedMessage; -import org.apache.sling.distribution.journal.JsonMessageSender; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.commons.lang3.StringUtils; @@ -65,7 +65,7 @@ public class PackageDistributedNotifier implements TopologyChangeHandler { @Reference private Topics topics; - private JsonMessageSender<PackageDistributedMessage> sender; + private Consumer<PackageDistributedMessage> sender; private boolean sendMsg; @@ -73,7 +73,7 @@ public class PackageDistributedNotifier implements TopologyChangeHandler { public void activate() { sendMsg = StringUtils.isNotBlank(topics.getEventTopic()); if (sendMsg) { - sender = messagingProvider.createJsonSender(); + sender = messagingProvider.createSender(topics.getEventTopic()); } LOG.info("Started package distributed notifier with event message topic {}", topics.getEventTopic()); } @@ -111,7 +111,7 @@ public class PackageDistributedNotifier implements TopologyChangeHandler { msg.paths = (String[]) queueItem.get(PROPERTY_REQUEST_PATHS); msg.deepPaths = (String[]) queueItem.get(PROPERTY_REQUEST_DEEP_PATHS); - sender.send(topics.getEventTopic(), msg); + sender.accept(msg); } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java index 2e7d65e..28bf70a 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java @@ -33,6 +33,9 @@ import org.apache.commons.io.IOUtils; import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.distribution.DistributionRequest; import org.apache.sling.distribution.common.DistributionException; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.PackageMessageBuilder; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; import org.apache.sling.distribution.packaging.DistributionPackage; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; import org.apache.sling.distribution.packaging.DistributionPackageInfo; @@ -44,10 +47,6 @@ import org.osgi.service.component.annotations.Reference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType; -import com.google.protobuf.ByteString; - @Component(service = PackageMessageFactory.class) @ParametersAreNonnullByDefault public class PackageMessageFactory { @@ -106,16 +105,16 @@ public class PackageMessageFactory { final List<String> paths = Arrays.asList(pkgInfo.getPaths()); final List<String> deepPaths = Arrays.asList(pkgInfo.get(PROPERTY_REQUEST_DEEP_PATHS, String[].class)); final String pkgId = disPkg.getId(); - PackageMessage.Builder pkgBuilder = PackageMessage.newBuilder() - .setPubSlingId(pubSlingId) - .setPkgId(pkgId) - .setPubAgentName(pubAgentName) - .addAllPaths(paths) - .setReqType(ReqType.ADD) - .addAllDeepPaths(deepPaths) - .setPkgLength(pkgLength) - .setUserId(resourceResolver.getUserID()) - .setPkgType(packageBuilder.getType()); + PackageMessageBuilder pkgBuilder = PackageMessage.builder() + .pubSlingId(pubSlingId) + .pkgId(pkgId) + .pubAgentName(pubAgentName) + .paths(paths) + .reqType(ReqType.ADD) + .deepPaths(deepPaths) + .pkgLength(pkgLength) + .userId(resourceResolver.getUserID()) + .pkgType(packageBuilder.getType()); if (pkgLength > MAX_INLINE_PKG_BINARY_SIZE) { /* @@ -131,9 +130,9 @@ public class PackageMessageFactory { LOG.info("Package {} too large ({}B) to be sent inline", disPkg.getId(), pkgLength); String pkgBinRef = packageRepo.store(resourceResolver, disPkg); - pkgBuilder.setPkgBinaryRef(pkgBinRef); + pkgBuilder.pkgBinaryRef(pkgBinRef); } else { - pkgBuilder.setPkgBinary(ByteString.copyFrom(pkgBinary)); + pkgBuilder.pkgBinary(pkgBinary); } PackageMessage pipePackage = pkgBuilder.build(); disPkg.delete(); @@ -143,27 +142,27 @@ public class PackageMessageFactory { @Nonnull private PackageMessage createDelete(DistributionPackageBuilder packageBuilder, ResourceResolver resourceResolver, DistributionRequest request, String pubAgentName) { String pkgId = UUID.randomUUID().toString(); - return PackageMessage.newBuilder() - .setPubSlingId(pubSlingId) - .setPkgId(pkgId) - .setPubAgentName(pubAgentName) - .addAllPaths(Arrays.asList(request.getPaths())) - .setReqType(ReqType.DELETE) - .setPkgType(packageBuilder.getType()) - .setUserId(resourceResolver.getUserID()) + return PackageMessage.builder() + .pubSlingId(pubSlingId) + .pkgId(pkgId) + .pubAgentName(pubAgentName) + .paths(Arrays.asList(request.getPaths())) + .reqType(ReqType.DELETE) + .pkgType(packageBuilder.getType()) + .userId(resourceResolver.getUserID()) .build(); } @Nonnull public PackageMessage createTest(DistributionPackageBuilder packageBuilder, ResourceResolver resourceResolver, String pubAgentName) { String pkgId = UUID.randomUUID().toString(); - return PackageMessage.newBuilder() - .setPubSlingId(pubSlingId) - .setPubAgentName(pubAgentName) - .setPkgId(pkgId) - .setReqType(ReqType.TEST) - .setPkgType(packageBuilder.getType()) - .setUserId(resourceResolver.getUserID()) + return PackageMessage.builder() + .pubSlingId(pubSlingId) + .pubAgentName(pubAgentName) + .pkgId(pkgId) + .reqType(ReqType.TEST) + .pkgType(packageBuilder.getType()) + .userId(resourceResolver.getUserID()) .build(); } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java index c9808d8..855a757 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java @@ -33,10 +33,10 @@ import javax.annotation.ParametersAreNonnullByDefault; import org.apache.sling.distribution.DistributionRequestType; import org.apache.sling.distribution.queue.DistributionQueueItem; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType; import org.apache.sling.distribution.journal.FullMessage; import org.apache.sling.distribution.journal.MessageInfo; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; @ParametersAreNonnullByDefault public final class QueueItemFactory { @@ -74,14 +74,14 @@ public final class QueueItemFactory { properties.put(RECORD_TIMESTAMP, info.getCreateTime()); properties.put(PROPERTY_PACKAGE_TYPE, message.getPkgType()); properties.put(PROPERTY_REQUEST_TYPE, toDistReqType(message.getReqType())); - String[] paths = toArray(message.getPathsList()); + String[] paths = toArray(message.getPaths()); properties.put(PROPERTY_REQUEST_PATHS, paths); - String[] deepPaths = toArray(message.getDeepPathsList()); + String[] deepPaths = toArray(message.getDeepPaths()); properties.put(PROPERTY_REQUEST_DEEP_PATHS, deepPaths); if (addMessage) { properties.put(PACKAGE_MSG, message); } - if (message.hasUserId()) { + if (message.getUserId() != null) { properties.put(REQUEST_USER_ID, message.getUserId()); } return new DistributionQueueItem(packageId, pkgLength, properties); diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java index ec811d2..6cf0be7 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java @@ -19,10 +19,10 @@ package org.apache.sling.distribution.journal.impl.queue.impl; -import static org.apache.sling.distribution.journal.HandlerAdapter.create; import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.stream.Collectors.groupingBy; +import static org.apache.sling.distribution.journal.HandlerAdapter.create; import java.io.Closeable; import java.util.HashSet; @@ -43,6 +43,7 @@ import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsServ import org.apache.sling.distribution.journal.impl.shared.JMXRegistration; import org.apache.commons.io.IOUtils; import org.apache.sling.distribution.journal.impl.subscriber.LocalStore; +import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.queue.DistributionQueueItem; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; @@ -51,7 +52,6 @@ import org.slf4j.LoggerFactory; import org.apache.sling.distribution.journal.impl.queue.OffsetQueue; import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; import org.apache.sling.distribution.journal.FullMessage; import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.MessagingProvider; @@ -158,7 +158,8 @@ public class PubQueueCache { this.topic, Reset.earliest, assignTo, - create(PackageMessage.class, this::handlePackage)); + create(PackageMessage.class, this::handlePackage) + ); } @Nonnull diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java index 3b7feab..c9428b6 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java @@ -18,28 +18,26 @@ */ package org.apache.sling.distribution.journal.impl.queue.impl; -import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED_FAILED; import static org.apache.sling.distribution.journal.HandlerAdapter.create; import java.io.Closeable; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; import javax.annotation.Nonnull; import javax.annotation.ParametersAreNonnullByDefault; -import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider; -import org.apache.sling.distribution.journal.impl.shared.Topics; -import org.apache.sling.distribution.journal.impl.queue.OffsetQueue; -import org.apache.sling.distribution.journal.messages.Messages; -import org.apache.sling.distribution.journal.messages.Messages.CommandMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage; +import org.apache.commons.io.IOUtils; import org.apache.sling.distribution.journal.MessageInfo; -import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; - -import org.apache.commons.io.IOUtils; +import org.apache.sling.distribution.journal.impl.queue.OffsetQueue; +import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider; +import org.apache.sling.distribution.journal.impl.shared.Topics; +import org.apache.sling.distribution.journal.messages.ClearCommand; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status; import org.apache.sling.distribution.queue.DistributionQueueItem; import org.apache.sling.distribution.queue.spi.DistributionQueue; import org.osgi.service.component.annotations.Activate; @@ -74,7 +72,7 @@ public class PubQueueProviderImpl implements PubQueueProvider { private Closeable statusPoller; - private MessageSender<CommandMessage> sender; + private Consumer<ClearCommand> sender; public PubQueueProviderImpl() { } @@ -93,8 +91,9 @@ public class PubQueueProviderImpl implements PubQueueProvider { statusPoller = messagingProvider.createPoller( topics.getStatusTopic(), Reset.earliest, - create(PackageStatusMessage.class, this::handleStatus)); - sender = messagingProvider.createSender(); + create(PackageStatusMessage.class, this::handleStatus) + ); + sender = messagingProvider.createSender(topics.getCommandTopic()); LOG.info("Started Publisher queue provider service"); } @@ -131,7 +130,7 @@ public class PubQueueProviderImpl implements PubQueueProvider { } public void handleStatus(MessageInfo info, PackageStatusMessage message) { - if (message.getStatus() == REMOVED_FAILED) { + if (message.getStatus() == Status.REMOVED_FAILED) { String errorQueueKey = errorQueueKey(message.getPubAgentName(), message.getSubSlingId(), message.getSubAgentName()); OffsetQueue<Long> errorQueue = errorQueues.computeIfAbsent(errorQueueKey, key -> new OffsetQueueImpl<>()); errorQueue.putItem(info.getOffset(), message.getOffset()); @@ -144,16 +143,13 @@ public class PubQueueProviderImpl implements PubQueueProvider { } private void sendClearCommand(String subSlingId, String subAgentName, long offset) { - Messages.ClearCommand clearCommand = Messages.ClearCommand.newBuilder() - .setOffset(offset) - .build(); - CommandMessage commandMessage = CommandMessage.newBuilder() - .setSubSlingId(subSlingId) - .setSubAgentName(subAgentName) - .setClearCommand(clearCommand) + ClearCommand commandMessage = ClearCommand.builder() + .subSlingId(subSlingId) + .subAgentName(subAgentName) + .offset(offset) .build(); LOG.info("Sending clear command to subSlingId: {}, subAgentName: {} with offset {}.", subSlingId, subAgentName, offset); - sender.send(topics.getCommandTopic(), commandMessage); + sender.accept(commandMessage); } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java index 79f6063..e3ac96c 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java @@ -27,7 +27,7 @@ import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.journal.MessagingException; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,7 +79,7 @@ public class QueueCacheSeeder implements Closeable { private void sendSeedingMessages() { LOG.info("Start message seeder"); try { - MessageSender<PackageMessage> sender = provider.createSender(); + MessageSender<PackageMessage> sender = provider.createSender(topic); while (!closed) { sendSeedingMessage(sender); delay(CACHE_SEEDING_DELAY_MS); @@ -90,14 +90,14 @@ public class QueueCacheSeeder implements Closeable { } private void sendSeedingMessage() { - sendSeedingMessage(provider.createSender()); + sendSeedingMessage(provider.createSender(topic)); } private void sendSeedingMessage(MessageSender<PackageMessage> sender) { PackageMessage pkgMsg = createTestMessage(); LOG.info("Send seeding message"); try { - sender.send(topic, pkgMsg); + sender.send(pkgMsg); } catch (MessagingException e) { LOG.warn(e.getMessage(), e); delay(CACHE_SEEDING_DELAY_MS * 10); @@ -114,11 +114,11 @@ public class QueueCacheSeeder implements Closeable { protected PackageMessage createTestMessage() { String pkgId = UUID.randomUUID().toString(); - return PackageMessage.newBuilder() - .setPubSlingId("seeder") - .setPkgId(pkgId) - .setPkgType("seeder") - .setReqType(PackageMessage.ReqType.TEST) + return PackageMessage.builder() + .pubSlingId("seeder") + .pkgId(pkgId) + .pkgType("seeder") + .reqType(PackageMessage.ReqType.TEST) .build(); } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java index 68fe9f7..72e7c4a 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java @@ -31,12 +31,11 @@ import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.sling.distribution.journal.messages.Messages; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; import org.apache.sling.distribution.journal.FullMessage; import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; +import org.apache.sling.distribution.journal.messages.PackageMessage; @ParametersAreNonnullByDefault public class RangePoller { @@ -64,7 +63,8 @@ public class RangePoller { LOG.info("Fetching offsets [{},{}[", minOffset, maxOffset); headPoller = messagingProvider.createPoller( packageTopic, Reset.earliest, assign, - create(Messages.PackageMessage.class, this::handlePackage)); + create(PackageMessage.class, this::handlePackage) + ); } public List<FullMessage<PackageMessage>> fetchRange() throws InterruptedException { @@ -77,7 +77,7 @@ public class RangePoller { } } - private void handlePackage(MessageInfo info, Messages.PackageMessage message) { + private void handlePackage(MessageInfo info, PackageMessage message) { long offset = info.getOffset(); LOG.debug("Reading offset {}", offset); if (offset < maxOffset) { diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java index 2bb30b5..687c9ab 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java @@ -36,8 +36,7 @@ import org.apache.sling.distribution.journal.FullMessage; import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; -import org.apache.sling.distribution.journal.messages.Messages; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +65,8 @@ public class LimitPoller { log.info("Fetching {} messages starting from {}", maxMessages, minOffset); headPoller = messagingProvider.createPoller( packageTopic, Reset.earliest, assign, - create(Messages.PackageMessage.class, this::handlePackage)); + create(PackageMessage.class, this::handlePackage) + ); } public List<FullMessage<PackageMessage>> fetch(Duration timeOut) { @@ -86,7 +86,7 @@ public class LimitPoller { } } - private void handlePackage(MessageInfo info, Messages.PackageMessage message) { + private void handlePackage(MessageInfo info, PackageMessage message) { long offset = info.getOffset(); log.debug("Reading offset {}", offset); if (this.messages.size() < maxMessages && info.getOffset() >= minOffset) { diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java index b9d1d2d..b02d33a 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java @@ -43,7 +43,7 @@ import org.apache.sling.distribution.common.DistributionException; import org.apache.sling.distribution.journal.FullMessage; import org.apache.sling.distribution.journal.JournalAvailable; import org.apache.sling.distribution.journal.MessagingProvider; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; @@ -78,8 +78,8 @@ public class PackageBrowser { @Nonnull public static InputStream pkgStream(ResourceResolver resolver, PackageMessage pkgMsg) throws DistributionException { - if (pkgMsg.hasPkgBinary()) { - return new ByteArrayInputStream(pkgMsg.getPkgBinary().toByteArray()); + if (pkgMsg.getPkgBinary() != null) { + return new ByteArrayInputStream(pkgMsg.getPkgBinary()); } else { String pkgBinRef = pkgMsg.getPkgBinaryRef(); try { diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java index 30f8836..308cd61 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java @@ -31,8 +31,8 @@ import javax.servlet.http.HttpServletResponse; import org.apache.felix.webconsole.AbstractWebConsolePlugin; import org.apache.felix.webconsole.WebConsoleConstants; import org.apache.sling.distribution.journal.FullMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; import org.osgi.framework.Constants; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; @@ -109,7 +109,7 @@ public class PackageViewerPlugin extends AbstractWebConsolePlugin { msg.getMessage().getPkgId(), msg.getInfo().getOffset(), msg.getMessage().getReqType(), - msg.getMessage().getPathsList().toString()); + msg.getMessage().getPaths().toString()); } private void writePackage(Long offset, HttpServletResponse res) throws IOException { diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java index 7ecd03b..06059e2 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java @@ -19,19 +19,20 @@ package org.apache.sling.distribution.journal.impl.subscriber; import java.io.Closeable; +import java.util.List; import java.util.Objects; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.stream.Collectors; import javax.annotation.ParametersAreNonnullByDefault; -import org.apache.sling.distribution.journal.messages.Messages; -import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage; -import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration; -import org.apache.sling.distribution.journal.messages.Messages.SubscriberState; +import org.apache.sling.distribution.journal.messages.DiscoveryMessage; +import org.apache.sling.distribution.journal.messages.SubscriberConfig; +import org.apache.sling.distribution.journal.messages.SubscriberState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,27 +89,28 @@ class Announcer implements Runnable, Closeable { private DiscoveryMessage createDiscoveryMessage() { long offset = bookKeeper.loadOffset(); - SubscriberConfiguration subscriberConfiguration = SubscriberConfiguration.newBuilder() - .setEditable(editable) - .setMaxRetries(maxRetries) + SubscriberConfig subscriberConfiguration = SubscriberConfig.builder() + .editable(editable) + .maxRetries(maxRetries) + .build(); + List<SubscriberState> states = pubAgentNames.stream() + .map(pubAgentName -> subscriberState(pubAgentName, offset)) + .collect(Collectors.toList()); + return DiscoveryMessage + .builder() + .subSlingId(subSlingId) + .subAgentName(subAgentName) + .subscriberConfiguration(subscriberConfiguration) + .subscriberStates(states) .build(); - DiscoveryMessage.Builder disMsgBuilder = DiscoveryMessage - .newBuilder() - .setSubSlingId(subSlingId) - .setSubAgentName(subAgentName) - .setSubscriberConfiguration(subscriberConfiguration); - for (String pubAgentName : pubAgentNames) { - disMsgBuilder.addSubscriberState(subscriberState(pubAgentName, offset)); - } - return disMsgBuilder.build(); } private SubscriberState subscriberState(String pubAgentName, long offset) { int retries = bookKeeper.getRetries(pubAgentName); - return Messages.SubscriberState.newBuilder() - .setPubAgentName(pubAgentName) - .setRetries(retries) - .setOffset(offset) + return SubscriberState.builder() + .pubAgentName(pubAgentName) + .retries(retries) + .offset(offset) .build(); } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java index 2984399..59d9687 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java @@ -22,9 +22,6 @@ import static java.lang.String.format; import static java.lang.System.currentTimeMillis; import static java.util.Collections.singletonMap; import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE; -import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.IMPORTED; -import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED; -import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED_FAILED; import java.io.Closeable; import java.io.IOException; @@ -45,9 +42,9 @@ import org.apache.sling.distribution.journal.impl.event.DistributionEvent; import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries; import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService; import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; import org.slf4j.Logger; @@ -151,7 +148,7 @@ public class BookKeeper implements Closeable { ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) { packageHandler.apply(importerResolver, pkgMsg); if (editable) { - storeStatus(importerResolver, new PackageStatus(IMPORTED, offset, pkgMsg.getPubAgentName())); + storeStatus(importerResolver, new PackageStatus(PackageStatusMessage.Status.IMPORTED, offset, pkgMsg.getPubAgentName())); } storeOffset(importerResolver, offset); importerResolver.commit(); @@ -170,7 +167,7 @@ public class BookKeeper implements Closeable { private void addPackageMDC(PackageMessage pkgMsg) { MDC.put("module", "distribution"); MDC.put("package-id", pkgMsg.getPkgId()); - String paths = String.join(",", pkgMsg.getPathsList()); + String paths = String.join(",", pkgMsg.getPaths()); MDC.put("paths", paths); MDC.put("pub-sling-id", pkgMsg.getPubSlingId()); String pubAgentName = pkgMsg.getPubAgentName(); @@ -212,7 +209,7 @@ public class BookKeeper implements Closeable { Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time(); try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) { if (editable) { - storeStatus(resolver, new PackageStatus(REMOVED, offset, pkgMsg.getPubAgentName())); + storeStatus(resolver, new PackageStatus(Status.REMOVED, offset, pkgMsg.getPubAgentName())); } storeOffset(resolver, offset); resolver.commit(); @@ -263,12 +260,12 @@ public class BookKeeper implements Closeable { } private void sendStatusMessage(PackageStatus status) { - PackageStatusMessage pkgStatMsg = PackageStatusMessage.newBuilder() - .setSubSlingId(subSlingId) - .setSubAgentName(subAgentName) - .setPubAgentName(status.pubAgentName) - .setOffset(status.offset) - .setStatus(status.status) + PackageStatusMessage pkgStatMsg = PackageStatusMessage.builder() + .subSlingId(subSlingId) + .subAgentName(subAgentName) + .pubAgentName(status.pubAgentName) + .offset(status.offset) + .status(status.status) .build(); sender.accept(pkgStatMsg); log.info("Sent status message {}", pkgStatMsg); @@ -305,7 +302,7 @@ public class BookKeeper implements Closeable { pkgMsg.getPkgId(), pkgMsg.getReqType(), offset); Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time(); try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) { - storeStatus(resolver, new PackageStatus(REMOVED_FAILED, offset, pkgMsg.getPubAgentName())); + storeStatus(resolver, new PackageStatus(Status.REMOVED_FAILED, offset, pkgMsg.getPubAgentName())); storeOffset(resolver, offset); resolver.commit(); } catch (Exception e) { @@ -351,7 +348,7 @@ public class BookKeeper implements Closeable { PackageStatus(ValueMap statusMap) { Integer statusNum = statusMap.get("statusNumber", Integer.class); - this.status = statusNum !=null ? Status.valueOf(statusNum) : null; + this.status = statusNum !=null ? Status.fromNumber(statusNum) : null; this.offset = statusMap.get(KEY_OFFSET, Long.class); this.pubAgentName = statusMap.get("pubAgentName", String.class); this.sent = statusMap.get("sent", true); diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java index f9d047b..703a9d1 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java @@ -28,7 +28,7 @@ import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.impl.shared.Topics; -import org.apache.sling.distribution.journal.messages.Messages.CommandMessage; +import org.apache.sling.distribution.journal.messages.ClearCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +57,8 @@ public class CommandPoller implements Closeable { poller = messagingProvider.createPoller( topics.getCommandTopic(), Reset.earliest, - create(CommandMessage.class, this::handleCommandMessage)); + create(ClearCommand.class, this::handleCommandMessage) + ); } else { poller = null; } @@ -67,17 +68,13 @@ public class CommandPoller implements Closeable { return offset <= clearOffset.longValue(); } - private void handleCommandMessage(MessageInfo info, CommandMessage message) { + private void handleCommandMessage(MessageInfo info, ClearCommand message) { if (!subSlingId.equals(message.getSubSlingId()) || !subAgentName.equals(message.getSubAgentName())) { LOG.debug("Skip command for subSlingId {}", message.getSubSlingId()); return; } - if (message.hasClearCommand()) { - handleClearCommand(message.getClearCommand().getOffset()); - } else { - LOG.warn("Unsupported command {}", message); - } + handleClearCommand(message.getOffset()); } private void handleClearCommand(long offset) { diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java index e30ff59..e642dfe 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java @@ -22,7 +22,6 @@ import static java.lang.String.format; import static java.util.Collections.emptyMap; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toSet; -import static org.apache.sling.distribution.journal.HandlerAdapter.create; import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread; import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.PACKAGE_MSG; import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET; @@ -64,9 +63,9 @@ import org.apache.sling.distribution.DistributionResponse; import org.apache.sling.distribution.agent.DistributionAgentState; import org.apache.sling.distribution.agent.spi.DistributionAgent; import org.apache.sling.distribution.common.DistributionException; +import org.apache.sling.distribution.journal.HandlerAdapter; import org.apache.sling.distribution.journal.JournalAvailable; import org.apache.sling.distribution.journal.MessageInfo; -import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.impl.precondition.Precondition; @@ -76,7 +75,8 @@ import org.apache.sling.distribution.journal.impl.shared.AgentState; import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService; import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse; import org.apache.sling.distribution.journal.impl.shared.Topics; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.log.spi.DistributionLog; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; import org.apache.sling.distribution.queue.DistributionQueueItem; @@ -93,8 +93,6 @@ import org.osgi.service.metatype.annotations.Designate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.protobuf.GeneratedMessage; - /** * A Subscriber SCD agent which consumes messages produced by a * {@code DistributionPublisher} agent. @@ -200,14 +198,15 @@ public class DistributionSubscriber implements DistributionAgent { ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.packageHandling()); PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor); + Consumer<PackageStatusMessage> sender = messagingProvider.createSender(topics.getStatusTopic()); bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, - sender(topics.getStatusTopic()), subAgentName, subSlingId, editable, maxRetries); + sender, subAgentName, subSlingId, editable, maxRetries); long startOffset = bookKeeper.loadOffset() + 1; String assign = messagingProvider.assignTo(startOffset); packagePoller = messagingProvider.createPoller(topics.getPackageTopic(), Reset.earliest, assign, - create(PackageMessage.class, this::handlePackageMessage)); + HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage)); commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName, editable); @@ -215,7 +214,7 @@ public class DistributionSubscriber implements DistributionAgent { format("Queue Processor for Subscriber agent %s", subAgentName)); int announceDelay = PropertiesUtil.toInteger(properties.get("announceDelay"), 10000); - announcer = new Announcer(subSlingId, subAgentName, queueNames, sender(topics.getDiscoveryTopic()), bookKeeper, + announcer = new Announcer(subSlingId, subAgentName, queueNames, messagingProvider.createSender(topics.getDiscoveryTopic()), bookKeeper, maxRetries, config.editable(), announceDelay); pkgType = requireNonNull(packageBuilder.getType()); @@ -228,11 +227,6 @@ public class DistributionSubscriber implements DistributionAgent { componentReg = context.registerService(DistributionAgent.class, this, props); } - private <T extends GeneratedMessage> Consumer<T> sender(String topic) { - MessageSender<T> sender = messagingProvider.createSender(); - return msg -> sender.send(topic, msg); - } - private Set<String> getNotEmpty(String[] agentNames) { return Arrays.stream(agentNames).filter(StringUtils::isNotBlank).collect(toSet()); } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java index bdfb3dd..bccf474 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java @@ -28,7 +28,7 @@ import org.apache.sling.api.resource.Resource; import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.distribution.common.DistributionException; import org.apache.sling.distribution.journal.impl.shared.PackageBrowser; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,12 +63,12 @@ public class PackageHandler { private void installAddPackage(ResourceResolver resolver, PackageMessage pkgMsg) throws DistributionException { - LOG.info("Importing paths {}",pkgMsg.getPathsList()); + LOG.info("Importing paths {}",pkgMsg.getPaths()); InputStream pkgStream = null; try { pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg); packageBuilder.installPackage(resolver, pkgStream); - extractor.handle(resolver, pkgMsg.getPathsList()); + extractor.handle(resolver, pkgMsg.getPaths()); } finally { IOUtils.closeQuietly(pkgStream); } @@ -77,8 +77,8 @@ public class PackageHandler { private void installDeletePackage(ResourceResolver resolver, PackageMessage pkgMsg) throws PersistenceException { - LOG.info("Deleting paths {}",pkgMsg.getPathsList()); - for (String path : pkgMsg.getPathsList()) { + LOG.info("Deleting paths {}",pkgMsg.getPaths()); + for (String path : pkgMsg.getPaths()) { Resource resource = resolver.getResource(path); if (resource != null) { resolver.delete(resource); diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java index 1c5350e..f882f32 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java @@ -18,11 +18,10 @@ */ package org.apache.sling.distribution.journal.impl.precondition; -import org.apache.sling.distribution.journal.impl.precondition.PackageStatusWatcher; import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo; import org.apache.sling.distribution.journal.impl.shared.Topics; -import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status; import org.apache.sling.distribution.journal.HandlerAdapter; import org.apache.sling.distribution.journal.MessageHandler; import org.apache.sling.distribution.journal.MessagingProvider; @@ -96,12 +95,12 @@ public class PackageStatusWatcherTest { } PackageStatusMessage createStatusMessage(int i) { - return PackageStatusMessage.newBuilder() - .setSubSlingId(SUB1_SLING_ID) - .setSubAgentName(SUB1_AGENT_NAME) - .setPubAgentName(PUB1_AGENT_NAME) - .setOffset(1000 + i) - .setStatus(PackageStatusMessage.Status.REMOVED_FAILED) + return PackageStatusMessage.builder() + .subSlingId(SUB1_SLING_ID) + .subAgentName(SUB1_AGENT_NAME) + .pubAgentName(PUB1_AGENT_NAME) + .offset(1000 + i) + .status(PackageStatusMessage.Status.REMOVED_FAILED) .build(); } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java index 59b1c14..3a31224 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java @@ -37,7 +37,7 @@ import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo; import org.apache.sling.distribution.journal.impl.shared.Topics; -import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.awaitility.Awaitility; import org.awaitility.Duration; import org.junit.Before; @@ -143,12 +143,12 @@ public class StagingPreconditionTest { } private void simulateMessage(String subAgentName, long pkgOffset, PackageStatusMessage.Status status) { - PackageStatusMessage message = PackageStatusMessage.newBuilder() - .setSubSlingId(SUB1_SLING_ID) - .setSubAgentName(subAgentName) - .setPubAgentName(PUB1_AGENT_NAME) - .setOffset(pkgOffset) - .setStatus(status) + PackageStatusMessage message = PackageStatusMessage.builder() + .subSlingId(SUB1_SLING_ID) + .subAgentName(subAgentName) + .pubAgentName(PUB1_AGENT_NAME) + .offset(pkgOffset) + .status(status) .build(); TestMessageInfo offset0 = new TestMessageInfo("", 1, 0, 0); diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java index d39f280..0adf713 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java @@ -36,9 +36,9 @@ import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo; import org.apache.sling.distribution.journal.impl.shared.Topics; -import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage; -import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration; -import org.apache.sling.distribution.journal.messages.Messages.SubscriberState; +import org.apache.sling.distribution.journal.messages.DiscoveryMessage; +import org.apache.sling.distribution.journal.messages.SubscriberConfig; +import org.apache.sling.distribution.journal.messages.SubscriberState; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -123,20 +123,20 @@ public class DiscoveryServiceTest { } private DiscoveryMessage discoveryMessage(String subSlingId, String subAgentName, SubscriberState... subStates) { - return DiscoveryMessage.newBuilder() - .setSubSlingId(subSlingId) - .setSubAgentName(subAgentName) - .setSubscriberConfiguration(SubscriberConfiguration - .newBuilder() - .setEditable(false) - .setMaxRetries(-1) + return DiscoveryMessage.builder() + .subSlingId(subSlingId) + .subAgentName(subAgentName) + .subscriberConfiguration(SubscriberConfig + .builder() + .editable(false) + .maxRetries(-1) .build()) - .addAllSubscriberState(Arrays.asList(subStates)).build(); + .subscriberStates(Arrays.asList(subStates)).build(); } private SubscriberState subscriberState(String pubAgentName, int offset) { - return SubscriberState.newBuilder() - .setPubAgentName(pubAgentName) - .setOffset(offset).build(); + return SubscriberState.builder() + .pubAgentName(pubAgentName) + .offset(offset).build(); } } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java index 125aa5f..929de44 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java @@ -18,6 +18,7 @@ */ package org.apache.sling.distribution.journal.impl.publisher; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -30,12 +31,14 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import org.apache.sling.distribution.journal.messages.Messages; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.distribution.DistributionRequest; import org.apache.sling.distribution.DistributionRequestType; import org.apache.sling.distribution.SimpleDistributionRequest; import org.apache.sling.distribution.common.DistributionException; +import org.apache.sling.distribution.packaging.DistributionPackage; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; import org.apache.sling.distribution.packaging.DistributionPackageInfo; import org.junit.Before; @@ -66,7 +69,7 @@ public class DistributionPackageFactoryTest { public void testAdd() throws DistributionException, IOException { DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/test"); - org.apache.sling.distribution.packaging.DistributionPackage pkg = mock(org.apache.sling.distribution.packaging.DistributionPackage.class); + DistributionPackage pkg = mock(DistributionPackage.class); when(pkg.createInputStream()).thenReturn(new ByteArrayInputStream(new byte[] {})); when(pkg.getId()).thenReturn("myid"); Map<String, Object> props = new HashMap<>(); @@ -77,25 +80,25 @@ public class DistributionPackageFactoryTest { when(pkg.getInfo()).thenReturn(info); when(packageBuilder.createPackage(Mockito.eq(resourceResolver), Mockito.eq(request))).thenReturn(pkg); - Messages.PackageMessage sent = publisher.create(packageBuilder, resourceResolver, "pub1agent1", request); + PackageMessage sent = publisher.create(packageBuilder, resourceResolver, "pub1agent1", request); assertThat(sent.getPkgBinary(), notNullValue()); assertThat(sent.getPkgLength(), equalTo(0L)); - assertThat(sent.getReqType(), equalTo(Messages.PackageMessage.ReqType.ADD)); + assertThat(sent.getReqType(), equalTo(ReqType.ADD)); assertThat(sent.getPkgType(), equalTo("journal")); - assertThat(sent.getPathsList(), contains("/test")); - assertThat(sent.getDeepPathsList(), contains("/test2")); + assertThat(sent.getPaths(), contains("/test")); + assertThat(sent.getDeepPaths(), contains("/test2")); } @Test public void testDelete() throws DistributionException, IOException { DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.DELETE, "/test"); - Messages.PackageMessage sent = publisher.create(packageBuilder, resourceResolver, "pub1agent1", request); - assertThat(sent.getReqType(), equalTo(Messages.PackageMessage.ReqType.DELETE)); - assertThat(sent.getPkgBinary(), notNullValue()); + PackageMessage sent = publisher.create(packageBuilder, resourceResolver, "pub1agent1", request); + assertThat(sent.getReqType(), equalTo(ReqType.DELETE)); + assertThat(sent.getPkgBinary(), nullValue()); assertThat(sent.getPkgLength(), equalTo(0L)); - assertThat(sent.getPathsList(), contains("/test")); + assertThat(sent.getPaths(), contains("/test")); } } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java index 7ab2bdf..32a8a11 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java @@ -40,11 +40,10 @@ import java.util.concurrent.CompletableFuture; import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider; import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService; import org.apache.sling.distribution.journal.impl.shared.Topics; -import org.apache.sling.distribution.journal.messages.Messages; +import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.journal.MessagingProvider; import com.google.common.collect.ImmutableMap; -import com.google.protobuf.ByteString; import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.commons.metrics.Histogram; import org.apache.sling.commons.metrics.Meter; @@ -131,7 +130,7 @@ public class DistributionPublisherTest { private ResourceResolver resourceResolver; @Mock - private MessageSender<Messages.PackageMessage> sender; + private MessageSender<PackageMessage> sender; @Mock private PackageQueuedNotifier queuedNotifier; @@ -140,7 +139,7 @@ public class DistributionPublisherTest { private TopologyView topology; @Captor - private ArgumentCaptor<Messages.PackageMessage> pkgCaptor; + private ArgumentCaptor<PackageMessage> pkgCaptor; @Spy private Topics topics = new Topics(); @@ -155,7 +154,7 @@ public class DistributionPublisherTest { when(slingSettings.getSlingId()).thenReturn("pub1sling"); when(context.registerService(Mockito.eq(DistributionAgent.class), Mockito.eq(publisher), Mockito.any(Dictionary.class))).thenReturn(serviceReg); - when(messagingProvider.<Messages.PackageMessage>createSender()).thenReturn(sender); + when(messagingProvider.<PackageMessage>createSender(Mockito.anyString())).thenReturn(sender); publisher.activate(config, context); when(timer.time()).thenReturn(timerContext); } @@ -171,7 +170,7 @@ public class DistributionPublisherTest { public void testSend() throws DistributionException, IOException { DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/test"); - Messages.PackageMessage pkg = mockPackage(request); + PackageMessage pkg = mockPackage(request); when(factory.create(Matchers.any(DistributionPackageBuilder.class),Mockito.eq(resourceResolver), anyString(), Mockito.eq(request))).thenReturn(pkg); CompletableFuture<Void> callback = CompletableFuture.completedFuture(null); when(queuedNotifier.registerWait(Mockito.eq(pkg.getPkgId()))).thenReturn(callback); @@ -184,8 +183,8 @@ public class DistributionPublisherTest { DistributionResponse response = publisher.execute(resourceResolver, request); assertThat(response.getState(), equalTo(DistributionRequestState.ACCEPTED)); - verify(sender).send(Mockito.eq(topics.getPackageTopic()), pkgCaptor.capture()); - Messages.PackageMessage sent = pkgCaptor.getValue(); + verify(sender).accept(pkgCaptor.capture()); + PackageMessage sent = pkgCaptor.getValue(); // Individual fields are checks in factory assertThat(sent, notNullValue()); @@ -252,15 +251,15 @@ public class DistributionPublisherTest { return new State(PUB1AGENT1, SUBAGENT1, 0, 1, 0, maxRetries, false); } - private Messages.PackageMessage mockPackage(DistributionRequest request) throws IOException { - return Messages.PackageMessage.newBuilder() - .setPkgId("myid") - .setPubSlingId("pub1sling") - .setPkgType("journal") - .setReqType(Messages.PackageMessage.ReqType.ADD) - .addAllPaths(Arrays.asList(request.getPaths())) - .addDeepPaths("/test2") - .setPkgBinary(ByteString.copyFrom(new byte[100])) + private PackageMessage mockPackage(DistributionRequest request) throws IOException { + return PackageMessage.builder() + .pkgId("myid") + .pubSlingId("pub1sling") + .pkgType("journal") + .reqType(PackageMessage.ReqType.ADD) + .paths(Arrays.asList(request.getPaths())) + .deepPaths(Arrays.asList("/test2")) + .pkgBinary(new byte[100]) .build(); } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java index 2444d21..6bf412f 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java @@ -18,6 +18,7 @@ */ package org.apache.sling.distribution.journal.impl.publisher; +import java.util.Arrays; import java.util.HashMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -28,8 +29,9 @@ import org.junit.Assert; import org.junit.Test; import org.osgi.service.event.Event; import org.apache.sling.distribution.journal.impl.event.DistributionEvent; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; + import static org.apache.sling.distribution.event.DistributionEventTopics.AGENT_PACKAGE_QUEUED; public class PackageQueuedNotifierTest { @@ -66,12 +68,12 @@ public class PackageQueuedNotifierTest { } private PackageMessage pkgMsg(String packageId) { - return PackageMessage.newBuilder() - .addPaths("/test") - .setPkgId(packageId) - .setReqType(ReqType.ADD) - .setPkgType("journal") - .setPubSlingId("sling1") + return PackageMessage.builder() + .paths(Arrays.asList("/test")) + .pkgId(packageId) + .reqType(ReqType.ADD) + .pkgType("journal") + .pubSlingId("sling1") .build(); } } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java index 43ed4f7..8ec5aae 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java @@ -39,7 +39,7 @@ import org.apache.sling.distribution.common.DistributionException; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService; import org.apache.sling.distribution.journal.impl.shared.Topics; -import org.apache.sling.distribution.journal.messages.Messages; +import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.packaging.DistributionPackage; import org.apache.sling.testing.mock.osgi.MockOsgi; import org.apache.sling.testing.mock.sling.MockSling; @@ -79,7 +79,7 @@ public class PackageRepoTest { private DistributionMetricsService distributionMetricsService; @Captor - private ArgumentCaptor<Messages.PackageMessage> pkgCaptor; + private ArgumentCaptor<PackageMessage> pkgCaptor; @Spy private Topics topics = new Topics(); diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java index 1bf6fd8..8097105 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java @@ -30,14 +30,16 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.hasItemInArray; import static org.junit.Assert.assertThat; +import java.util.Arrays; + import org.apache.sling.distribution.DistributionRequestType; import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; import org.apache.sling.distribution.queue.DistributionQueueItem; import org.hamcrest.Matcher; import org.junit.Test; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType; import org.apache.sling.distribution.journal.MessageInfo; public class QueueItemFactoryTest { @@ -47,13 +49,13 @@ public class QueueItemFactoryTest { @Test public void test() { MessageInfo info = new TestMessageInfo("topic", 0, 1, 2); - PackageMessage message = PackageMessage.newBuilder() - .setPubSlingId("sling1") - .setPkgId("pkg1") - .setPkgType("type") - .addPaths("/") - .addDeepPaths("/deep") - .setReqType(ReqType.ADD) + PackageMessage message = PackageMessage.builder() + .pubSlingId("sling1") + .pkgId("pkg1") + .pkgType("type") + .paths(Arrays.asList("/")) + .deepPaths(Arrays.asList("/deep")) + .reqType(ReqType.ADD) .build(); item = QueueItemFactory.fromPackage(info, message, true); diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java index 3f0dc5e..e7f2444 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java @@ -20,9 +20,6 @@ package org.apache.sling.distribution.journal.impl.queue.impl; import static java.lang.System.currentTimeMillis; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.ADD; -import static org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.DELETE; -import static org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.TEST; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; @@ -52,8 +49,8 @@ import org.apache.sling.distribution.journal.impl.queue.OffsetQueue; import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService; import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo; import org.apache.sling.distribution.journal.impl.subscriber.LocalStore; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; import org.apache.sling.distribution.queue.DistributionQueueItem; import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory; import org.awaitility.Awaitility; @@ -157,7 +154,7 @@ public class PubQueueCacheTest { @Test public void testSeedingFromNewPackageMessage() throws Exception { - simulateMessage(tailHandler, PUB_AGENT_NAME_1, ADD, 0); + simulateMessage(tailHandler, PUB_AGENT_NAME_1, ReqType.ADD, 0); OffsetQueue<DistributionQueueItem> queue = cache.getOffsetQueue(PUB_AGENT_NAME_1, 0); assertThat(queue.getSize(), greaterThan(0)); } @@ -204,12 +201,12 @@ public class PubQueueCacheTest { @Test public void testCacheSize() throws Exception { - simulateMessage(tailHandler, PUB_AGENT_NAME_3, ADD, 0); - simulateMessage(tailHandler, PUB_AGENT_NAME_3, DELETE, 1); - simulateMessage(tailHandler, PUB_AGENT_NAME_1, ADD, 2); - simulateMessage(tailHandler, PUB_AGENT_NAME_3, TEST, 3); // TEST message does not increase the cache size - simulateMessage(tailHandler, PUB_AGENT_NAME_2, TEST, 4); // TEST message does not increase the cache size - simulateMessage(tailHandler, PUB_AGENT_NAME_3, ADD, 5); + simulateMessage(tailHandler, PUB_AGENT_NAME_3, ReqType.ADD, 0); + simulateMessage(tailHandler, PUB_AGENT_NAME_3, ReqType.DELETE, 1); + simulateMessage(tailHandler, PUB_AGENT_NAME_1, ReqType.ADD, 2); + simulateMessage(tailHandler, PUB_AGENT_NAME_3, ReqType.TEST, 3); // TEST message does not increase the cache size + simulateMessage(tailHandler, PUB_AGENT_NAME_2, ReqType.TEST, 4); // TEST message does not increase the cache size + simulateMessage(tailHandler, PUB_AGENT_NAME_3, ReqType.ADD, 5); assertEquals(4, cache.size()); } @@ -220,16 +217,16 @@ public class PubQueueCacheTest { private void simulateMessage(MessageHandler<PackageMessage> handler, long offset) { simulateMessage(handler, pickAny(PUB_AGENT_NAME_1, PUB_AGENT_NAME_2, PUB_AGENT_NAME_3), - pickAny(ADD, DELETE, TEST), offset); + pickAny(ReqType.ADD, ReqType.DELETE, ReqType.TEST), offset); } private void simulateMessage(MessageHandler<PackageMessage> handler, String pubAgentName, ReqType reqType, long offset) { - PackageMessage msg = PackageMessage.newBuilder() - .setPkgType("pkgType") - .setPkgId(UUID.randomUUID().toString()) - .setPubSlingId("pubSlingId") - .setReqType(reqType) - .setPubAgentName(pubAgentName) + PackageMessage msg = PackageMessage.builder() + .pkgType("pkgType") + .pkgId(UUID.randomUUID().toString()) + .pubSlingId("pubSlingId") + .reqType(reqType) + .pubAgentName(pubAgentName) .build(); simulateMessage(handler, msg, offset); } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java index 009235c..2435c3a 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.when; import java.io.Closeable; import java.io.IOException; import java.lang.management.ManagementFactory; +import java.util.Arrays; import java.util.Iterator; import java.util.Set; import java.util.UUID; @@ -41,9 +42,11 @@ import javax.management.ReflectionException; import org.apache.sling.api.resource.PersistenceException; import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.distribution.journal.impl.shared.Topics; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status; import org.apache.sling.distribution.journal.MessageSender; -import com.google.protobuf.GeneratedMessage; - import org.apache.sling.distribution.journal.impl.subscriber.LocalStore; import org.apache.sling.distribution.queue.DistributionQueueEntry; import org.apache.sling.distribution.queue.DistributionQueueItem; @@ -59,16 +62,11 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -import org.apache.sling.distribution.journal.messages.Messages; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType; -import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status; import org.apache.sling.distribution.journal.HandlerAdapter; import org.apache.sling.distribution.journal.MessageHandler; import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; -import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage; import org.osgi.service.event.EventAdmin; @@ -88,7 +86,7 @@ public class PubQueueProviderTest { private SlingSettingsService slingSettings; @Captor - private ArgumentCaptor<HandlerAdapter<Messages.PackageMessage>> handlerCaptor; + private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor; @Captor private ArgumentCaptor<HandlerAdapter<PackageStatusMessage>> statHandlerCaptor; @@ -103,13 +101,13 @@ public class PubQueueProviderTest { private EventAdmin eventAdmin; @Mock - private MessageSender<GeneratedMessage> sender; + private MessageSender<Object> sender; private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory(); private PubQueueCacheService pubQueueCacheService; - private MessageHandler<Messages.PackageMessage> handler; + private MessageHandler<PackageMessage> handler; private MessageHandler<PackageStatusMessage> statHandler; private PubQueueProviderImpl queueProvider; @@ -129,7 +127,7 @@ public class PubQueueProviderTest { Mockito.any(Reset.class), statHandlerCaptor.capture())) .thenReturn(statPoller); - when(clientProvider.createSender()) + when(clientProvider.createSender(Mockito.anyString())) .thenReturn(sender); Topics topics = new Topics(); String slingId = UUID.randomUUID().toString(); @@ -209,12 +207,12 @@ public class PubQueueProviderTest { } private PackageStatusMessage statusMessage(long offset, PackageMessage pkgMsg1) { - return PackageStatusMessage.newBuilder() - .setOffset(offset) - .setPubAgentName(PUB1_AGENT_NAME) - .setStatus(Status.REMOVED_FAILED) - .setSubAgentName(SUB_AGENT_NAME) - .setSubSlingId(SUB_SLING_ID) + return PackageStatusMessage.builder() + .offset(offset) + .pubAgentName(PUB1_AGENT_NAME) + .status(Status.REMOVED_FAILED) + .subAgentName(SUB_AGENT_NAME) + .subSlingId(SUB_SLING_ID) .build(); } @@ -224,13 +222,13 @@ public class PubQueueProviderTest { } private PackageMessage packageMessage(String packageId, String pubAgentName) { - return Messages.PackageMessage.newBuilder() - .setPubAgentName(pubAgentName) - .setPubSlingId("pub1SlingId") - .setPkgId(packageId) - .setReqType(ReqType.ADD) - .setPkgType("journal") - .addPaths("path") + return PackageMessage.builder() + .pubAgentName(pubAgentName) + .pubSlingId("pub1SlingId") + .pkgId(packageId) + .reqType(ReqType.ADD) + .pkgType("journal") + .paths(Arrays.asList("path")) .build(); } } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java index eeebb7a..69b42ca 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java @@ -27,7 +27,8 @@ import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -40,7 +41,6 @@ import org.mockito.runners.MockitoJUnitRunner; import static java.lang.System.currentTimeMillis; import static org.apache.sling.distribution.journal.impl.shared.Topics.PACKAGE_TOPIC; -import static org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.TEST; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.any; @@ -81,8 +81,8 @@ public class QueueCacheSeederTest { any(Reset.class), pkgHandlerCaptor.capture())) .thenReturn(poller); - doNothing().when(sender).send(eq(PACKAGE_TOPIC), pkgMsgCaptor.capture()); - when(clientProvider.<PackageMessage>createSender()) + doNothing().when(sender).send(pkgMsgCaptor.capture()); + when(clientProvider.<PackageMessage>createSender(eq(PACKAGE_TOPIC))) .thenReturn(sender); seeder = new QueueCacheSeeder(clientProvider, PACKAGE_TOPIC); } @@ -99,10 +99,10 @@ public class QueueCacheSeederTest { @Test public void testSendingSeeds() { seeder.seed(callback); - verify(sender, timeout(5000).atLeastOnce()).send(eq(PACKAGE_TOPIC), pkgMsgCaptor.capture()); + verify(sender, timeout(5000).atLeastOnce()).send(pkgMsgCaptor.capture()); PackageMessage seedMsg = pkgMsgCaptor.getValue(); assertNotNull(seedMsg); - assertEquals(TEST, seedMsg.getReqType()); + assertEquals(ReqType.TEST, seedMsg.getReqType()); } @After diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java index 9a597f8..a1d66fb 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.when; import java.io.Closeable; import java.io.IOException; +import java.util.Arrays; import java.util.List; import org.apache.sling.distribution.journal.FullMessage; @@ -36,9 +37,8 @@ import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo; -import org.apache.sling.distribution.journal.messages.Messages; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -58,7 +58,7 @@ public class RangePollerTest { private MessagingProvider clientProvider; @Captor - private ArgumentCaptor<HandlerAdapter<Messages.PackageMessage>> handlerCaptor; + private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor; @Mock private Closeable poller; @@ -107,15 +107,15 @@ public class RangePollerTest { } } - private FullMessage<Messages.PackageMessage> createMessage(ReqType reqType, int offset) { + private FullMessage<PackageMessage> createMessage(ReqType reqType, int offset) { MessageInfo info = new TestMessageInfo(TOPIC, 0, offset, System.currentTimeMillis()); - PackageMessage message = Messages.PackageMessage.newBuilder() - .setPubAgentName("agent1") - .setPubSlingId("pub1SlingId") - .setPkgId("package-" + offset) - .setReqType(reqType) - .setPkgType("journal") - .addPaths("path") + PackageMessage message = PackageMessage.builder() + .pubAgentName("agent1") + .pubSlingId("pub1SlingId") + .pkgId("package-" + offset) + .reqType(reqType) + .pkgType("journal") + .paths(Arrays.asList("path")) .build(); return new FullMessage<>(info, message); } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java index 5546b50..8f7c262 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java @@ -27,9 +27,11 @@ import static org.mockito.Mockito.when; import java.io.Closeable; import java.io.IOException; import java.time.Duration; +import java.util.Arrays; import java.util.List; -import org.apache.sling.distribution.journal.impl.shared.LimitPoller; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -39,9 +41,6 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -import org.apache.sling.distribution.journal.messages.Messages; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType; import org.apache.sling.distribution.journal.FullMessage; import org.apache.sling.distribution.journal.HandlerAdapter; import org.apache.sling.distribution.journal.MessageHandler; @@ -60,7 +59,7 @@ public class LimitPollerTest { private MessagingProvider clientProvider; @Captor - private ArgumentCaptor<HandlerAdapter<Messages.PackageMessage>> handlerCaptor; + private ArgumentCaptor<HandlerAdapter<org.apache.sling.distribution.journal.messages.PackageMessage>> handlerCaptor; @Mock private Closeable poller; @@ -112,15 +111,15 @@ public class LimitPollerTest { return message; } - private FullMessage<Messages.PackageMessage> createMessage(int offset) { + private FullMessage<PackageMessage> createMessage(int offset) { MessageInfo info = new TestMessageInfo(TOPIC, 0, offset, System.currentTimeMillis()); - PackageMessage message = Messages.PackageMessage.newBuilder() - .setPubAgentName("agent1") - .setPubSlingId("pub1SlingId") - .setPkgId("package-" + offset) - .setReqType(ReqType.ADD) - .setPkgType("journal") - .addPaths("path") + PackageMessage message = PackageMessage.builder() + .pubAgentName("agent1") + .pubSlingId("pub1SlingId") + .pkgId("package-" + offset) + .reqType(ReqType.ADD) + .pkgType("journal") + .paths(Arrays.asList("path")) .build(); return new FullMessage<>(info, message); } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java index 659f69e..c19f633 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import javax.jcr.Binary; import javax.jcr.Session; @@ -34,8 +35,8 @@ import javax.jcr.ValueFactory; import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary; import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.api.resource.ResourceResolverFactory; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; @@ -85,10 +86,12 @@ public class PackageBrowserTest { } private PackageMessage createPackageMsg(long offset) throws Exception { - return PackageMessage.newBuilder().setPubSlingId("").setReqType(ReqType.ADD) - .addPaths("/content") - .setPkgId("pkgid") - .setPkgType("some_type") - .setPkgBinaryRef("myref").build(); + return PackageMessage.builder() + .pubSlingId("") + .reqType(ReqType.ADD) + .paths(Arrays.asList("/content")) + .pkgId("pkgid") + .pkgType("some_type") + .pkgBinaryRef("myref").build(); } } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java index 1a46360..36009ba 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.nio.charset.Charset; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -41,8 +42,8 @@ import javax.servlet.http.HttpServletResponse; import org.apache.sling.distribution.journal.FullMessage; import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.MessagingProvider; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -52,8 +53,6 @@ import org.mockito.Mockito; import org.mockito.Spy; import org.mockito.runners.MockitoJUnitRunner; -import com.google.protobuf.ByteString; - @RunWith(MockitoJUnitRunner.class) public class PackageViewerPluginTest { @@ -147,13 +146,13 @@ public class PackageViewerPluginTest { private FullMessage<PackageMessage> createPackageMsg(long offset) { MessageInfo info = new TestMessageInfo("topic", 0 , offset, 0L); - PackageMessage message = PackageMessage.newBuilder() - .setPubSlingId("") - .setReqType(ReqType.ADD) - .addPaths("/content") - .setPkgId("pkgid") - .setPkgType("some_type") - .setPkgBinary(ByteString.copyFrom("package content", Charset.defaultCharset())) + PackageMessage message = PackageMessage.builder() + .pubSlingId("") + .reqType(ReqType.ADD) + .paths(Arrays.asList("/content")) + .pkgId("pkgid") + .pkgType("some_type") + .pkgBinary("package content".getBytes(Charset.defaultCharset())) .build(); return new FullMessage<>(info, message); } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java index a2105e3..a5112f9 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java @@ -18,6 +18,8 @@ */ package org.apache.sling.distribution.journal.impl.shared; +import java.util.Map; + import org.apache.sling.distribution.journal.MessageInfo; public class TestMessageInfo implements MessageInfo { @@ -50,4 +52,9 @@ public class TestMessageInfo implements MessageInfo { return createTime; } + @Override + public Map<String, String> getProps() { + return null; + } + } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java index 5746dc7..e380f5e 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java @@ -27,7 +27,8 @@ import static org.mockito.Mockito.when; import java.util.Collections; import java.util.function.Consumer; -import org.apache.sling.distribution.journal.messages.Messages; +import org.apache.sling.distribution.journal.messages.DiscoveryMessage; +import org.apache.sling.distribution.journal.messages.SubscriberState; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @@ -41,15 +42,15 @@ public class AnnouncerTest { @Test @SuppressWarnings("unchecked") public void testDiscoveryMessage() throws InterruptedException { - Consumer<Messages.DiscoveryMessage> sender = Mockito.mock(Consumer.class); + Consumer<DiscoveryMessage> sender = Mockito.mock(Consumer.class); BookKeeper bookKeeper = Mockito.mock(BookKeeper.class); when(bookKeeper.loadOffset()).thenReturn(1L); Announcer announcer = new Announcer(SUB1_SLING_ID, SUB1_AGENT_NAME, Collections.singleton(PUB1_AGENT_NAME), sender, bookKeeper, -1, false, 10000); Thread.sleep(200); - ArgumentCaptor<Messages.DiscoveryMessage> msg = forClass(Messages.DiscoveryMessage.class); + ArgumentCaptor<DiscoveryMessage> msg = forClass(DiscoveryMessage.class); verify(sender).accept(msg.capture()); - Messages.DiscoveryMessage message = msg.getValue(); - Messages.SubscriberState offset = message.getSubscriberStateList().iterator().next(); + DiscoveryMessage message = msg.getValue(); + SubscriberState offset = message.getSubscriberStates().iterator().next(); assertThat(message.getSubSlingId(), equalTo(SUB1_SLING_ID)); assertThat(offset.getPubAgentName(), equalTo(PUB1_AGENT_NAME)); assertThat(offset.getOffset(), equalTo(1L)); diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java index 1345895..3afa938 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java @@ -27,7 +27,7 @@ import org.apache.sling.api.resource.LoginException; import org.apache.sling.api.resource.PersistenceException; import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService; -import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory; import org.junit.Before; import org.junit.Test; diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java index 72d0257..9cc3503 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java @@ -35,8 +35,7 @@ import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo; import org.apache.sling.distribution.journal.impl.shared.Topics; -import org.apache.sling.distribution.journal.messages.Messages.ClearCommand; -import org.apache.sling.distribution.journal.messages.Messages.CommandMessage; +import org.apache.sling.distribution.journal.messages.ClearCommand; import org.awaitility.Awaitility; import org.awaitility.Duration; import org.junit.Before; @@ -66,9 +65,9 @@ public class CommandPollerTest { CommandPoller commandPoller; @Captor - private ArgumentCaptor<HandlerAdapter<CommandMessage>> handlerCaptor; + private ArgumentCaptor<HandlerAdapter<ClearCommand>> handlerCaptor; - private MessageHandler<CommandMessage> commandHandler; + private MessageHandler<ClearCommand> commandHandler; private Topics topics = new Topics(); @@ -120,15 +119,6 @@ public class CommandPollerTest { } @Test - public void testIgnoreInvalidCommand() throws DistributionException, InterruptedException, IOException { - createCommandPoller(true); - - CommandMessage message = CommandMessage.newBuilder(commandMessage(10L)).clearClearCommand().build(); - commandHandler.handle(info, message); - assertClearedUpTo(-1); - } - - @Test public void testEditable() throws DistributionException, InterruptedException, IOException { createCommandPoller(true); @@ -150,18 +140,15 @@ public class CommandPollerTest { assertThat(commandPoller.isCleared(1), equalTo(false)); } - private CommandMessage commandMessage(long offset) { + private ClearCommand commandMessage(long offset) { return commandMessage(SUB_SLING_ID, SUB_AGENT_NAME, offset); } - private CommandMessage commandMessage(String subSlingId, String subAgentName, long offset) { - ClearCommand command = ClearCommand.newBuilder() - .setOffset(offset) - .build(); - return CommandMessage.newBuilder() - .setClearCommand(command) - .setSubAgentName(subAgentName) - .setSubSlingId(subSlingId) + private ClearCommand commandMessage(String subSlingId, String subAgentName, long offset) { + return ClearCommand.builder() + .subAgentName(subAgentName) + .subSlingId(subSlingId) + .offset(offset) .build(); } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java index 6ee55f8..a54a26b 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java @@ -52,6 +52,10 @@ import org.apache.sling.distribution.journal.impl.precondition.Precondition; import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService; import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo; import org.apache.sling.distribution.journal.impl.shared.Topics; +import org.apache.sling.distribution.journal.messages.DiscoveryMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.api.resource.LoginException; import org.apache.sling.api.resource.PersistenceException; @@ -97,18 +101,12 @@ import org.osgi.framework.ServiceRegistration; import org.osgi.service.event.EventAdmin; import org.osgi.util.converter.Converters; -import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType; -import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage; - import org.apache.sling.distribution.journal.HandlerAdapter; import org.apache.sling.distribution.journal.MessageHandler; import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import com.google.common.collect.ImmutableMap; -import com.google.protobuf.ByteString; @SuppressWarnings("unchecked") public class SubscriberTest { @@ -119,23 +117,23 @@ public class SubscriberTest { private static final String PUB1_SLING_ID = "pub1sling"; private static final String PUB1_AGENT_NAME = "pub1agent"; - private static final PackageMessage BASIC_ADD_PACKAGE = PackageMessage.newBuilder() - .setPkgId("myid") - .setPubSlingId(PUB1_SLING_ID) - .setPubAgentName(PUB1_AGENT_NAME) - .setReqType(ReqType.ADD) - .setPkgType("journal") - .addAllPaths(Arrays.asList("/test")) - .setPkgBinary(ByteString.copyFrom(new byte[100])) + private static final PackageMessage BASIC_ADD_PACKAGE = PackageMessage.builder() + .pkgId("myid") + .pubSlingId(PUB1_SLING_ID) + .pubAgentName(PUB1_AGENT_NAME) + .reqType(ReqType.ADD) + .pkgType("journal") + .paths(Arrays.asList("/test")) + .pkgBinary(new byte[100]) .build(); - private static final PackageMessage BASIC_DEL_PACKAGE = PackageMessage.newBuilder() - .setPkgId("myid") - .setPubSlingId(PUB1_SLING_ID) - .setPubAgentName(PUB1_AGENT_NAME) - .setReqType(ReqType.DELETE) - .setPkgType("journal") - .addAllPaths(Arrays.asList("/test")) + private static final PackageMessage BASIC_DEL_PACKAGE = PackageMessage.builder() + .pkgId("myid") + .pubSlingId(PUB1_SLING_ID) + .pubAgentName(PUB1_AGENT_NAME) + .reqType(ReqType.DELETE) + .pkgType("journal") + .paths(Arrays.asList("/test")) .build(); @@ -193,7 +191,6 @@ public class SubscriberTest { private MessageHandler<PackageMessage> packageHandler; - @SuppressWarnings("rawtypes") @Before public void before() { DistributionSubscriber.QUEUE_FETCH_DELAY = 100; @@ -207,7 +204,9 @@ public class SubscriberTest { mockMetrics(); - when(clientProvider.<PackageStatusMessage>createSender()).thenReturn(statusSender, (MessageSender) discoverySender); + when(clientProvider.<PackageStatusMessage>createSender(Mockito.eq(topics.getStatusTopic()))).thenReturn(statusSender); + when(clientProvider.<DiscoveryMessage>createSender(Mockito.eq(topics.getDiscoveryTopic()))).thenReturn(discoverySender); + when(clientProvider.createPoller( Mockito.anyString(), Mockito.eq(Reset.earliest), @@ -251,8 +250,7 @@ public class SubscriberTest { sem.release(); waitSubscriber(IDLE); - verify(statusSender, times(0)).send(eq(topics.getStatusTopic()), - anyObject()); + verify(statusSender, times(0)).accept(anyObject()); List<String> log = subscriber.getLog().getLines(); // We do not use the DistributionLog anymore assertThat(log.size(), equalTo(0)); @@ -302,8 +300,7 @@ public class SubscriberTest { ).thenThrow(new RuntimeException("Expected")); packageHandler.handle(info, message); - verify(statusSender, timeout(10000).times(1)).send(eq(topics.getStatusTopic()), - anyObject()); + verify(statusSender, timeout(10000).times(1)).accept(anyObject()); } @Test @@ -317,8 +314,7 @@ public class SubscriberTest { packageHandler.handle(info, message); waitSubscriber(IDLE); - verify(statusSender, timeout(10000).times(1)).send(eq(topics.getStatusTopic()), - anyObject()); + verify(statusSender, timeout(10000).times(1)).accept(anyObject()); } @Test
