This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new bf91078 SLING-9504 - Migrate to json (#41)
bf91078 is described below
commit bf910786e225d08d28cb45d5bd66735a3855ecb3
Author: Christian Schneider <[email protected]>
AuthorDate: Thu Jun 18 17:28:21 2020 +0200
SLING-9504 - Migrate to json (#41)
---
pom.xml | 4 +-
.../journal/impl/event/DistributionEvent.java | 12 ++---
.../impl/precondition/PackageStatusWatcher.java | 9 ++--
.../impl/precondition/StagingPrecondition.java | 2 +-
.../journal/impl/publisher/DiscoveryService.java | 17 +++---
.../impl/publisher/DistributionPublisher.java | 16 ++----
.../impl/publisher/PackageDistributedNotifier.java | 8 +--
.../impl/publisher/PackageMessageFactory.java | 61 +++++++++++-----------
.../journal/impl/queue/QueueItemFactory.java | 10 ++--
.../journal/impl/queue/impl/PubQueueCache.java | 7 +--
.../impl/queue/impl/PubQueueProviderImpl.java | 40 +++++++-------
.../journal/impl/queue/impl/QueueCacheSeeder.java | 18 +++----
.../journal/impl/queue/impl/RangePoller.java | 8 +--
.../journal/impl/shared/LimitPoller.java | 8 +--
.../journal/impl/shared/PackageBrowser.java | 6 +--
.../journal/impl/shared/PackageViewerPlugin.java | 6 +--
.../journal/impl/subscriber/Announcer.java | 42 ++++++++-------
.../journal/impl/subscriber/BookKeeper.java | 31 +++++------
.../journal/impl/subscriber/CommandPoller.java | 13 ++---
.../impl/subscriber/DistributionSubscriber.java | 20 +++----
.../journal/impl/subscriber/PackageHandler.java | 10 ++--
.../precondition/PackageStatusWatcherTest.java | 17 +++---
.../impl/precondition/StagingPreconditionTest.java | 14 ++---
.../impl/publisher/DiscoveryServiceTest.java | 28 +++++-----
.../publisher/DistributionPackageFactoryTest.java | 23 ++++----
.../impl/publisher/DistributionPublisherTest.java | 33 ++++++------
.../impl/publisher/PackageQueuedNotifierTest.java | 18 ++++---
.../journal/impl/publisher/PackageRepoTest.java | 4 +-
.../journal/impl/queue/QueueItemFactoryTest.java | 20 +++----
.../journal/impl/queue/impl/PubQueueCacheTest.java | 35 ++++++-------
.../impl/queue/impl/PubQueueProviderTest.java | 46 ++++++++--------
.../impl/queue/impl/QueueCacheSeederTest.java | 12 ++---
.../journal/impl/queue/impl/RangePollerTest.java | 24 ++++-----
.../journal/impl/shared/LimitPollerTest.java | 25 +++++----
.../journal/impl/shared/PackageBrowserTest.java | 17 +++---
.../impl/shared/PackageViewerPluginTest.java | 21 ++++----
.../journal/impl/shared/TestMessageInfo.java | 7 +++
.../journal/impl/subscriber/AnnouncerTest.java | 11 ++--
.../journal/impl/subscriber/BookKeeperTest.java | 2 +-
.../journal/impl/subscriber/CommandPollerTest.java | 31 ++++-------
.../journal/impl/subscriber/SubscriberTest.java | 54 +++++++++----------
41 files changed, 380 insertions(+), 410 deletions(-)
diff --git a/pom.xml b/pom.xml
index 2bb23f6..78cc146 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
<!-- P R O J E C T -->
<!-- ======================================================================= -->
<artifactId>org.apache.sling.distribution.journal</artifactId>
- <version>0.1.17-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
<name>Apache Sling Journal based Content Distribution - Core bundle</name>
<description>Implementation of Apache Sling Content Distribution
components on top of an append-only persisted log</description>
@@ -140,7 +140,7 @@
<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.distribution.journal.messages</artifactId>
- <version>0.1.2</version>
+ <version>0.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java
b/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java
index c153ad3..a22e724 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java
@@ -35,12 +35,10 @@ import java.util.Map;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.osgi.service.event.Event;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-
@ParametersAreNonnullByDefault
public class DistributionEvent {
@@ -51,11 +49,11 @@ public class DistributionEvent {
private DistributionEvent() {
}
- public static Event eventImporterImported(Messages.PackageMessage pkgMsg,
String agentName) {
+ public static Event eventImporterImported(PackageMessage pkgMsg, String
agentName) {
return buildEvent(IMPORTER_PACKAGE_IMPORTED, KIND_IMPORTER, agentName,
pkgMsg);
}
- public static Event eventPackageCreated(Messages.PackageMessage pkgMsg,
String agentName) {
+ public static Event eventPackageCreated(PackageMessage pkgMsg, String
agentName) {
return buildEvent(AGENT_PACKAGE_CREATED, KIND_AGENT, agentName,
pkgMsg);
}
@@ -66,12 +64,12 @@ public class DistributionEvent {
queueItem.getPackageId());
}
- public static Event eventPackageQueued(Messages.PackageMessage pkgMsg,
String agentName) {
+ public static Event eventPackageQueued(PackageMessage pkgMsg, String
agentName) {
return buildEvent(AGENT_PACKAGE_QUEUED, KIND_AGENT, agentName, pkgMsg);
}
private static Event buildEvent(String topic, String kind, String
agentName, PackageMessage pkgMsg) {
- List<String> pathsList = pkgMsg.getPathsList();
+ List<String> pathsList = pkgMsg.getPaths();
return buildEvent(topic, kind, agentName,
pkgMsg.getReqType().name(),
pathsList.toArray(new String[0]),
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
index 0408e0d..d359f44 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
@@ -30,9 +30,8 @@ import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
public class PackageStatusWatcher implements Closeable {
private final Closeable poller;
@@ -46,7 +45,7 @@ public class PackageStatusWatcher implements Closeable {
poller = messagingProvider.createPoller(
topicName,
Reset.earliest,
- create(Messages.PackageStatusMessage.class, this::handle)
+ create(PackageStatusMessage.class, this::handle)
);
}
@@ -73,7 +72,7 @@ public class PackageStatusWatcher implements Closeable {
poller.close();
}
- private void handle(MessageInfo info, Messages.PackageStatusMessage
pkgStatusMsg) {
+ private void handle(MessageInfo info, PackageStatusMessage pkgStatusMsg) {
// TODO: check revision
Map<Long, Status> agentStatus =
getAgentStatus(pkgStatusMsg.getSubAgentName());
agentStatus.put(pkgStatusMsg.getOffset(), pkgStatusMsg.getStatus());
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
index d523cdf..9b55848 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeoutException;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.impl.shared.Topics;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
index 496ff81..d2c4972 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
@@ -19,9 +19,9 @@
package org.apache.sling.distribution.journal.impl.publisher;
import static java.lang.String.format;
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
import static
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
import static
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
import java.io.Closeable;
import java.util.Dictionary;
@@ -31,8 +31,9 @@ import javax.annotation.ParametersAreNonnullByDefault;
import
org.apache.sling.distribution.journal.impl.shared.PublisherConfigurationAvailable;
import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages;
-import
org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
+import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.SubscriberConfig;
+import org.apache.sling.distribution.journal.messages.SubscriberState;
import org.apache.commons.io.IOUtils;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
@@ -41,7 +42,6 @@ import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
-import
org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
@@ -105,7 +105,8 @@ public class DiscoveryService implements Runnable {
poller = messagingProvider.createPoller(
topics.getDiscoveryTopic(),
Reset.latest,
- create(Messages.DiscoveryMessage.class, new
DiscoveryMessageHandler()));
+ create(DiscoveryMessage.class, new DiscoveryMessageHandler())
+ );
startTopologyViewUpdaterTask(context);
LOG.info("Discovery service started");
}
@@ -158,9 +159,9 @@ public class DiscoveryService implements Runnable {
long now = System.currentTimeMillis();
AgentId subAgentId = new AgentId(disMsg.getSubSlingId(),
disMsg.getSubAgentName());
- for (Messages.SubscriberState subStateMsg :
disMsg.getSubscriberStateList()) {
- SubscriberConfiguration subConfig =
disMsg.getSubscriberConfiguration();
- State subState = new State(subStateMsg.getPubAgentName(),
subAgentId.getAgentId(), now, subStateMsg.getOffset(),
subStateMsg.getRetries(), subConfig.getMaxRetries(), subConfig.getEditable());
+ for (SubscriberState subStateMsg : disMsg.getSubscriberStates()) {
+ SubscriberConfig subConfig =
disMsg.getSubscriberConfiguration();
+ State subState = new State(subStateMsg.getPubAgentName(),
subAgentId.getAgentId(), now, subStateMsg.getOffset(),
subStateMsg.getRetries(), subConfig.getMaxRetries(), subConfig.isEditable());
viewManager.refreshState(subState);
}
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index d366ab3..6ac1e4b 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -52,6 +52,7 @@ import
org.apache.sling.distribution.journal.impl.shared.DefaultDistributionLog;
import
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import
org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.DistributionRequest;
@@ -75,8 +76,6 @@ import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;
import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.JournalAvailable;
@@ -136,7 +135,7 @@ public class DistributionPublisher implements
DistributionAgent {
private ServiceRegistration<DistributionAgent> componentReg;
- private MessageSender<PackageMessage> sender;
+ private Consumer<PackageMessage> sender;
private JMXRegistration reg;
@@ -146,7 +145,7 @@ public class DistributionPublisher implements
DistributionAgent {
log = new DefaultDistributionLog(pubAgentName, this.getClass(),
DefaultDistributionLog.LogLevel.INFO);
REQ_TYPES.put(ADD, this::sendAndWait);
REQ_TYPES.put(DELETE, this::sendAndWait);
- REQ_TYPES.put(TEST, this::send);
+ REQ_TYPES.put(TEST, this.sender);
}
@Activate
@@ -159,7 +158,7 @@ public class DistributionPublisher implements
DistributionAgent {
pkgType = packageBuilder.getType();
- this.sender = messagingProvider.createSender();
+ this.sender = messagingProvider.createSender(topics.getPackageTopic());
Dictionary<String, Object> props = createServiceProps(config);
componentReg =
requireNonNull(context.registerService(DistributionAgent.class, this, props));
@@ -320,7 +319,7 @@ public class DistributionPublisher implements
DistributionAgent {
CompletableFuture<Void> received =
queuedNotifier.registerWait(pkg.getPkgId());
Event createdEvent = DistributionEvent.eventPackageCreated(pkg,
pubAgentName);
eventAdmin.postEvent(createdEvent);
- send(pkg);
+ sender.accept(pkg);
received.get(queuedTimeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
queuedNotifier.unRegisterWait(pkg.getPkgId());
@@ -328,11 +327,6 @@ public class DistributionPublisher implements
DistributionAgent {
}
}
- private void send(PackageMessage pipePackage) {
- final String topicName = topics.getPackageTopic();
- sender.send(topicName, pipePackage);
- }
-
@Nonnull
private DistributionResponse executeUnsupported(DistributionRequest
request) {
String msg = String.format("Request type %s is not supported by this
agent, expected one of %s",
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index 75326e8..c579089 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -20,6 +20,7 @@ package org.apache.sling.distribution.journal.impl.publisher;
import java.util.Objects;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.LongStream;
@@ -31,7 +32,6 @@ import
org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import
org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import
org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
-import org.apache.sling.distribution.journal.JsonMessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.commons.lang3.StringUtils;
@@ -65,7 +65,7 @@ public class PackageDistributedNotifier implements
TopologyChangeHandler {
@Reference
private Topics topics;
- private JsonMessageSender<PackageDistributedMessage> sender;
+ private Consumer<PackageDistributedMessage> sender;
private boolean sendMsg;
@@ -73,7 +73,7 @@ public class PackageDistributedNotifier implements
TopologyChangeHandler {
public void activate() {
sendMsg = StringUtils.isNotBlank(topics.getEventTopic());
if (sendMsg) {
- sender = messagingProvider.createJsonSender();
+ sender = messagingProvider.createSender(topics.getEventTopic());
}
LOG.info("Started package distributed notifier with event message
topic {}", topics.getEventTopic());
}
@@ -111,7 +111,7 @@ public class PackageDistributedNotifier implements
TopologyChangeHandler {
msg.paths = (String[]) queueItem.get(PROPERTY_REQUEST_PATHS);
msg.deepPaths = (String[])
queueItem.get(PROPERTY_REQUEST_DEEP_PATHS);
- sender.send(topics.getEventTopic(), msg);
+ sender.accept(msg);
}
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
index 2e7d65e..28bf70a 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
@@ -33,6 +33,9 @@ import org.apache.commons.io.IOUtils;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import
org.apache.sling.distribution.journal.messages.PackageMessage.PackageMessageBuilder;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
import org.apache.sling.distribution.packaging.DistributionPackage;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.packaging.DistributionPackageInfo;
@@ -44,10 +47,6 @@ import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import com.google.protobuf.ByteString;
-
@Component(service = PackageMessageFactory.class)
@ParametersAreNonnullByDefault
public class PackageMessageFactory {
@@ -106,16 +105,16 @@ public class PackageMessageFactory {
final List<String> paths = Arrays.asList(pkgInfo.getPaths());
final List<String> deepPaths =
Arrays.asList(pkgInfo.get(PROPERTY_REQUEST_DEEP_PATHS, String[].class));
final String pkgId = disPkg.getId();
- PackageMessage.Builder pkgBuilder = PackageMessage.newBuilder()
- .setPubSlingId(pubSlingId)
- .setPkgId(pkgId)
- .setPubAgentName(pubAgentName)
- .addAllPaths(paths)
- .setReqType(ReqType.ADD)
- .addAllDeepPaths(deepPaths)
- .setPkgLength(pkgLength)
- .setUserId(resourceResolver.getUserID())
- .setPkgType(packageBuilder.getType());
+ PackageMessageBuilder pkgBuilder = PackageMessage.builder()
+ .pubSlingId(pubSlingId)
+ .pkgId(pkgId)
+ .pubAgentName(pubAgentName)
+ .paths(paths)
+ .reqType(ReqType.ADD)
+ .deepPaths(deepPaths)
+ .pkgLength(pkgLength)
+ .userId(resourceResolver.getUserID())
+ .pkgType(packageBuilder.getType());
if (pkgLength > MAX_INLINE_PKG_BINARY_SIZE) {
/*
@@ -131,9 +130,9 @@ public class PackageMessageFactory {
LOG.info("Package {} too large ({}B) to be sent inline",
disPkg.getId(), pkgLength);
String pkgBinRef = packageRepo.store(resourceResolver, disPkg);
- pkgBuilder.setPkgBinaryRef(pkgBinRef);
+ pkgBuilder.pkgBinaryRef(pkgBinRef);
} else {
- pkgBuilder.setPkgBinary(ByteString.copyFrom(pkgBinary));
+ pkgBuilder.pkgBinary(pkgBinary);
}
PackageMessage pipePackage = pkgBuilder.build();
disPkg.delete();
@@ -143,27 +142,27 @@ public class PackageMessageFactory {
@Nonnull
private PackageMessage createDelete(DistributionPackageBuilder
packageBuilder, ResourceResolver resourceResolver, DistributionRequest request,
String pubAgentName) {
String pkgId = UUID.randomUUID().toString();
- return PackageMessage.newBuilder()
- .setPubSlingId(pubSlingId)
- .setPkgId(pkgId)
- .setPubAgentName(pubAgentName)
- .addAllPaths(Arrays.asList(request.getPaths()))
- .setReqType(ReqType.DELETE)
- .setPkgType(packageBuilder.getType())
- .setUserId(resourceResolver.getUserID())
+ return PackageMessage.builder()
+ .pubSlingId(pubSlingId)
+ .pkgId(pkgId)
+ .pubAgentName(pubAgentName)
+ .paths(Arrays.asList(request.getPaths()))
+ .reqType(ReqType.DELETE)
+ .pkgType(packageBuilder.getType())
+ .userId(resourceResolver.getUserID())
.build();
}
@Nonnull
public PackageMessage createTest(DistributionPackageBuilder
packageBuilder, ResourceResolver resourceResolver, String pubAgentName) {
String pkgId = UUID.randomUUID().toString();
- return PackageMessage.newBuilder()
- .setPubSlingId(pubSlingId)
- .setPubAgentName(pubAgentName)
- .setPkgId(pkgId)
- .setReqType(ReqType.TEST)
- .setPkgType(packageBuilder.getType())
- .setUserId(resourceResolver.getUserID())
+ return PackageMessage.builder()
+ .pubSlingId(pubSlingId)
+ .pubAgentName(pubAgentName)
+ .pkgId(pkgId)
+ .reqType(ReqType.TEST)
+ .pkgType(packageBuilder.getType())
+ .userId(resourceResolver.getUserID())
.build();
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
index c9808d8..855a757 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
@@ -33,10 +33,10 @@ import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
@ParametersAreNonnullByDefault
public final class QueueItemFactory {
@@ -74,14 +74,14 @@ public final class QueueItemFactory {
properties.put(RECORD_TIMESTAMP, info.getCreateTime());
properties.put(PROPERTY_PACKAGE_TYPE, message.getPkgType());
properties.put(PROPERTY_REQUEST_TYPE,
toDistReqType(message.getReqType()));
- String[] paths = toArray(message.getPathsList());
+ String[] paths = toArray(message.getPaths());
properties.put(PROPERTY_REQUEST_PATHS, paths);
- String[] deepPaths = toArray(message.getDeepPathsList());
+ String[] deepPaths = toArray(message.getDeepPaths());
properties.put(PROPERTY_REQUEST_DEEP_PATHS, deepPaths);
if (addMessage) {
properties.put(PACKAGE_MSG, message);
}
- if (message.hasUserId()) {
+ if (message.getUserId() != null) {
properties.put(REQUEST_USER_ID, message.getUserId());
}
return new DistributionQueueItem(packageId, pkgLength, properties);
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index ec811d2..6cf0be7 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -19,10 +19,10 @@
package org.apache.sling.distribution.journal.impl.queue.impl;
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.groupingBy;
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
import java.io.Closeable;
import java.util.HashSet;
@@ -43,6 +43,7 @@ import
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsServ
import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.impl.subscriber.LocalStore;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
@@ -51,7 +52,6 @@ import org.slf4j.LoggerFactory;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
@@ -158,7 +158,8 @@ public class PubQueueCache {
this.topic,
Reset.earliest,
assignTo,
- create(PackageMessage.class, this::handlePackage));
+ create(PackageMessage.class, this::handlePackage)
+ );
}
@Nonnull
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index 3b7feab..c9428b6 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -18,28 +18,26 @@
*/
package org.apache.sling.distribution.journal.impl.queue.impl;
-import static
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED_FAILED;
import static org.apache.sling.distribution.journal.HandlerAdapter.create;
import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-
-import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.ClearCommand;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.osgi.service.component.annotations.Activate;
@@ -74,7 +72,7 @@ public class PubQueueProviderImpl implements PubQueueProvider
{
private Closeable statusPoller;
- private MessageSender<CommandMessage> sender;
+ private Consumer<ClearCommand> sender;
public PubQueueProviderImpl() {
}
@@ -93,8 +91,9 @@ public class PubQueueProviderImpl implements PubQueueProvider
{
statusPoller = messagingProvider.createPoller(
topics.getStatusTopic(),
Reset.earliest,
- create(PackageStatusMessage.class, this::handleStatus));
- sender = messagingProvider.createSender();
+ create(PackageStatusMessage.class, this::handleStatus)
+ );
+ sender = messagingProvider.createSender(topics.getCommandTopic());
LOG.info("Started Publisher queue provider service");
}
@@ -131,7 +130,7 @@ public class PubQueueProviderImpl implements
PubQueueProvider {
}
public void handleStatus(MessageInfo info, PackageStatusMessage message) {
- if (message.getStatus() == REMOVED_FAILED) {
+ if (message.getStatus() == Status.REMOVED_FAILED) {
String errorQueueKey = errorQueueKey(message.getPubAgentName(),
message.getSubSlingId(), message.getSubAgentName());
OffsetQueue<Long> errorQueue =
errorQueues.computeIfAbsent(errorQueueKey, key -> new OffsetQueueImpl<>());
errorQueue.putItem(info.getOffset(), message.getOffset());
@@ -144,16 +143,13 @@ public class PubQueueProviderImpl implements
PubQueueProvider {
}
private void sendClearCommand(String subSlingId, String subAgentName, long
offset) {
- Messages.ClearCommand clearCommand = Messages.ClearCommand.newBuilder()
- .setOffset(offset)
- .build();
- CommandMessage commandMessage = CommandMessage.newBuilder()
- .setSubSlingId(subSlingId)
- .setSubAgentName(subAgentName)
- .setClearCommand(clearCommand)
+ ClearCommand commandMessage = ClearCommand.builder()
+ .subSlingId(subSlingId)
+ .subAgentName(subAgentName)
+ .offset(offset)
.build();
LOG.info("Sending clear command to subSlingId: {}, subAgentName: {}
with offset {}.", subSlingId, subAgentName, offset);
- sender.send(topics.getCommandTopic(), commandMessage);
+ sender.accept(commandMessage);
}
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
index 79f6063..e3ac96c 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
@@ -27,7 +27,7 @@ import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,7 +79,7 @@ public class QueueCacheSeeder implements Closeable {
private void sendSeedingMessages() {
LOG.info("Start message seeder");
try {
- MessageSender<PackageMessage> sender = provider.createSender();
+ MessageSender<PackageMessage> sender =
provider.createSender(topic);
while (!closed) {
sendSeedingMessage(sender);
delay(CACHE_SEEDING_DELAY_MS);
@@ -90,14 +90,14 @@ public class QueueCacheSeeder implements Closeable {
}
private void sendSeedingMessage() {
- sendSeedingMessage(provider.createSender());
+ sendSeedingMessage(provider.createSender(topic));
}
private void sendSeedingMessage(MessageSender<PackageMessage> sender) {
PackageMessage pkgMsg = createTestMessage();
LOG.info("Send seeding message");
try {
- sender.send(topic, pkgMsg);
+ sender.send(pkgMsg);
} catch (MessagingException e) {
LOG.warn(e.getMessage(), e);
delay(CACHE_SEEDING_DELAY_MS * 10);
@@ -114,11 +114,11 @@ public class QueueCacheSeeder implements Closeable {
protected PackageMessage createTestMessage() {
String pkgId = UUID.randomUUID().toString();
- return PackageMessage.newBuilder()
- .setPubSlingId("seeder")
- .setPkgId(pkgId)
- .setPkgType("seeder")
- .setReqType(PackageMessage.ReqType.TEST)
+ return PackageMessage.builder()
+ .pubSlingId("seeder")
+ .pkgId(pkgId)
+ .pkgType("seeder")
+ .reqType(PackageMessage.ReqType.TEST)
.build();
}
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
index 68fe9f7..72e7c4a 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
@@ -31,12 +31,11 @@ import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
@ParametersAreNonnullByDefault
public class RangePoller {
@@ -64,7 +63,8 @@ public class RangePoller {
LOG.info("Fetching offsets [{},{}[", minOffset, maxOffset);
headPoller = messagingProvider.createPoller(
packageTopic, Reset.earliest, assign,
- create(Messages.PackageMessage.class, this::handlePackage));
+ create(PackageMessage.class, this::handlePackage)
+ );
}
public List<FullMessage<PackageMessage>> fetchRange() throws
InterruptedException {
@@ -77,7 +77,7 @@ public class RangePoller {
}
}
- private void handlePackage(MessageInfo info, Messages.PackageMessage
message) {
+ private void handlePackage(MessageInfo info, PackageMessage message) {
long offset = info.getOffset();
LOG.debug("Reading offset {}", offset);
if (offset < maxOffset) {
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
b/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
index 2bb30b5..687c9ab 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
@@ -36,8 +36,7 @@ import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +65,8 @@ public class LimitPoller {
log.info("Fetching {} messages starting from {}", maxMessages,
minOffset);
headPoller = messagingProvider.createPoller(
packageTopic, Reset.earliest, assign,
- create(Messages.PackageMessage.class, this::handlePackage));
+ create(PackageMessage.class, this::handlePackage)
+ );
}
public List<FullMessage<PackageMessage>> fetch(Duration timeOut) {
@@ -86,7 +86,7 @@ public class LimitPoller {
}
}
- private void handlePackage(MessageInfo info, Messages.PackageMessage
message) {
+ private void handlePackage(MessageInfo info, PackageMessage message) {
long offset = info.getOffset();
log.debug("Reading offset {}", offset);
if (this.messages.size() < maxMessages && info.getOffset() >=
minOffset) {
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
index b9d1d2d..b02d33a 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
@@ -43,7 +43,7 @@ import
org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
@@ -78,8 +78,8 @@ public class PackageBrowser {
@Nonnull
public static InputStream pkgStream(ResourceResolver resolver,
PackageMessage pkgMsg) throws DistributionException {
- if (pkgMsg.hasPkgBinary()) {
- return new
ByteArrayInputStream(pkgMsg.getPkgBinary().toByteArray());
+ if (pkgMsg.getPkgBinary() != null) {
+ return new ByteArrayInputStream(pkgMsg.getPkgBinary());
} else {
String pkgBinRef = pkgMsg.getPkgBinaryRef();
try {
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
index 30f8836..308cd61 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
@@ -31,8 +31,8 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.felix.webconsole.AbstractWebConsolePlugin;
import org.apache.felix.webconsole.WebConsoleConstants;
import org.apache.sling.distribution.journal.FullMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
import org.osgi.framework.Constants;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
@@ -109,7 +109,7 @@ public class PackageViewerPlugin extends
AbstractWebConsolePlugin {
msg.getMessage().getPkgId(),
msg.getInfo().getOffset(),
msg.getMessage().getReqType(),
- msg.getMessage().getPathsList().toString());
+ msg.getMessage().getPaths().toString());
}
private void writePackage(Long offset, HttpServletResponse res) throws
IOException {
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
index 7ecd03b..06059e2 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
@@ -19,19 +19,20 @@
package org.apache.sling.distribution.journal.impl.subscriber;
import java.io.Closeable;
+import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.sling.distribution.journal.messages.Messages;
-import
org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.SubscriberConfig;
+import org.apache.sling.distribution.journal.messages.SubscriberState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,27 +89,28 @@ class Announcer implements Runnable, Closeable {
private DiscoveryMessage createDiscoveryMessage() {
long offset = bookKeeper.loadOffset();
- SubscriberConfiguration subscriberConfiguration =
SubscriberConfiguration.newBuilder()
- .setEditable(editable)
- .setMaxRetries(maxRetries)
+ SubscriberConfig subscriberConfiguration = SubscriberConfig.builder()
+ .editable(editable)
+ .maxRetries(maxRetries)
+ .build();
+ List<SubscriberState> states = pubAgentNames.stream()
+ .map(pubAgentName -> subscriberState(pubAgentName, offset))
+ .collect(Collectors.toList());
+ return DiscoveryMessage
+ .builder()
+ .subSlingId(subSlingId)
+ .subAgentName(subAgentName)
+ .subscriberConfiguration(subscriberConfiguration)
+ .subscriberStates(states)
.build();
- DiscoveryMessage.Builder disMsgBuilder = DiscoveryMessage
- .newBuilder()
- .setSubSlingId(subSlingId)
- .setSubAgentName(subAgentName)
- .setSubscriberConfiguration(subscriberConfiguration);
- for (String pubAgentName : pubAgentNames) {
- disMsgBuilder.addSubscriberState(subscriberState(pubAgentName,
offset));
- }
- return disMsgBuilder.build();
}
private SubscriberState subscriberState(String pubAgentName, long offset) {
int retries = bookKeeper.getRetries(pubAgentName);
- return Messages.SubscriberState.newBuilder()
- .setPubAgentName(pubAgentName)
- .setRetries(retries)
- .setOffset(offset)
+ return SubscriberState.builder()
+ .pubAgentName(pubAgentName)
+ .retries(retries)
+ .offset(offset)
.build();
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 2984399..59d9687 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -22,9 +22,6 @@ import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static java.util.Collections.singletonMap;
import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
-import static
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.IMPORTED;
-import static
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED;
-import static
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED_FAILED;
import java.io.Closeable;
import java.io.IOException;
@@ -45,9 +42,9 @@ import
org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
import
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
@@ -151,7 +148,7 @@ public class BookKeeper implements Closeable {
ResourceResolver importerResolver =
getServiceResolver(SUBSERVICE_IMPORTER)) {
packageHandler.apply(importerResolver, pkgMsg);
if (editable) {
- storeStatus(importerResolver, new PackageStatus(IMPORTED,
offset, pkgMsg.getPubAgentName()));
+ storeStatus(importerResolver, new
PackageStatus(PackageStatusMessage.Status.IMPORTED, offset,
pkgMsg.getPubAgentName()));
}
storeOffset(importerResolver, offset);
importerResolver.commit();
@@ -170,7 +167,7 @@ public class BookKeeper implements Closeable {
private void addPackageMDC(PackageMessage pkgMsg) {
MDC.put("module", "distribution");
MDC.put("package-id", pkgMsg.getPkgId());
- String paths = String.join(",", pkgMsg.getPathsList());
+ String paths = String.join(",", pkgMsg.getPaths());
MDC.put("paths", paths);
MDC.put("pub-sling-id", pkgMsg.getPubSlingId());
String pubAgentName = pkgMsg.getPubAgentName();
@@ -212,7 +209,7 @@ public class BookKeeper implements Closeable {
Timer.Context context =
distributionMetricsService.getRemovedPackageDuration().time();
try (ResourceResolver resolver =
getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
if (editable) {
- storeStatus(resolver, new PackageStatus(REMOVED, offset,
pkgMsg.getPubAgentName()));
+ storeStatus(resolver, new PackageStatus(Status.REMOVED,
offset, pkgMsg.getPubAgentName()));
}
storeOffset(resolver, offset);
resolver.commit();
@@ -263,12 +260,12 @@ public class BookKeeper implements Closeable {
}
private void sendStatusMessage(PackageStatus status) {
- PackageStatusMessage pkgStatMsg = PackageStatusMessage.newBuilder()
- .setSubSlingId(subSlingId)
- .setSubAgentName(subAgentName)
- .setPubAgentName(status.pubAgentName)
- .setOffset(status.offset)
- .setStatus(status.status)
+ PackageStatusMessage pkgStatMsg = PackageStatusMessage.builder()
+ .subSlingId(subSlingId)
+ .subAgentName(subAgentName)
+ .pubAgentName(status.pubAgentName)
+ .offset(status.offset)
+ .status(status.status)
.build();
sender.accept(pkgStatMsg);
log.info("Sent status message {}", pkgStatMsg);
@@ -305,7 +302,7 @@ public class BookKeeper implements Closeable {
pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
Timer.Context context =
distributionMetricsService.getRemovedFailedPackageDuration().time();
try (ResourceResolver resolver =
getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
- storeStatus(resolver, new PackageStatus(REMOVED_FAILED, offset,
pkgMsg.getPubAgentName()));
+ storeStatus(resolver, new PackageStatus(Status.REMOVED_FAILED,
offset, pkgMsg.getPubAgentName()));
storeOffset(resolver, offset);
resolver.commit();
} catch (Exception e) {
@@ -351,7 +348,7 @@ public class BookKeeper implements Closeable {
PackageStatus(ValueMap statusMap) {
Integer statusNum = statusMap.get("statusNumber", Integer.class);
- this.status = statusNum !=null ? Status.valueOf(statusNum) : null;
+ this.status = statusNum !=null ? Status.fromNumber(statusNum) :
null;
this.offset = statusMap.get(KEY_OFFSET, Long.class);
this.pubAgentName = statusMap.get("pubAgentName", String.class);
this.sent = statusMap.get("sent", true);
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index f9d047b..703a9d1 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -28,7 +28,7 @@ import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
+import org.apache.sling.distribution.journal.messages.ClearCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +57,8 @@ public class CommandPoller implements Closeable {
poller = messagingProvider.createPoller(
topics.getCommandTopic(),
Reset.earliest,
- create(CommandMessage.class, this::handleCommandMessage));
+ create(ClearCommand.class, this::handleCommandMessage)
+ );
} else {
poller = null;
}
@@ -67,17 +68,13 @@ public class CommandPoller implements Closeable {
return offset <= clearOffset.longValue();
}
- private void handleCommandMessage(MessageInfo info, CommandMessage
message) {
+ private void handleCommandMessage(MessageInfo info, ClearCommand message) {
if (!subSlingId.equals(message.getSubSlingId()) ||
!subAgentName.equals(message.getSubAgentName())) {
LOG.debug("Skip command for subSlingId {}",
message.getSubSlingId());
return;
}
- if (message.hasClearCommand()) {
- handleClearCommand(message.getClearCommand().getOffset());
- } else {
- LOG.warn("Unsupported command {}", message);
- }
+ handleClearCommand(message.getOffset());
}
private void handleClearCommand(long offset) {
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 8bde940..dcd016c 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -22,7 +22,6 @@ import static java.lang.String.format;
import static java.util.Collections.emptyMap;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
import static
org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
import static
org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.PACKAGE_MSG;
import static
org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
@@ -64,9 +63,9 @@ import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.precondition.Precondition;
@@ -76,7 +75,8 @@ import
org.apache.sling.distribution.journal.impl.shared.AgentState;
import
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import
org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -93,8 +93,6 @@ import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.protobuf.GeneratedMessage;
-
/**
* A Subscriber SCD agent which consumes messages produced by a
* {@code DistributionPublisher} agent.
@@ -201,14 +199,15 @@ public class DistributionSubscriber implements
DistributionAgent {
ContentPackageExtractor extractor = new
ContentPackageExtractor(packaging, config.packageHandling());
PackageHandler packageHandler = new PackageHandler(packageBuilder,
extractor);
+ Consumer<PackageStatusMessage> sender =
messagingProvider.createSender(topics.getStatusTopic());
bookKeeper = new BookKeeper(resolverFactory,
distributionMetricsService, packageHandler, eventAdmin,
- sender(topics.getStatusTopic()), subAgentName, subSlingId,
editable, maxRetries);
+ sender, subAgentName, subSlingId, editable, maxRetries);
long startOffset = bookKeeper.loadOffset() + 1;
String assign = messagingProvider.assignTo(startOffset);
packagePoller =
messagingProvider.createPoller(topics.getPackageTopic(), Reset.earliest, assign,
- create(PackageMessage.class, this::handlePackageMessage));
+ HandlerAdapter.create(PackageMessage.class,
this::handlePackageMessage));
commandPoller = new CommandPoller(messagingProvider, topics,
subSlingId, subAgentName, editable);
@@ -216,7 +215,7 @@ public class DistributionSubscriber implements
DistributionAgent {
format("Queue Processor for Subscriber agent %s",
subAgentName));
int announceDelay =
PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
- announcer = new Announcer(subSlingId, subAgentName, queueNames,
sender(topics.getDiscoveryTopic()), bookKeeper,
+ announcer = new Announcer(subSlingId, subAgentName, queueNames,
messagingProvider.createSender(topics.getDiscoveryTopic()), bookKeeper,
maxRetries, config.editable(), announceDelay);
boolean errorQueueEnabled = (maxRetries >= 0);
@@ -228,11 +227,6 @@ public class DistributionSubscriber implements
DistributionAgent {
componentReg = context.registerService(DistributionAgent.class, this,
props);
}
- private <T extends GeneratedMessage> Consumer<T> sender(String topic) {
- MessageSender<T> sender = messagingProvider.createSender();
- return msg -> sender.send(topic, msg);
- }
-
private Set<String> getNotEmpty(String[] agentNames) {
return
Arrays.stream(agentNames).filter(StringUtils::isNotBlank).collect(toSet());
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
index bdfb3dd..bccf474 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
@@ -28,7 +28,7 @@ import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,12 +63,12 @@ public class PackageHandler {
private void installAddPackage(ResourceResolver resolver, PackageMessage
pkgMsg)
throws DistributionException {
- LOG.info("Importing paths {}",pkgMsg.getPathsList());
+ LOG.info("Importing paths {}",pkgMsg.getPaths());
InputStream pkgStream = null;
try {
pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
packageBuilder.installPackage(resolver, pkgStream);
- extractor.handle(resolver, pkgMsg.getPathsList());
+ extractor.handle(resolver, pkgMsg.getPaths());
} finally {
IOUtils.closeQuietly(pkgStream);
}
@@ -77,8 +77,8 @@ public class PackageHandler {
private void installDeletePackage(ResourceResolver resolver,
PackageMessage pkgMsg)
throws PersistenceException {
- LOG.info("Deleting paths {}",pkgMsg.getPathsList());
- for (String path : pkgMsg.getPathsList()) {
+ LOG.info("Deleting paths {}",pkgMsg.getPaths());
+ for (String path : pkgMsg.getPaths()) {
Resource resource = resolver.getResource(path);
if (resource != null) {
resolver.delete(resource);
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
index 1c5350e..f882f32 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
@@ -18,11 +18,10 @@
*/
package org.apache.sling.distribution.journal.impl.precondition;
-import
org.apache.sling.distribution.journal.impl.precondition.PackageStatusWatcher;
import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.impl.shared.Topics;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessagingProvider;
@@ -96,12 +95,12 @@ public class PackageStatusWatcherTest {
}
PackageStatusMessage createStatusMessage(int i) {
- return PackageStatusMessage.newBuilder()
- .setSubSlingId(SUB1_SLING_ID)
- .setSubAgentName(SUB1_AGENT_NAME)
- .setPubAgentName(PUB1_AGENT_NAME)
- .setOffset(1000 + i)
- .setStatus(PackageStatusMessage.Status.REMOVED_FAILED)
+ return PackageStatusMessage.builder()
+ .subSlingId(SUB1_SLING_ID)
+ .subAgentName(SUB1_AGENT_NAME)
+ .pubAgentName(PUB1_AGENT_NAME)
+ .offset(1000 + i)
+ .status(PackageStatusMessage.Status.REMOVED_FAILED)
.build();
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
index 59b1c14..3a31224 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
@@ -37,7 +37,7 @@ import
org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.impl.shared.Topics;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Before;
@@ -143,12 +143,12 @@ public class StagingPreconditionTest {
}
private void simulateMessage(String subAgentName, long pkgOffset,
PackageStatusMessage.Status status) {
- PackageStatusMessage message = PackageStatusMessage.newBuilder()
- .setSubSlingId(SUB1_SLING_ID)
- .setSubAgentName(subAgentName)
- .setPubAgentName(PUB1_AGENT_NAME)
- .setOffset(pkgOffset)
- .setStatus(status)
+ PackageStatusMessage message = PackageStatusMessage.builder()
+ .subSlingId(SUB1_SLING_ID)
+ .subAgentName(subAgentName)
+ .pubAgentName(PUB1_AGENT_NAME)
+ .offset(pkgOffset)
+ .status(status)
.build();
TestMessageInfo offset0 = new TestMessageInfo("", 1, 0, 0);
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
index d39f280..0adf713 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
@@ -36,9 +36,9 @@ import
org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.impl.shared.Topics;
-import
org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.SubscriberConfig;
+import org.apache.sling.distribution.journal.messages.SubscriberState;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -123,20 +123,20 @@ public class DiscoveryServiceTest {
}
private DiscoveryMessage discoveryMessage(String subSlingId, String
subAgentName, SubscriberState... subStates) {
- return DiscoveryMessage.newBuilder()
- .setSubSlingId(subSlingId)
- .setSubAgentName(subAgentName)
- .setSubscriberConfiguration(SubscriberConfiguration
- .newBuilder()
- .setEditable(false)
- .setMaxRetries(-1)
+ return DiscoveryMessage.builder()
+ .subSlingId(subSlingId)
+ .subAgentName(subAgentName)
+ .subscriberConfiguration(SubscriberConfig
+ .builder()
+ .editable(false)
+ .maxRetries(-1)
.build())
- .addAllSubscriberState(Arrays.asList(subStates)).build();
+ .subscriberStates(Arrays.asList(subStates)).build();
}
private SubscriberState subscriberState(String pubAgentName, int offset) {
- return SubscriberState.newBuilder()
- .setPubAgentName(pubAgentName)
- .setOffset(offset).build();
+ return SubscriberState.builder()
+ .pubAgentName(pubAgentName)
+ .offset(offset).build();
}
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java
index 125aa5f..929de44 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.sling.distribution.journal.impl.publisher;
+import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
@@ -30,12 +31,14 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.SimpleDistributionRequest;
import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.packaging.DistributionPackage;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.packaging.DistributionPackageInfo;
import org.junit.Before;
@@ -66,7 +69,7 @@ public class DistributionPackageFactoryTest {
public void testAdd() throws DistributionException, IOException {
DistributionRequest request = new
SimpleDistributionRequest(DistributionRequestType.ADD, "/test");
- org.apache.sling.distribution.packaging.DistributionPackage pkg =
mock(org.apache.sling.distribution.packaging.DistributionPackage.class);
+ DistributionPackage pkg = mock(DistributionPackage.class);
when(pkg.createInputStream()).thenReturn(new ByteArrayInputStream(new
byte[] {}));
when(pkg.getId()).thenReturn("myid");
Map<String, Object> props = new HashMap<>();
@@ -77,25 +80,25 @@ public class DistributionPackageFactoryTest {
when(pkg.getInfo()).thenReturn(info);
when(packageBuilder.createPackage(Mockito.eq(resourceResolver),
Mockito.eq(request))).thenReturn(pkg);
- Messages.PackageMessage sent = publisher.create(packageBuilder,
resourceResolver, "pub1agent1", request);
+ PackageMessage sent = publisher.create(packageBuilder,
resourceResolver, "pub1agent1", request);
assertThat(sent.getPkgBinary(), notNullValue());
assertThat(sent.getPkgLength(), equalTo(0L));
- assertThat(sent.getReqType(),
equalTo(Messages.PackageMessage.ReqType.ADD));
+ assertThat(sent.getReqType(), equalTo(ReqType.ADD));
assertThat(sent.getPkgType(), equalTo("journal"));
- assertThat(sent.getPathsList(), contains("/test"));
- assertThat(sent.getDeepPathsList(), contains("/test2"));
+ assertThat(sent.getPaths(), contains("/test"));
+ assertThat(sent.getDeepPaths(), contains("/test2"));
}
@Test
public void testDelete() throws DistributionException, IOException {
DistributionRequest request = new
SimpleDistributionRequest(DistributionRequestType.DELETE, "/test");
- Messages.PackageMessage sent = publisher.create(packageBuilder,
resourceResolver, "pub1agent1", request);
- assertThat(sent.getReqType(),
equalTo(Messages.PackageMessage.ReqType.DELETE));
- assertThat(sent.getPkgBinary(), notNullValue());
+ PackageMessage sent = publisher.create(packageBuilder,
resourceResolver, "pub1agent1", request);
+ assertThat(sent.getReqType(), equalTo(ReqType.DELETE));
+ assertThat(sent.getPkgBinary(), nullValue());
assertThat(sent.getPkgLength(), equalTo(0L));
- assertThat(sent.getPathsList(), contains("/test"));
+ assertThat(sent.getPaths(), contains("/test"));
}
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 7ab2bdf..32a8a11 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -40,11 +40,10 @@ import java.util.concurrent.CompletableFuture;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
import
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.ByteString;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.commons.metrics.Histogram;
import org.apache.sling.commons.metrics.Meter;
@@ -131,7 +130,7 @@ public class DistributionPublisherTest {
private ResourceResolver resourceResolver;
@Mock
- private MessageSender<Messages.PackageMessage> sender;
+ private MessageSender<PackageMessage> sender;
@Mock
private PackageQueuedNotifier queuedNotifier;
@@ -140,7 +139,7 @@ public class DistributionPublisherTest {
private TopologyView topology;
@Captor
- private ArgumentCaptor<Messages.PackageMessage> pkgCaptor;
+ private ArgumentCaptor<PackageMessage> pkgCaptor;
@Spy
private Topics topics = new Topics();
@@ -155,7 +154,7 @@ public class DistributionPublisherTest {
when(slingSettings.getSlingId()).thenReturn("pub1sling");
when(context.registerService(Mockito.eq(DistributionAgent.class),
Mockito.eq(publisher),
Mockito.any(Dictionary.class))).thenReturn(serviceReg);
-
when(messagingProvider.<Messages.PackageMessage>createSender()).thenReturn(sender);
+
when(messagingProvider.<PackageMessage>createSender(Mockito.anyString())).thenReturn(sender);
publisher.activate(config, context);
when(timer.time()).thenReturn(timerContext);
}
@@ -171,7 +170,7 @@ public class DistributionPublisherTest {
public void testSend() throws DistributionException, IOException {
DistributionRequest request = new
SimpleDistributionRequest(DistributionRequestType.ADD, "/test");
- Messages.PackageMessage pkg = mockPackage(request);
+ PackageMessage pkg = mockPackage(request);
when(factory.create(Matchers.any(DistributionPackageBuilder.class),Mockito.eq(resourceResolver),
anyString(), Mockito.eq(request))).thenReturn(pkg);
CompletableFuture<Void> callback =
CompletableFuture.completedFuture(null);
when(queuedNotifier.registerWait(Mockito.eq(pkg.getPkgId()))).thenReturn(callback);
@@ -184,8 +183,8 @@ public class DistributionPublisherTest {
DistributionResponse response = publisher.execute(resourceResolver,
request);
assertThat(response.getState(),
equalTo(DistributionRequestState.ACCEPTED));
- verify(sender).send(Mockito.eq(topics.getPackageTopic()),
pkgCaptor.capture());
- Messages.PackageMessage sent = pkgCaptor.getValue();
+ verify(sender).accept(pkgCaptor.capture());
+ PackageMessage sent = pkgCaptor.getValue();
// Individual fields are checks in factory
assertThat(sent, notNullValue());
@@ -252,15 +251,15 @@ public class DistributionPublisherTest {
return new State(PUB1AGENT1, SUBAGENT1, 0, 1, 0, maxRetries, false);
}
- private Messages.PackageMessage mockPackage(DistributionRequest request)
throws IOException {
- return Messages.PackageMessage.newBuilder()
- .setPkgId("myid")
- .setPubSlingId("pub1sling")
- .setPkgType("journal")
- .setReqType(Messages.PackageMessage.ReqType.ADD)
- .addAllPaths(Arrays.asList(request.getPaths()))
- .addDeepPaths("/test2")
- .setPkgBinary(ByteString.copyFrom(new byte[100]))
+ private PackageMessage mockPackage(DistributionRequest request) throws
IOException {
+ return PackageMessage.builder()
+ .pkgId("myid")
+ .pubSlingId("pub1sling")
+ .pkgType("journal")
+ .reqType(PackageMessage.ReqType.ADD)
+ .paths(Arrays.asList(request.getPaths()))
+ .deepPaths(Arrays.asList("/test2"))
+ .pkgBinary(new byte[100])
.build();
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
index 2444d21..6bf412f 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.sling.distribution.journal.impl.publisher;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -28,8 +29,9 @@ import org.junit.Assert;
import org.junit.Test;
import org.osgi.service.event.Event;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
+
import static
org.apache.sling.distribution.event.DistributionEventTopics.AGENT_PACKAGE_QUEUED;
public class PackageQueuedNotifierTest {
@@ -66,12 +68,12 @@ public class PackageQueuedNotifierTest {
}
private PackageMessage pkgMsg(String packageId) {
- return PackageMessage.newBuilder()
- .addPaths("/test")
- .setPkgId(packageId)
- .setReqType(ReqType.ADD)
- .setPkgType("journal")
- .setPubSlingId("sling1")
+ return PackageMessage.builder()
+ .paths(Arrays.asList("/test"))
+ .pkgId(packageId)
+ .reqType(ReqType.ADD)
+ .pkgType("journal")
+ .pubSlingId("sling1")
.build();
}
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
index 43ed4f7..8ec5aae 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
@@ -39,7 +39,7 @@ import
org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.MessagingProvider;
import
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.packaging.DistributionPackage;
import org.apache.sling.testing.mock.osgi.MockOsgi;
import org.apache.sling.testing.mock.sling.MockSling;
@@ -79,7 +79,7 @@ public class PackageRepoTest {
private DistributionMetricsService distributionMetricsService;
@Captor
- private ArgumentCaptor<Messages.PackageMessage> pkgCaptor;
+ private ArgumentCaptor<PackageMessage> pkgCaptor;
@Spy
private Topics topics = new Topics();
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
index 1bf6fd8..8097105 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
@@ -30,14 +30,16 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.hasItemInArray;
import static org.junit.Assert.assertThat;
+import java.util.Arrays;
+
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.hamcrest.Matcher;
import org.junit.Test;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
import org.apache.sling.distribution.journal.MessageInfo;
public class QueueItemFactoryTest {
@@ -47,13 +49,13 @@ public class QueueItemFactoryTest {
@Test
public void test() {
MessageInfo info = new TestMessageInfo("topic", 0, 1, 2);
- PackageMessage message = PackageMessage.newBuilder()
- .setPubSlingId("sling1")
- .setPkgId("pkg1")
- .setPkgType("type")
- .addPaths("/")
- .addDeepPaths("/deep")
- .setReqType(ReqType.ADD)
+ PackageMessage message = PackageMessage.builder()
+ .pubSlingId("sling1")
+ .pkgId("pkg1")
+ .pkgType("type")
+ .paths(Arrays.asList("/"))
+ .deepPaths(Arrays.asList("/deep"))
+ .reqType(ReqType.ADD)
.build();
item = QueueItemFactory.fromPackage(info, message, true);
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 3f0dc5e..e7f2444 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -20,9 +20,6 @@ package org.apache.sling.distribution.journal.impl.queue.impl;
import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.ADD;
-import static
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.DELETE;
-import static
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.TEST;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
@@ -52,8 +49,8 @@ import
org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.impl.subscriber.LocalStore;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.awaitility.Awaitility;
@@ -157,7 +154,7 @@ public class PubQueueCacheTest {
@Test
public void testSeedingFromNewPackageMessage() throws Exception {
- simulateMessage(tailHandler, PUB_AGENT_NAME_1, ADD, 0);
+ simulateMessage(tailHandler, PUB_AGENT_NAME_1, ReqType.ADD, 0);
OffsetQueue<DistributionQueueItem> queue =
cache.getOffsetQueue(PUB_AGENT_NAME_1, 0);
assertThat(queue.getSize(), greaterThan(0));
}
@@ -204,12 +201,12 @@ public class PubQueueCacheTest {
@Test
public void testCacheSize() throws Exception {
- simulateMessage(tailHandler, PUB_AGENT_NAME_3, ADD, 0);
- simulateMessage(tailHandler, PUB_AGENT_NAME_3, DELETE, 1);
- simulateMessage(tailHandler, PUB_AGENT_NAME_1, ADD, 2);
- simulateMessage(tailHandler, PUB_AGENT_NAME_3, TEST, 3); // TEST
message does not increase the cache size
- simulateMessage(tailHandler, PUB_AGENT_NAME_2, TEST, 4); // TEST
message does not increase the cache size
- simulateMessage(tailHandler, PUB_AGENT_NAME_3, ADD, 5);
+ simulateMessage(tailHandler, PUB_AGENT_NAME_3, ReqType.ADD, 0);
+ simulateMessage(tailHandler, PUB_AGENT_NAME_3, ReqType.DELETE, 1);
+ simulateMessage(tailHandler, PUB_AGENT_NAME_1, ReqType.ADD, 2);
+ simulateMessage(tailHandler, PUB_AGENT_NAME_3, ReqType.TEST, 3); //
TEST message does not increase the cache size
+ simulateMessage(tailHandler, PUB_AGENT_NAME_2, ReqType.TEST, 4); //
TEST message does not increase the cache size
+ simulateMessage(tailHandler, PUB_AGENT_NAME_3, ReqType.ADD, 5);
assertEquals(4, cache.size());
}
@@ -220,16 +217,16 @@ public class PubQueueCacheTest {
private void simulateMessage(MessageHandler<PackageMessage> handler, long
offset) {
simulateMessage(handler,
pickAny(PUB_AGENT_NAME_1, PUB_AGENT_NAME_2, PUB_AGENT_NAME_3),
- pickAny(ADD, DELETE, TEST), offset);
+ pickAny(ReqType.ADD, ReqType.DELETE, ReqType.TEST), offset);
}
private void simulateMessage(MessageHandler<PackageMessage> handler,
String pubAgentName, ReqType reqType, long offset) {
- PackageMessage msg = PackageMessage.newBuilder()
- .setPkgType("pkgType")
- .setPkgId(UUID.randomUUID().toString())
- .setPubSlingId("pubSlingId")
- .setReqType(reqType)
- .setPubAgentName(pubAgentName)
+ PackageMessage msg = PackageMessage.builder()
+ .pkgType("pkgType")
+ .pkgId(UUID.randomUUID().toString())
+ .pubSlingId("pubSlingId")
+ .reqType(reqType)
+ .pubAgentName(pubAgentName)
.build();
simulateMessage(handler, msg, offset);
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index 009235c..2435c3a 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
import java.io.Closeable;
import java.io.IOException;
import java.lang.management.ManagementFactory;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;
import java.util.UUID;
@@ -41,9 +42,11 @@ import javax.management.ReflectionException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
import org.apache.sling.distribution.journal.MessageSender;
-import com.google.protobuf.GeneratedMessage;
-
import org.apache.sling.distribution.journal.impl.subscriber.LocalStore;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -59,16 +62,11 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
import org.osgi.service.event.EventAdmin;
@@ -88,7 +86,7 @@ public class PubQueueProviderTest {
private SlingSettingsService slingSettings;
@Captor
- private ArgumentCaptor<HandlerAdapter<Messages.PackageMessage>>
handlerCaptor;
+ private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
@Captor
private ArgumentCaptor<HandlerAdapter<PackageStatusMessage>>
statHandlerCaptor;
@@ -103,13 +101,13 @@ public class PubQueueProviderTest {
private EventAdmin eventAdmin;
@Mock
- private MessageSender<GeneratedMessage> sender;
+ private MessageSender<Object> sender;
private ResourceResolverFactory resolverFactory = new
MockResourceResolverFactory();
private PubQueueCacheService pubQueueCacheService;
- private MessageHandler<Messages.PackageMessage> handler;
+ private MessageHandler<PackageMessage> handler;
private MessageHandler<PackageStatusMessage> statHandler;
private PubQueueProviderImpl queueProvider;
@@ -129,7 +127,7 @@ public class PubQueueProviderTest {
Mockito.any(Reset.class),
statHandlerCaptor.capture()))
.thenReturn(statPoller);
- when(clientProvider.createSender())
+ when(clientProvider.createSender(Mockito.anyString()))
.thenReturn(sender);
Topics topics = new Topics();
String slingId = UUID.randomUUID().toString();
@@ -209,12 +207,12 @@ public class PubQueueProviderTest {
}
private PackageStatusMessage statusMessage(long offset, PackageMessage
pkgMsg1) {
- return PackageStatusMessage.newBuilder()
- .setOffset(offset)
- .setPubAgentName(PUB1_AGENT_NAME)
- .setStatus(Status.REMOVED_FAILED)
- .setSubAgentName(SUB_AGENT_NAME)
- .setSubSlingId(SUB_SLING_ID)
+ return PackageStatusMessage.builder()
+ .offset(offset)
+ .pubAgentName(PUB1_AGENT_NAME)
+ .status(Status.REMOVED_FAILED)
+ .subAgentName(SUB_AGENT_NAME)
+ .subSlingId(SUB_SLING_ID)
.build();
}
@@ -224,13 +222,13 @@ public class PubQueueProviderTest {
}
private PackageMessage packageMessage(String packageId, String
pubAgentName) {
- return Messages.PackageMessage.newBuilder()
- .setPubAgentName(pubAgentName)
- .setPubSlingId("pub1SlingId")
- .setPkgId(packageId)
- .setReqType(ReqType.ADD)
- .setPkgType("journal")
- .addPaths("path")
+ return PackageMessage.builder()
+ .pubAgentName(pubAgentName)
+ .pubSlingId("pub1SlingId")
+ .pkgId(packageId)
+ .reqType(ReqType.ADD)
+ .pkgType("journal")
+ .paths(Arrays.asList("path"))
.build();
}
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
index eeebb7a..69b42ca 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
@@ -27,7 +27,8 @@ import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -40,7 +41,6 @@ import org.mockito.runners.MockitoJUnitRunner;
import static java.lang.System.currentTimeMillis;
import static
org.apache.sling.distribution.journal.impl.shared.Topics.PACKAGE_TOPIC;
-import static
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.TEST;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
@@ -81,8 +81,8 @@ public class QueueCacheSeederTest {
any(Reset.class),
pkgHandlerCaptor.capture()))
.thenReturn(poller);
- doNothing().when(sender).send(eq(PACKAGE_TOPIC),
pkgMsgCaptor.capture());
- when(clientProvider.<PackageMessage>createSender())
+ doNothing().when(sender).send(pkgMsgCaptor.capture());
+ when(clientProvider.<PackageMessage>createSender(eq(PACKAGE_TOPIC)))
.thenReturn(sender);
seeder = new QueueCacheSeeder(clientProvider, PACKAGE_TOPIC);
}
@@ -99,10 +99,10 @@ public class QueueCacheSeederTest {
@Test
public void testSendingSeeds() {
seeder.seed(callback);
- verify(sender, timeout(5000).atLeastOnce()).send(eq(PACKAGE_TOPIC),
pkgMsgCaptor.capture());
+ verify(sender,
timeout(5000).atLeastOnce()).send(pkgMsgCaptor.capture());
PackageMessage seedMsg = pkgMsgCaptor.getValue();
assertNotNull(seedMsg);
- assertEquals(TEST, seedMsg.getReqType());
+ assertEquals(ReqType.TEST, seedMsg.getReqType());
}
@After
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
index 9a597f8..a1d66fb 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import org.apache.sling.distribution.journal.FullMessage;
@@ -36,9 +37,8 @@ import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -58,7 +58,7 @@ public class RangePollerTest {
private MessagingProvider clientProvider;
@Captor
- private ArgumentCaptor<HandlerAdapter<Messages.PackageMessage>>
handlerCaptor;
+ private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
@Mock
private Closeable poller;
@@ -107,15 +107,15 @@ public class RangePollerTest {
}
}
- private FullMessage<Messages.PackageMessage> createMessage(ReqType
reqType, int offset) {
+ private FullMessage<PackageMessage> createMessage(ReqType reqType, int
offset) {
MessageInfo info = new TestMessageInfo(TOPIC, 0, offset,
System.currentTimeMillis());
- PackageMessage message = Messages.PackageMessage.newBuilder()
- .setPubAgentName("agent1")
- .setPubSlingId("pub1SlingId")
- .setPkgId("package-" + offset)
- .setReqType(reqType)
- .setPkgType("journal")
- .addPaths("path")
+ PackageMessage message = PackageMessage.builder()
+ .pubAgentName("agent1")
+ .pubSlingId("pub1SlingId")
+ .pkgId("package-" + offset)
+ .reqType(reqType)
+ .pkgType("journal")
+ .paths(Arrays.asList("path"))
.build();
return new FullMessage<>(info, message);
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
index 5546b50..8f7c262 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
@@ -27,9 +27,11 @@ import static org.mockito.Mockito.when;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
+import java.util.Arrays;
import java.util.List;
-import org.apache.sling.distribution.journal.impl.shared.LimitPoller;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -39,9 +41,6 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
@@ -60,7 +59,7 @@ public class LimitPollerTest {
private MessagingProvider clientProvider;
@Captor
- private ArgumentCaptor<HandlerAdapter<Messages.PackageMessage>>
handlerCaptor;
+ private
ArgumentCaptor<HandlerAdapter<org.apache.sling.distribution.journal.messages.PackageMessage>>
handlerCaptor;
@Mock
private Closeable poller;
@@ -112,15 +111,15 @@ public class LimitPollerTest {
return message;
}
- private FullMessage<Messages.PackageMessage> createMessage(int offset) {
+ private FullMessage<PackageMessage> createMessage(int offset) {
MessageInfo info = new TestMessageInfo(TOPIC, 0, offset,
System.currentTimeMillis());
- PackageMessage message = Messages.PackageMessage.newBuilder()
- .setPubAgentName("agent1")
- .setPubSlingId("pub1SlingId")
- .setPkgId("package-" + offset)
- .setReqType(ReqType.ADD)
- .setPkgType("journal")
- .addPaths("path")
+ PackageMessage message = PackageMessage.builder()
+ .pubAgentName("agent1")
+ .pubSlingId("pub1SlingId")
+ .pkgId("package-" + offset)
+ .reqType(ReqType.ADD)
+ .pkgType("journal")
+ .paths(Arrays.asList("path"))
.build();
return new FullMessage<>(info, message);
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
index 659f69e..c19f633 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import javax.jcr.Binary;
import javax.jcr.Session;
@@ -34,8 +35,8 @@ import javax.jcr.ValueFactory;
import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
@@ -85,10 +86,12 @@ public class PackageBrowserTest {
}
private PackageMessage createPackageMsg(long offset) throws Exception {
- return
PackageMessage.newBuilder().setPubSlingId("").setReqType(ReqType.ADD)
- .addPaths("/content")
- .setPkgId("pkgid")
- .setPkgType("some_type")
- .setPkgBinaryRef("myref").build();
+ return PackageMessage.builder()
+ .pubSlingId("")
+ .reqType(ReqType.ADD)
+ .paths(Arrays.asList("/content"))
+ .pkgId("pkgid")
+ .pkgType("some_type")
+ .pkgBinaryRef("myref").build();
}
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
index 1a46360..36009ba 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.Charset;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -41,8 +42,8 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -52,8 +53,6 @@ import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner;
-import com.google.protobuf.ByteString;
-
@RunWith(MockitoJUnitRunner.class)
public class PackageViewerPluginTest {
@@ -147,13 +146,13 @@ public class PackageViewerPluginTest {
private FullMessage<PackageMessage> createPackageMsg(long offset) {
MessageInfo info = new TestMessageInfo("topic", 0 , offset, 0L);
- PackageMessage message = PackageMessage.newBuilder()
- .setPubSlingId("")
- .setReqType(ReqType.ADD)
- .addPaths("/content")
- .setPkgId("pkgid")
- .setPkgType("some_type")
- .setPkgBinary(ByteString.copyFrom("package content",
Charset.defaultCharset()))
+ PackageMessage message = PackageMessage.builder()
+ .pubSlingId("")
+ .reqType(ReqType.ADD)
+ .paths(Arrays.asList("/content"))
+ .pkgId("pkgid")
+ .pkgType("some_type")
+ .pkgBinary("package
content".getBytes(Charset.defaultCharset()))
.build();
return new FullMessage<>(info, message);
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
index a2105e3..a5112f9 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
@@ -18,6 +18,8 @@
*/
package org.apache.sling.distribution.journal.impl.shared;
+import java.util.Map;
+
import org.apache.sling.distribution.journal.MessageInfo;
public class TestMessageInfo implements MessageInfo {
@@ -50,4 +52,9 @@ public class TestMessageInfo implements MessageInfo {
return createTime;
}
+ @Override
+ public Map<String, String> getProps() {
+ return null;
+ }
+
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
index 5746dc7..e380f5e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
@@ -27,7 +27,8 @@ import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.function.Consumer;
-import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.SubscriberState;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
@@ -41,15 +42,15 @@ public class AnnouncerTest {
@Test
@SuppressWarnings("unchecked")
public void testDiscoveryMessage() throws InterruptedException {
- Consumer<Messages.DiscoveryMessage> sender = Mockito.mock(Consumer.class);
+ Consumer<DiscoveryMessage> sender = Mockito.mock(Consumer.class);
BookKeeper bookKeeper = Mockito.mock(BookKeeper.class);
when(bookKeeper.loadOffset()).thenReturn(1L);
Announcer announcer = new Announcer(SUB1_SLING_ID, SUB1_AGENT_NAME, Collections.singleton(PUB1_AGENT_NAME), sender, bookKeeper, -1, false, 10000);
Thread.sleep(200);
- ArgumentCaptor<Messages.DiscoveryMessage> msg = forClass(Messages.DiscoveryMessage.class);
+ ArgumentCaptor<DiscoveryMessage> msg = forClass(DiscoveryMessage.class);
verify(sender).accept(msg.capture());
- Messages.DiscoveryMessage message = msg.getValue();
- Messages.SubscriberState offset = message.getSubscriberStateList().iterator().next();
+ DiscoveryMessage message = msg.getValue();
+ SubscriberState offset = message.getSubscriberStates().iterator().next();
assertThat(message.getSubSlingId(), equalTo(SUB1_SLING_ID));
assertThat(offset.getPubAgentName(), equalTo(PUB1_AGENT_NAME));
assertThat(offset.getOffset(), equalTo(1L));
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
index 1345895..3afa938 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
@@ -27,7 +27,7 @@ import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.junit.Before;
import org.junit.Test;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
index 72d0257..9cc3503 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
@@ -35,8 +35,7 @@ import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages.ClearCommand;
-import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
+import org.apache.sling.distribution.journal.messages.ClearCommand;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Before;
@@ -66,9 +65,9 @@ public class CommandPollerTest {
CommandPoller commandPoller;
@Captor
- private ArgumentCaptor<HandlerAdapter<CommandMessage>> handlerCaptor;
+ private ArgumentCaptor<HandlerAdapter<ClearCommand>> handlerCaptor;
- private MessageHandler<CommandMessage> commandHandler;
+ private MessageHandler<ClearCommand> commandHandler;
private Topics topics = new Topics();
@@ -120,15 +119,6 @@ public class CommandPollerTest {
}
@Test
- public void testIgnoreInvalidCommand() throws DistributionException, InterruptedException, IOException {
- createCommandPoller(true);
-
- CommandMessage message = CommandMessage.newBuilder(commandMessage(10L)).clearClearCommand().build();
- commandHandler.handle(info, message);
- assertClearedUpTo(-1);
- }
-
- @Test
public void testEditable() throws DistributionException, InterruptedException, IOException {
createCommandPoller(true);
@@ -150,18 +140,15 @@ public class CommandPollerTest {
assertThat(commandPoller.isCleared(1), equalTo(false));
}
- private CommandMessage commandMessage(long offset) {
+ private ClearCommand commandMessage(long offset) {
return commandMessage(SUB_SLING_ID, SUB_AGENT_NAME, offset);
}
- private CommandMessage commandMessage(String subSlingId, String subAgentName, long offset) {
- ClearCommand command = ClearCommand.newBuilder()
- .setOffset(offset)
- .build();
- return CommandMessage.newBuilder()
- .setClearCommand(command)
- .setSubAgentName(subAgentName)
- .setSubSlingId(subSlingId)
+ private ClearCommand commandMessage(String subSlingId, String subAgentName, long offset) {
+ return ClearCommand.builder()
+ .subAgentName(subAgentName)
+ .subSlingId(subSlingId)
+ .offset(offset)
.build();
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 6ee55f8..a54a26b 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -52,6 +52,10 @@ import org.apache.sling.distribution.journal.impl.precondition.Precondition;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
@@ -97,18 +101,12 @@ import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.EventAdmin;
import org.osgi.util.converter.Converters;
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.ByteString;
@SuppressWarnings("unchecked")
public class SubscriberTest {
@@ -119,23 +117,23 @@ public class SubscriberTest {
private static final String PUB1_SLING_ID = "pub1sling";
private static final String PUB1_AGENT_NAME = "pub1agent";
- private static final PackageMessage BASIC_ADD_PACKAGE = PackageMessage.newBuilder()
- .setPkgId("myid")
- .setPubSlingId(PUB1_SLING_ID)
- .setPubAgentName(PUB1_AGENT_NAME)
- .setReqType(ReqType.ADD)
- .setPkgType("journal")
- .addAllPaths(Arrays.asList("/test"))
- .setPkgBinary(ByteString.copyFrom(new byte[100]))
+ private static final PackageMessage BASIC_ADD_PACKAGE = PackageMessage.builder()
+ .pkgId("myid")
+ .pubSlingId(PUB1_SLING_ID)
+ .pubAgentName(PUB1_AGENT_NAME)
+ .reqType(ReqType.ADD)
+ .pkgType("journal")
+ .paths(Arrays.asList("/test"))
+ .pkgBinary(new byte[100])
.build();
- private static final PackageMessage BASIC_DEL_PACKAGE = PackageMessage.newBuilder()
- .setPkgId("myid")
- .setPubSlingId(PUB1_SLING_ID)
- .setPubAgentName(PUB1_AGENT_NAME)
- .setReqType(ReqType.DELETE)
- .setPkgType("journal")
- .addAllPaths(Arrays.asList("/test"))
+ private static final PackageMessage BASIC_DEL_PACKAGE = PackageMessage.builder()
+ .pkgId("myid")
+ .pubSlingId(PUB1_SLING_ID)
+ .pubAgentName(PUB1_AGENT_NAME)
+ .reqType(ReqType.DELETE)
+ .pkgType("journal")
+ .paths(Arrays.asList("/test"))
.build();
@@ -193,7 +191,6 @@ public class SubscriberTest {
private MessageHandler<PackageMessage> packageHandler;
- @SuppressWarnings("rawtypes")
@Before
public void before() {
DistributionSubscriber.QUEUE_FETCH_DELAY = 100;
@@ -207,7 +204,9 @@ public class SubscriberTest {
mockMetrics();
- when(clientProvider.<PackageStatusMessage>createSender()).thenReturn(statusSender, (MessageSender) discoverySender);
+ when(clientProvider.<PackageStatusMessage>createSender(Mockito.eq(topics.getStatusTopic()))).thenReturn(statusSender);
+ when(clientProvider.<DiscoveryMessage>createSender(Mockito.eq(topics.getDiscoveryTopic()))).thenReturn(discoverySender);
+
when(clientProvider.createPoller(
Mockito.anyString(),
Mockito.eq(Reset.earliest),
@@ -251,8 +250,7 @@ public class SubscriberTest {
sem.release();
waitSubscriber(IDLE);
- verify(statusSender, times(0)).send(eq(topics.getStatusTopic()),
- anyObject());
+ verify(statusSender, times(0)).accept(anyObject());
List<String> log = subscriber.getLog().getLines();
// We do not use the DistributionLog anymore
assertThat(log.size(), equalTo(0));
@@ -302,8 +300,7 @@ public class SubscriberTest {
).thenThrow(new RuntimeException("Expected"));
packageHandler.handle(info, message);
- verify(statusSender, timeout(10000).times(1)).send(eq(topics.getStatusTopic()),
- anyObject());
+ verify(statusSender, timeout(10000).times(1)).accept(anyObject());
}
@Test
@@ -317,8 +314,7 @@ public class SubscriberTest {
packageHandler.handle(info, message);
waitSubscriber(IDLE);
- verify(statusSender, timeout(10000).times(1)).send(eq(topics.getStatusTopic()),
- anyObject());
+ verify(statusSender, timeout(10000).times(1)).accept(anyObject());
}
@Test