This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new bf91078  SLING-9504 - Migrate to json (#41)
bf91078 is described below

commit bf910786e225d08d28cb45d5bd66735a3855ecb3
Author: Christian Schneider <[email protected]>
AuthorDate: Thu Jun 18 17:28:21 2020 +0200

    SLING-9504 - Migrate to json (#41)
---
 pom.xml                                            |  4 +-
 .../journal/impl/event/DistributionEvent.java      | 12 ++---
 .../impl/precondition/PackageStatusWatcher.java    |  9 ++--
 .../impl/precondition/StagingPrecondition.java     |  2 +-
 .../journal/impl/publisher/DiscoveryService.java   | 17 +++---
 .../impl/publisher/DistributionPublisher.java      | 16 ++----
 .../impl/publisher/PackageDistributedNotifier.java |  8 +--
 .../impl/publisher/PackageMessageFactory.java      | 61 +++++++++++-----------
 .../journal/impl/queue/QueueItemFactory.java       | 10 ++--
 .../journal/impl/queue/impl/PubQueueCache.java     |  7 +--
 .../impl/queue/impl/PubQueueProviderImpl.java      | 40 +++++++-------
 .../journal/impl/queue/impl/QueueCacheSeeder.java  | 18 +++----
 .../journal/impl/queue/impl/RangePoller.java       |  8 +--
 .../journal/impl/shared/LimitPoller.java           |  8 +--
 .../journal/impl/shared/PackageBrowser.java        |  6 +--
 .../journal/impl/shared/PackageViewerPlugin.java   |  6 +--
 .../journal/impl/subscriber/Announcer.java         | 42 ++++++++-------
 .../journal/impl/subscriber/BookKeeper.java        | 31 +++++------
 .../journal/impl/subscriber/CommandPoller.java     | 13 ++---
 .../impl/subscriber/DistributionSubscriber.java    | 20 +++----
 .../journal/impl/subscriber/PackageHandler.java    | 10 ++--
 .../precondition/PackageStatusWatcherTest.java     | 17 +++---
 .../impl/precondition/StagingPreconditionTest.java | 14 ++---
 .../impl/publisher/DiscoveryServiceTest.java       | 28 +++++-----
 .../publisher/DistributionPackageFactoryTest.java  | 23 ++++----
 .../impl/publisher/DistributionPublisherTest.java  | 33 ++++++------
 .../impl/publisher/PackageQueuedNotifierTest.java  | 18 ++++---
 .../journal/impl/publisher/PackageRepoTest.java    |  4 +-
 .../journal/impl/queue/QueueItemFactoryTest.java   | 20 +++----
 .../journal/impl/queue/impl/PubQueueCacheTest.java | 35 ++++++-------
 .../impl/queue/impl/PubQueueProviderTest.java      | 46 ++++++++--------
 .../impl/queue/impl/QueueCacheSeederTest.java      | 12 ++---
 .../journal/impl/queue/impl/RangePollerTest.java   | 24 ++++-----
 .../journal/impl/shared/LimitPollerTest.java       | 25 +++++----
 .../journal/impl/shared/PackageBrowserTest.java    | 17 +++---
 .../impl/shared/PackageViewerPluginTest.java       | 21 ++++----
 .../journal/impl/shared/TestMessageInfo.java       |  7 +++
 .../journal/impl/subscriber/AnnouncerTest.java     | 11 ++--
 .../journal/impl/subscriber/BookKeeperTest.java    |  2 +-
 .../journal/impl/subscriber/CommandPollerTest.java | 31 ++++-------
 .../journal/impl/subscriber/SubscriberTest.java    | 54 +++++++++----------
 41 files changed, 380 insertions(+), 410 deletions(-)

diff --git a/pom.xml b/pom.xml
index 2bb23f6..78cc146 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
     <!-- P R O J E C T                                                         
  -->
     <!-- 
======================================================================= -->
     <artifactId>org.apache.sling.distribution.journal</artifactId>
-    <version>0.1.17-SNAPSHOT</version>
+    <version>0.2.0-SNAPSHOT</version>
 
     <name>Apache Sling Journal based Content Distribution - Core bundle</name>
     <description>Implementation of Apache Sling Content Distribution 
components on top of an append-only persisted log</description>
@@ -140,7 +140,7 @@
         <dependency>
             <groupId>org.apache.sling</groupId>
             
<artifactId>org.apache.sling.distribution.journal.messages</artifactId>
-            <version>0.1.2</version>
+            <version>0.2.0-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.felix</groupId>
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java
index c153ad3..a22e724 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java
@@ -35,12 +35,10 @@ import java.util.Map;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.osgi.service.event.Event;
 
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-
 @ParametersAreNonnullByDefault
 public class DistributionEvent {
 
@@ -51,11 +49,11 @@ public class DistributionEvent {
     private DistributionEvent() {
     }
     
-    public static Event eventImporterImported(Messages.PackageMessage pkgMsg, 
String agentName) {
+    public static Event eventImporterImported(PackageMessage pkgMsg, String 
agentName) {
         return buildEvent(IMPORTER_PACKAGE_IMPORTED, KIND_IMPORTER, agentName, 
pkgMsg);
     }
 
-    public static Event eventPackageCreated(Messages.PackageMessage pkgMsg, 
String agentName) {
+    public static Event eventPackageCreated(PackageMessage pkgMsg, String 
agentName) {
         return buildEvent(AGENT_PACKAGE_CREATED, KIND_AGENT, agentName, 
pkgMsg);
     }
 
@@ -66,12 +64,12 @@ public class DistributionEvent {
                 queueItem.getPackageId());
     }
 
-    public static Event eventPackageQueued(Messages.PackageMessage pkgMsg, 
String agentName) {
+    public static Event eventPackageQueued(PackageMessage pkgMsg, String 
agentName) {
         return buildEvent(AGENT_PACKAGE_QUEUED, KIND_AGENT, agentName, pkgMsg);
     }
 
     private static Event buildEvent(String topic, String kind, String 
agentName, PackageMessage pkgMsg) {
-        List<String> pathsList = pkgMsg.getPathsList();
+        List<String> pathsList = pkgMsg.getPaths();
         return buildEvent(topic, kind, agentName,
                 pkgMsg.getReqType().name(),
                 pathsList.toArray(new String[0]),
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
index 0408e0d..d359f44 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
@@ -30,9 +30,8 @@ import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
 
 public class PackageStatusWatcher implements Closeable {
     private final Closeable poller;
@@ -46,7 +45,7 @@ public class PackageStatusWatcher implements Closeable {
         poller = messagingProvider.createPoller(
                 topicName,
                 Reset.earliest,
-                create(Messages.PackageStatusMessage.class, this::handle)
+                create(PackageStatusMessage.class, this::handle)
         );
     }
 
@@ -73,7 +72,7 @@ public class PackageStatusWatcher implements Closeable {
         poller.close();
     }
 
-    private void handle(MessageInfo info, Messages.PackageStatusMessage 
pkgStatusMsg) {
+    private void handle(MessageInfo info, PackageStatusMessage pkgStatusMsg) {
         // TODO: check revision
         Map<Long, Status> agentStatus = 
getAgentStatus(pkgStatusMsg.getSubAgentName());
         agentStatus.put(pkgStatusMsg.getOffset(), pkgStatusMsg.getStatus());
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
index d523cdf..9b55848 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeoutException;
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
index 496ff81..d2c4972 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
@@ -19,9 +19,9 @@
 package org.apache.sling.distribution.journal.impl.publisher;
 
 import static java.lang.String.format;
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 import static 
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
 import static 
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 
 import java.io.Closeable;
 import java.util.Dictionary;
@@ -31,8 +31,9 @@ import javax.annotation.ParametersAreNonnullByDefault;
 
 import 
org.apache.sling.distribution.journal.impl.shared.PublisherConfigurationAvailable;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages;
-import 
org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
+import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.SubscriberConfig;
+import org.apache.sling.distribution.journal.messages.SubscriberState;
 import org.apache.commons.io.IOUtils;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
@@ -41,7 +42,6 @@ import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
 
-import 
org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
@@ -105,7 +105,8 @@ public class DiscoveryService implements Runnable {
         poller = messagingProvider.createPoller(
                 topics.getDiscoveryTopic(), 
                 Reset.latest,
-                create(Messages.DiscoveryMessage.class, new 
DiscoveryMessageHandler()));
+                create(DiscoveryMessage.class, new DiscoveryMessageHandler())
+                ); 
         startTopologyViewUpdaterTask(context);
         LOG.info("Discovery service started");
     }
@@ -158,9 +159,9 @@ public class DiscoveryService implements Runnable {
 
             long now = System.currentTimeMillis();
             AgentId subAgentId = new AgentId(disMsg.getSubSlingId(), 
disMsg.getSubAgentName());
-            for (Messages.SubscriberState subStateMsg : 
disMsg.getSubscriberStateList()) {
-                SubscriberConfiguration subConfig = 
disMsg.getSubscriberConfiguration();
-                State subState = new State(subStateMsg.getPubAgentName(), 
subAgentId.getAgentId(), now, subStateMsg.getOffset(), 
subStateMsg.getRetries(), subConfig.getMaxRetries(), subConfig.getEditable());
+            for (SubscriberState subStateMsg : disMsg.getSubscriberStates()) {
+                SubscriberConfig subConfig = 
disMsg.getSubscriberConfiguration();
+                State subState = new State(subStateMsg.getPubAgentName(), 
subAgentId.getAgentId(), now, subStateMsg.getOffset(), 
subStateMsg.getRetries(), subConfig.getMaxRetries(), subConfig.isEditable());
                 viewManager.refreshState(subState);
             }
         }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index d366ab3..6ac1e4b 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -52,6 +52,7 @@ import 
org.apache.sling.distribution.journal.impl.shared.DefaultDistributionLog;
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import 
org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.DistributionRequest;
@@ -75,8 +76,6 @@ import org.osgi.service.event.EventAdmin;
 import org.osgi.service.metatype.annotations.Designate;
 
 import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.JournalAvailable;
 
@@ -136,7 +135,7 @@ public class DistributionPublisher implements 
DistributionAgent {
 
     private ServiceRegistration<DistributionAgent> componentReg;
 
-    private MessageSender<PackageMessage> sender;
+    private Consumer<PackageMessage> sender;
 
     private JMXRegistration reg;
 
@@ -146,7 +145,7 @@ public class DistributionPublisher implements 
DistributionAgent {
         log = new DefaultDistributionLog(pubAgentName, this.getClass(), 
DefaultDistributionLog.LogLevel.INFO);
         REQ_TYPES.put(ADD,    this::sendAndWait);
         REQ_TYPES.put(DELETE, this::sendAndWait);
-        REQ_TYPES.put(TEST,   this::send);
+        REQ_TYPES.put(TEST,   this.sender);
     }
 
     @Activate
@@ -159,7 +158,7 @@ public class DistributionPublisher implements 
DistributionAgent {
 
         pkgType = packageBuilder.getType();
 
-        this.sender = messagingProvider.createSender();
+        this.sender = messagingProvider.createSender(topics.getPackageTopic());
         
         Dictionary<String, Object> props = createServiceProps(config);
         componentReg = 
requireNonNull(context.registerService(DistributionAgent.class, this, props));
@@ -320,7 +319,7 @@ public class DistributionPublisher implements 
DistributionAgent {
             CompletableFuture<Void> received = 
queuedNotifier.registerWait(pkg.getPkgId());
             Event createdEvent = DistributionEvent.eventPackageCreated(pkg, 
pubAgentName);
             eventAdmin.postEvent(createdEvent);
-            send(pkg);
+            sender.accept(pkg);
             received.get(queuedTimeout, TimeUnit.MILLISECONDS);
         } catch (Exception e) {
             queuedNotifier.unRegisterWait(pkg.getPkgId());
@@ -328,11 +327,6 @@ public class DistributionPublisher implements 
DistributionAgent {
         }
     }
 
-    private void send(PackageMessage pipePackage) {
-        final String topicName = topics.getPackageTopic();
-        sender.send(topicName, pipePackage);
-    }
-    
     @Nonnull
     private DistributionResponse executeUnsupported(DistributionRequest 
request) {
         String msg = String.format("Request type %s is not supported by this 
agent, expected one of %s",
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index 75326e8..c579089 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -20,6 +20,7 @@ package org.apache.sling.distribution.journal.impl.publisher;
 
 
 import java.util.Objects;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.LongStream;
 
@@ -31,7 +32,6 @@ import 
org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
 import 
org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
 import 
org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
-import org.apache.sling.distribution.journal.JsonMessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 
 import org.apache.commons.lang3.StringUtils;
@@ -65,7 +65,7 @@ public class PackageDistributedNotifier implements 
TopologyChangeHandler {
     @Reference
     private Topics topics;
 
-    private JsonMessageSender<PackageDistributedMessage> sender;
+    private Consumer<PackageDistributedMessage> sender;
 
     private boolean sendMsg;
 
@@ -73,7 +73,7 @@ public class PackageDistributedNotifier implements 
TopologyChangeHandler {
     public void activate() {
         sendMsg = StringUtils.isNotBlank(topics.getEventTopic());
         if (sendMsg) {
-            sender = messagingProvider.createJsonSender();
+            sender = messagingProvider.createSender(topics.getEventTopic());
         }
         LOG.info("Started package distributed notifier with event message 
topic {}", topics.getEventTopic());
     }
@@ -111,7 +111,7 @@ public class PackageDistributedNotifier implements 
TopologyChangeHandler {
             msg.paths = (String[]) queueItem.get(PROPERTY_REQUEST_PATHS);
             msg.deepPaths = (String[]) 
queueItem.get(PROPERTY_REQUEST_DEEP_PATHS);
 
-            sender.send(topics.getEventTopic(), msg);
+            sender.accept(msg);
         }
     }
 
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
index 2e7d65e..28bf70a 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
@@ -33,6 +33,9 @@ import org.apache.commons.io.IOUtils;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import 
org.apache.sling.distribution.journal.messages.PackageMessage.PackageMessageBuilder;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.packaging.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.packaging.DistributionPackageInfo;
@@ -44,10 +47,6 @@ import org.osgi.service.component.annotations.Reference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import com.google.protobuf.ByteString;
-
 @Component(service = PackageMessageFactory.class)
 @ParametersAreNonnullByDefault
 public class PackageMessageFactory {
@@ -106,16 +105,16 @@ public class PackageMessageFactory {
         final List<String> paths = Arrays.asList(pkgInfo.getPaths());
         final List<String> deepPaths = 
Arrays.asList(pkgInfo.get(PROPERTY_REQUEST_DEEP_PATHS, String[].class));
         final String pkgId = disPkg.getId();
-        PackageMessage.Builder pkgBuilder = PackageMessage.newBuilder()
-                .setPubSlingId(pubSlingId)
-                .setPkgId(pkgId)
-                .setPubAgentName(pubAgentName)
-                .addAllPaths(paths)
-                .setReqType(ReqType.ADD)
-                .addAllDeepPaths(deepPaths)
-                .setPkgLength(pkgLength)
-                .setUserId(resourceResolver.getUserID())
-                .setPkgType(packageBuilder.getType());
+        PackageMessageBuilder pkgBuilder = PackageMessage.builder()
+                .pubSlingId(pubSlingId)
+                .pkgId(pkgId)
+                .pubAgentName(pubAgentName)
+                .paths(paths)
+                .reqType(ReqType.ADD)
+                .deepPaths(deepPaths)
+                .pkgLength(pkgLength)
+                .userId(resourceResolver.getUserID())
+                .pkgType(packageBuilder.getType());
         if (pkgLength > MAX_INLINE_PKG_BINARY_SIZE) {
 
             /*
@@ -131,9 +130,9 @@ public class PackageMessageFactory {
 
             LOG.info("Package {} too large ({}B) to be sent inline", 
disPkg.getId(), pkgLength);
             String pkgBinRef = packageRepo.store(resourceResolver, disPkg);
-            pkgBuilder.setPkgBinaryRef(pkgBinRef);
+            pkgBuilder.pkgBinaryRef(pkgBinRef);
         } else {
-            pkgBuilder.setPkgBinary(ByteString.copyFrom(pkgBinary));
+            pkgBuilder.pkgBinary(pkgBinary);
         }
         PackageMessage pipePackage = pkgBuilder.build();
         disPkg.delete();
@@ -143,27 +142,27 @@ public class PackageMessageFactory {
     @Nonnull
     private PackageMessage createDelete(DistributionPackageBuilder 
packageBuilder, ResourceResolver resourceResolver, DistributionRequest request, 
String pubAgentName) {
         String pkgId = UUID.randomUUID().toString();
-        return PackageMessage.newBuilder()
-                .setPubSlingId(pubSlingId)
-                .setPkgId(pkgId)
-                .setPubAgentName(pubAgentName)
-                .addAllPaths(Arrays.asList(request.getPaths()))
-                .setReqType(ReqType.DELETE)
-                .setPkgType(packageBuilder.getType())
-                .setUserId(resourceResolver.getUserID())
+        return PackageMessage.builder()
+                .pubSlingId(pubSlingId)
+                .pkgId(pkgId)
+                .pubAgentName(pubAgentName)
+                .paths(Arrays.asList(request.getPaths()))
+                .reqType(ReqType.DELETE)
+                .pkgType(packageBuilder.getType())
+                .userId(resourceResolver.getUserID())
                 .build();
     }
 
     @Nonnull
     public PackageMessage createTest(DistributionPackageBuilder 
packageBuilder, ResourceResolver resourceResolver, String pubAgentName) {
         String pkgId = UUID.randomUUID().toString();
-        return PackageMessage.newBuilder()
-                .setPubSlingId(pubSlingId)
-                .setPubAgentName(pubAgentName)
-                .setPkgId(pkgId)
-                .setReqType(ReqType.TEST)
-                .setPkgType(packageBuilder.getType())
-                .setUserId(resourceResolver.getUserID())
+        return PackageMessage.builder()
+                .pubSlingId(pubSlingId)
+                .pubAgentName(pubAgentName)
+                .pkgId(pkgId)
+                .reqType(ReqType.TEST)
+                .pkgType(packageBuilder.getType())
+                .userId(resourceResolver.getUserID())
                 .build();
     }
 
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
index c9808d8..855a757 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
@@ -33,10 +33,10 @@ import javax.annotation.ParametersAreNonnullByDefault;
 import org.apache.sling.distribution.DistributionRequestType;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 
 @ParametersAreNonnullByDefault
 public final class QueueItemFactory {
@@ -74,14 +74,14 @@ public final class QueueItemFactory {
         properties.put(RECORD_TIMESTAMP, info.getCreateTime());
         properties.put(PROPERTY_PACKAGE_TYPE, message.getPkgType());
         properties.put(PROPERTY_REQUEST_TYPE, 
toDistReqType(message.getReqType()));
-        String[] paths = toArray(message.getPathsList());
+        String[] paths = toArray(message.getPaths());
         properties.put(PROPERTY_REQUEST_PATHS, paths);
-        String[] deepPaths = toArray(message.getDeepPathsList());
+        String[] deepPaths = toArray(message.getDeepPaths());
         properties.put(PROPERTY_REQUEST_DEEP_PATHS, deepPaths);
         if (addMessage) {
             properties.put(PACKAGE_MSG, message);
         }
-        if (message.hasUserId()) {
+        if (message.getUserId() != null) {
             properties.put(REQUEST_USER_ID, message.getUserId());
         }
         return new DistributionQueueItem(packageId, pkgLength, properties);
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index ec811d2..6cf0be7 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -19,10 +19,10 @@
 package org.apache.sling.distribution.journal.impl.queue.impl;
 
 
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 import static java.util.Collections.singletonList;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.stream.Collectors.groupingBy;
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 
 import java.io.Closeable;
 import java.util.HashSet;
@@ -43,6 +43,7 @@ import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsServ
 import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.impl.subscriber.LocalStore;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
@@ -51,7 +52,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
@@ -158,7 +158,8 @@ public class PubQueueCache {
                 this.topic,
                 Reset.earliest,
                 assignTo,
-                create(PackageMessage.class, this::handlePackage));
+                create(PackageMessage.class, this::handlePackage) 
+                );
     }
 
     @Nonnull
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index 3b7feab..c9428b6 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -18,28 +18,26 @@
  */
 package org.apache.sling.distribution.journal.impl.queue.impl;
 
-import static 
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED_FAILED;
 import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 
 import java.io.Closeable;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
 
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
-
-import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.ClearCommand;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.osgi.service.component.annotations.Activate;
@@ -74,7 +72,7 @@ public class PubQueueProviderImpl implements PubQueueProvider 
{
 
     private Closeable statusPoller;
 
-    private MessageSender<CommandMessage> sender;
+    private Consumer<ClearCommand> sender;
 
     public PubQueueProviderImpl() {
     }
@@ -93,8 +91,9 @@ public class PubQueueProviderImpl implements PubQueueProvider 
{
         statusPoller = messagingProvider.createPoller(
                 topics.getStatusTopic(),
                 Reset.earliest,
-                create(PackageStatusMessage.class, this::handleStatus));
-        sender = messagingProvider.createSender();
+                create(PackageStatusMessage.class, this::handleStatus)
+                );
+        sender = messagingProvider.createSender(topics.getCommandTopic());
         LOG.info("Started Publisher queue provider service");
     }
 
@@ -131,7 +130,7 @@ public class PubQueueProviderImpl implements 
PubQueueProvider {
     }
 
     public void handleStatus(MessageInfo info, PackageStatusMessage message) {
-        if (message.getStatus() == REMOVED_FAILED) {
+        if (message.getStatus() == Status.REMOVED_FAILED) {
             String errorQueueKey = errorQueueKey(message.getPubAgentName(), 
message.getSubSlingId(), message.getSubAgentName());
             OffsetQueue<Long> errorQueue = 
errorQueues.computeIfAbsent(errorQueueKey, key -> new OffsetQueueImpl<>());
             errorQueue.putItem(info.getOffset(), message.getOffset());
@@ -144,16 +143,13 @@ public class PubQueueProviderImpl implements 
PubQueueProvider {
     }
 
     private void sendClearCommand(String subSlingId, String subAgentName, long 
offset) {
-        Messages.ClearCommand clearCommand = Messages.ClearCommand.newBuilder()
-                .setOffset(offset)
-                .build();
-        CommandMessage commandMessage = CommandMessage.newBuilder()
-                .setSubSlingId(subSlingId)
-                .setSubAgentName(subAgentName)
-                .setClearCommand(clearCommand)
+        ClearCommand commandMessage = ClearCommand.builder()
+                .subSlingId(subSlingId)
+                .subAgentName(subAgentName)
+                .offset(offset)
                 .build();
         LOG.info("Sending clear command to subSlingId: {}, subAgentName: {} 
with offset {}.", subSlingId, subAgentName, offset);
-        sender.send(topics.getCommandTopic(), commandMessage);
+        sender.accept(commandMessage);
     }
 
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
index 79f6063..e3ac96c 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
@@ -27,7 +27,7 @@ import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingException;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,7 +79,7 @@ public class QueueCacheSeeder implements Closeable {
     private void sendSeedingMessages() {
         LOG.info("Start message seeder");
         try {
-            MessageSender<PackageMessage> sender = provider.createSender();
+            MessageSender<PackageMessage> sender = 
provider.createSender(topic);
             while (!closed) {
                 sendSeedingMessage(sender);
                 delay(CACHE_SEEDING_DELAY_MS);
@@ -90,14 +90,14 @@ public class QueueCacheSeeder implements Closeable {
     }
 
     private void sendSeedingMessage() {
-        sendSeedingMessage(provider.createSender());
+        sendSeedingMessage(provider.createSender(topic));
     }
 
     private void sendSeedingMessage(MessageSender<PackageMessage> sender) {
         PackageMessage pkgMsg = createTestMessage();
         LOG.info("Send seeding message");
         try {
-            sender.send(topic, pkgMsg);
+            sender.send(pkgMsg);
         } catch (MessagingException e) {
             LOG.warn(e.getMessage(), e);
             delay(CACHE_SEEDING_DELAY_MS * 10);
@@ -114,11 +114,11 @@ public class QueueCacheSeeder implements Closeable {
 
     protected PackageMessage createTestMessage() {
         String pkgId = UUID.randomUUID().toString();
-        return PackageMessage.newBuilder()
-                .setPubSlingId("seeder")
-                .setPkgId(pkgId)
-                .setPkgType("seeder")
-                .setReqType(PackageMessage.ReqType.TEST)
+        return PackageMessage.builder()
+                .pubSlingId("seeder")
+                .pkgId(pkgId)
+                .pkgType("seeder")
+                .reqType(PackageMessage.ReqType.TEST)
                 .build();
     }
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
index 68fe9f7..72e7c4a 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
@@ -31,12 +31,11 @@ import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
 
 @ParametersAreNonnullByDefault
 public class RangePoller {
@@ -64,7 +63,8 @@ public class RangePoller {
         LOG.info("Fetching offsets [{},{}[", minOffset, maxOffset);
         headPoller = messagingProvider.createPoller(
                 packageTopic, Reset.earliest, assign,
-                create(Messages.PackageMessage.class, this::handlePackage));
+                create(PackageMessage.class, this::handlePackage)
+                );
     }
 
     public List<FullMessage<PackageMessage>> fetchRange() throws 
InterruptedException {
@@ -77,7 +77,7 @@ public class RangePoller {
         }
     }
 
-    private void handlePackage(MessageInfo info, Messages.PackageMessage 
message) {
+    private void handlePackage(MessageInfo info, PackageMessage message) {
         long offset = info.getOffset();
         LOG.debug("Reading offset {}", offset);
         if (offset < maxOffset) {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
index 2bb30b5..687c9ab 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
@@ -36,8 +36,7 @@ import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,7 +65,8 @@ public class LimitPoller {
         log.info("Fetching {} messages starting from {}", maxMessages, 
minOffset);
         headPoller = messagingProvider.createPoller(
                 packageTopic, Reset.earliest, assign,
-                create(Messages.PackageMessage.class, this::handlePackage));
+                create(PackageMessage.class, this::handlePackage)
+                );
     }
 
     public List<FullMessage<PackageMessage>> fetch(Duration timeOut) {
@@ -86,7 +86,7 @@ public class LimitPoller {
         }
     }
 
-    private void handlePackage(MessageInfo info, Messages.PackageMessage 
message) {
+    private void handlePackage(MessageInfo info, PackageMessage message) {
         long offset = info.getOffset();
         log.debug("Reading offset {}", offset);
         if (this.messages.size() < maxMessages && info.getOffset() >= 
minOffset) {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
index b9d1d2d..b02d33a 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
@@ -43,7 +43,7 @@ import 
org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.JournalAvailable;
 import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 
@@ -78,8 +78,8 @@ public class PackageBrowser {
     
     @Nonnull
     public static InputStream pkgStream(ResourceResolver resolver, 
PackageMessage pkgMsg) throws DistributionException {
-        if (pkgMsg.hasPkgBinary()) {
-            return new 
ByteArrayInputStream(pkgMsg.getPkgBinary().toByteArray());
+        if (pkgMsg.getPkgBinary() != null) {
+            return new ByteArrayInputStream(pkgMsg.getPkgBinary());
         } else {
             String pkgBinRef = pkgMsg.getPkgBinaryRef();
             try {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
index 30f8836..308cd61 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
@@ -31,8 +31,8 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.felix.webconsole.AbstractWebConsolePlugin;
 import org.apache.felix.webconsole.WebConsoleConstants;
 import org.apache.sling.distribution.journal.FullMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.osgi.framework.Constants;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
@@ -109,7 +109,7 @@ public class PackageViewerPlugin extends 
AbstractWebConsolePlugin {
                 msg.getMessage().getPkgId(),
                 msg.getInfo().getOffset(),
                 msg.getMessage().getReqType(),
-                msg.getMessage().getPathsList().toString());
+                msg.getMessage().getPaths().toString());
     }
 
     private void writePackage(Long offset, HttpServletResponse res) throws 
IOException {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
index 7ecd03b..06059e2 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
@@ -19,19 +19,20 @@
 package org.apache.sling.distribution.journal.impl.subscriber;
 
 import java.io.Closeable;
+import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.distribution.journal.messages.Messages;
-import 
org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.SubscriberConfig;
+import org.apache.sling.distribution.journal.messages.SubscriberState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,27 +89,28 @@ class Announcer implements Runnable, Closeable {
 
     private DiscoveryMessage createDiscoveryMessage() {
         long offset = bookKeeper.loadOffset();
-        SubscriberConfiguration subscriberConfiguration = 
SubscriberConfiguration.newBuilder()
-                .setEditable(editable)
-                .setMaxRetries(maxRetries)
+        SubscriberConfig subscriberConfiguration = SubscriberConfig.builder()
+                .editable(editable)
+                .maxRetries(maxRetries)
+                .build();
+        List<SubscriberState> states = pubAgentNames.stream()
+            .map(pubAgentName -> subscriberState(pubAgentName, offset))
+            .collect(Collectors.toList());
+        return DiscoveryMessage
+                .builder()
+                .subSlingId(subSlingId)
+                .subAgentName(subAgentName)
+                .subscriberConfiguration(subscriberConfiguration)
+                .subscriberStates(states)
                 .build();
-        DiscoveryMessage.Builder disMsgBuilder = DiscoveryMessage
-                .newBuilder()
-                .setSubSlingId(subSlingId)
-                .setSubAgentName(subAgentName)
-                .setSubscriberConfiguration(subscriberConfiguration);
-        for (String pubAgentName : pubAgentNames) {
-            disMsgBuilder.addSubscriberState(subscriberState(pubAgentName, 
offset));
-        }
-        return disMsgBuilder.build();
     }
 
     private SubscriberState subscriberState(String pubAgentName, long offset) {
         int retries = bookKeeper.getRetries(pubAgentName);
-        return Messages.SubscriberState.newBuilder()
-                .setPubAgentName(pubAgentName)
-                .setRetries(retries)
-                .setOffset(offset)
+        return SubscriberState.builder()
+                .pubAgentName(pubAgentName)
+                .retries(retries)
+                .offset(offset)
                 .build();
     }
 
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 2984399..59d9687 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -22,9 +22,6 @@ import static java.lang.String.format;
 import static java.lang.System.currentTimeMillis;
 import static java.util.Collections.singletonMap;
 import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
-import static 
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.IMPORTED;
-import static 
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED;
-import static 
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED_FAILED;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -45,9 +42,9 @@ import 
org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
@@ -151,7 +148,7 @@ public class BookKeeper implements Closeable {
                 ResourceResolver importerResolver = 
getServiceResolver(SUBSERVICE_IMPORTER)) {
             packageHandler.apply(importerResolver, pkgMsg);
             if (editable) {
-                storeStatus(importerResolver, new PackageStatus(IMPORTED, 
offset, pkgMsg.getPubAgentName()));
+                storeStatus(importerResolver, new 
PackageStatus(PackageStatusMessage.Status.IMPORTED, offset, 
pkgMsg.getPubAgentName()));
             }
             storeOffset(importerResolver, offset);
             importerResolver.commit();
@@ -170,7 +167,7 @@ public class BookKeeper implements Closeable {
     private void addPackageMDC(PackageMessage pkgMsg) {
         MDC.put("module", "distribution");
         MDC.put("package-id", pkgMsg.getPkgId());
-        String paths = String.join(",", pkgMsg.getPathsList());
+        String paths = String.join(",", pkgMsg.getPaths());
         MDC.put("paths", paths);
         MDC.put("pub-sling-id", pkgMsg.getPubSlingId());
         String pubAgentName = pkgMsg.getPubAgentName();
@@ -212,7 +209,7 @@ public class BookKeeper implements Closeable {
         Timer.Context context = 
distributionMetricsService.getRemovedPackageDuration().time();
         try (ResourceResolver resolver = 
getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             if (editable) {
-                storeStatus(resolver, new PackageStatus(REMOVED, offset, 
pkgMsg.getPubAgentName()));
+                storeStatus(resolver, new PackageStatus(Status.REMOVED, 
offset, pkgMsg.getPubAgentName()));
             }
             storeOffset(resolver, offset);
             resolver.commit();
@@ -263,12 +260,12 @@ public class BookKeeper implements Closeable {
     }
     
     private void sendStatusMessage(PackageStatus status) {
-        PackageStatusMessage pkgStatMsg = PackageStatusMessage.newBuilder()
-                .setSubSlingId(subSlingId)
-                .setSubAgentName(subAgentName)
-                .setPubAgentName(status.pubAgentName)
-                .setOffset(status.offset)
-                .setStatus(status.status)
+        PackageStatusMessage pkgStatMsg = PackageStatusMessage.builder()
+                .subSlingId(subSlingId)
+                .subAgentName(subAgentName)
+                .pubAgentName(status.pubAgentName)
+                .offset(status.offset)
+                .status(status.status)
                 .build();
         sender.accept(pkgStatMsg);
         log.info("Sent status message {}",  pkgStatMsg);
@@ -305,7 +302,7 @@ public class BookKeeper implements Closeable {
                 pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
         Timer.Context context = 
distributionMetricsService.getRemovedFailedPackageDuration().time();
         try (ResourceResolver resolver = 
getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
-            storeStatus(resolver, new PackageStatus(REMOVED_FAILED, offset, 
pkgMsg.getPubAgentName()));
+            storeStatus(resolver, new PackageStatus(Status.REMOVED_FAILED, 
offset, pkgMsg.getPubAgentName()));
             storeOffset(resolver, offset);
             resolver.commit();
         } catch (Exception e) {
@@ -351,7 +348,7 @@ public class BookKeeper implements Closeable {
         
         PackageStatus(ValueMap statusMap) {
             Integer statusNum = statusMap.get("statusNumber", Integer.class);
-            this.status = statusNum !=null ? Status.valueOf(statusNum) : null;
+            this.status = statusNum !=null ? Status.fromNumber(statusNum) : 
null;
             this.offset = statusMap.get(KEY_OFFSET, Long.class);
             this.pubAgentName = statusMap.get("pubAgentName", String.class);
             this.sent = statusMap.get("sent", true);
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index f9d047b..703a9d1 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -28,7 +28,7 @@ import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
+import org.apache.sling.distribution.journal.messages.ClearCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +57,8 @@ public class CommandPoller implements Closeable {
             poller = messagingProvider.createPoller(
                     topics.getCommandTopic(),
                     Reset.earliest,
-                    create(CommandMessage.class, this::handleCommandMessage));
+                    create(ClearCommand.class, this::handleCommandMessage)
+                    );
         } else {
             poller = null;
         }
@@ -67,17 +68,13 @@ public class CommandPoller implements Closeable {
         return offset <= clearOffset.longValue();
     }
 
-    private void handleCommandMessage(MessageInfo info, CommandMessage 
message) {
+    private void handleCommandMessage(MessageInfo info, ClearCommand message) {
         if (!subSlingId.equals(message.getSubSlingId()) || 
!subAgentName.equals(message.getSubAgentName())) {
             LOG.debug("Skip command for subSlingId {}", 
message.getSubSlingId());
             return;
         }
 
-        if (message.hasClearCommand()) {
-            handleClearCommand(message.getClearCommand().getOffset());
-        } else {
-            LOG.warn("Unsupported command {}", message);
-        }
+        handleClearCommand(message.getOffset());
     }
 
     private void handleClearCommand(long offset) {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 8bde940..dcd016c 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -22,7 +22,6 @@ import static java.lang.String.format;
 import static java.util.Collections.emptyMap;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toSet;
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 import static 
org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
 import static 
org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.PACKAGE_MSG;
 import static 
org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
@@ -64,9 +63,9 @@ import org.apache.sling.distribution.DistributionResponse;
 import org.apache.sling.distribution.agent.DistributionAgentState;
 import org.apache.sling.distribution.agent.spi.DistributionAgent;
 import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.JournalAvailable;
 import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.precondition.Precondition;
@@ -76,7 +75,8 @@ import 
org.apache.sling.distribution.journal.impl.shared.AgentState;
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import 
org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.log.spi.DistributionLog;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -93,8 +93,6 @@ import org.osgi.service.metatype.annotations.Designate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.protobuf.GeneratedMessage;
-
 /**
  * A Subscriber SCD agent which consumes messages produced by a
  * {@code DistributionPublisher} agent.
@@ -201,14 +199,15 @@ public class DistributionSubscriber implements 
DistributionAgent {
 
         ContentPackageExtractor extractor = new 
ContentPackageExtractor(packaging, config.packageHandling());
         PackageHandler packageHandler = new PackageHandler(packageBuilder, 
extractor);
+        Consumer<PackageStatusMessage> sender = 
messagingProvider.createSender(topics.getStatusTopic());
         bookKeeper = new BookKeeper(resolverFactory, 
distributionMetricsService, packageHandler, eventAdmin,
-                sender(topics.getStatusTopic()), subAgentName, subSlingId, 
editable, maxRetries);
+                sender, subAgentName, subSlingId, editable, maxRetries);
         
         long startOffset = bookKeeper.loadOffset() + 1;
         String assign = messagingProvider.assignTo(startOffset);
 
         packagePoller = 
messagingProvider.createPoller(topics.getPackageTopic(), Reset.earliest, assign,
-                create(PackageMessage.class, this::handlePackageMessage));
+                HandlerAdapter.create(PackageMessage.class, 
this::handlePackageMessage));
 
         commandPoller = new CommandPoller(messagingProvider, topics, 
subSlingId, subAgentName, editable);
 
@@ -216,7 +215,7 @@ public class DistributionSubscriber implements 
DistributionAgent {
                 format("Queue Processor for Subscriber agent %s", 
subAgentName));
 
         int announceDelay = 
PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
-        announcer = new Announcer(subSlingId, subAgentName, queueNames, 
sender(topics.getDiscoveryTopic()), bookKeeper,
+        announcer = new Announcer(subSlingId, subAgentName, queueNames, 
messagingProvider.createSender(topics.getDiscoveryTopic()), bookKeeper,
                 maxRetries, config.editable(), announceDelay);
 
         boolean errorQueueEnabled = (maxRetries >= 0);
@@ -228,11 +227,6 @@ public class DistributionSubscriber implements 
DistributionAgent {
         componentReg = context.registerService(DistributionAgent.class, this, 
props);
     }
 
-    private <T extends GeneratedMessage> Consumer<T> sender(String topic) {
-        MessageSender<T> sender = messagingProvider.createSender();
-        return msg -> sender.send(topic, msg);
-    }
-
     private Set<String> getNotEmpty(String[] agentNames) {
         return 
Arrays.stream(agentNames).filter(StringUtils::isNotBlank).collect(toSet());
     }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
index bdfb3dd..bccf474 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
@@ -28,7 +28,7 @@ import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,12 +63,12 @@ public class PackageHandler {
 
     private void installAddPackage(ResourceResolver resolver, PackageMessage 
pkgMsg)
             throws DistributionException {
-        LOG.info("Importing paths {}",pkgMsg.getPathsList());
+        LOG.info("Importing paths {}",pkgMsg.getPaths());
         InputStream pkgStream = null;
         try {
             pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
             packageBuilder.installPackage(resolver, pkgStream);
-            extractor.handle(resolver, pkgMsg.getPathsList());
+            extractor.handle(resolver, pkgMsg.getPaths());
         } finally {
             IOUtils.closeQuietly(pkgStream);
         }
@@ -77,8 +77,8 @@ public class PackageHandler {
 
     private void installDeletePackage(ResourceResolver resolver, 
PackageMessage pkgMsg)
             throws PersistenceException {
-        LOG.info("Deleting paths {}",pkgMsg.getPathsList());
-        for (String path : pkgMsg.getPathsList()) {
+        LOG.info("Deleting paths {}",pkgMsg.getPaths());
+        for (String path : pkgMsg.getPaths()) {
             Resource resource = resolver.getResource(path);
             if (resource != null) {
                 resolver.delete(resource);
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
index 1c5350e..f882f32 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
@@ -18,11 +18,10 @@
  */
 package org.apache.sling.distribution.journal.impl.precondition;
 
-import 
org.apache.sling.distribution.journal.impl.precondition.PackageStatusWatcher;
 import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessagingProvider;
@@ -96,12 +95,12 @@ public class PackageStatusWatcherTest {
     }
 
     PackageStatusMessage createStatusMessage(int i) {
-        return PackageStatusMessage.newBuilder()
-                .setSubSlingId(SUB1_SLING_ID)
-                .setSubAgentName(SUB1_AGENT_NAME)
-                .setPubAgentName(PUB1_AGENT_NAME)
-                .setOffset(1000 + i)
-                .setStatus(PackageStatusMessage.Status.REMOVED_FAILED)
+        return PackageStatusMessage.builder()
+                .subSlingId(SUB1_SLING_ID)
+                .subAgentName(SUB1_AGENT_NAME)
+                .pubAgentName(PUB1_AGENT_NAME)
+                .offset(1000 + i)
+                .status(PackageStatusMessage.Status.REMOVED_FAILED)
                 .build();
 
     }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
index 59b1c14..3a31224 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
@@ -37,7 +37,7 @@ import 
org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.junit.Before;
@@ -143,12 +143,12 @@ public class StagingPreconditionTest {
     }
 
     private void simulateMessage(String subAgentName, long pkgOffset, 
PackageStatusMessage.Status status) {
-        PackageStatusMessage message = PackageStatusMessage.newBuilder()
-                .setSubSlingId(SUB1_SLING_ID)
-                .setSubAgentName(subAgentName)
-                .setPubAgentName(PUB1_AGENT_NAME)
-                .setOffset(pkgOffset)
-                .setStatus(status)
+        PackageStatusMessage message = PackageStatusMessage.builder()
+                .subSlingId(SUB1_SLING_ID)
+                .subAgentName(subAgentName)
+                .pubAgentName(PUB1_AGENT_NAME)
+                .offset(pkgOffset)
+                .status(status)
                 .build();
         
         TestMessageInfo offset0 = new TestMessageInfo("", 1, 0, 0);
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
index d39f280..0adf713 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
@@ -36,9 +36,9 @@ import 
org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
-import 
org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.SubscriberConfig;
+import org.apache.sling.distribution.journal.messages.SubscriberState;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -123,20 +123,20 @@ public class DiscoveryServiceTest {
     }
 
     private DiscoveryMessage discoveryMessage(String subSlingId, String 
subAgentName, SubscriberState... subStates) {
-        return DiscoveryMessage.newBuilder()
-                .setSubSlingId(subSlingId)
-                .setSubAgentName(subAgentName)
-                .setSubscriberConfiguration(SubscriberConfiguration
-                        .newBuilder()
-                        .setEditable(false)
-                        .setMaxRetries(-1)
+        return DiscoveryMessage.builder()
+                .subSlingId(subSlingId)
+                .subAgentName(subAgentName)
+                .subscriberConfiguration(SubscriberConfig
+                        .builder()
+                        .editable(false)
+                        .maxRetries(-1)
                         .build())
-                .addAllSubscriberState(Arrays.asList(subStates)).build();
+                .subscriberStates(Arrays.asList(subStates)).build();
     }
 
     private SubscriberState subscriberState(String pubAgentName, int offset) {
-        return SubscriberState.newBuilder()
-                .setPubAgentName(pubAgentName)
-                .setOffset(offset).build();
+        return SubscriberState.builder()
+                .pubAgentName(pubAgentName)
+                .offset(offset).build();
     }
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java
index 125aa5f..929de44 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.distribution.journal.impl.publisher;
 
+import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.notNullValue;
@@ -30,12 +31,14 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.DistributionRequestType;
 import org.apache.sling.distribution.SimpleDistributionRequest;
 import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.packaging.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.packaging.DistributionPackageInfo;
 import org.junit.Before;
@@ -66,7 +69,7 @@ public class DistributionPackageFactoryTest {
     public void testAdd() throws DistributionException, IOException {
         DistributionRequest request = new 
SimpleDistributionRequest(DistributionRequestType.ADD, "/test");
 
-        org.apache.sling.distribution.packaging.DistributionPackage pkg = 
mock(org.apache.sling.distribution.packaging.DistributionPackage.class);
+        DistributionPackage pkg = mock(DistributionPackage.class);
         when(pkg.createInputStream()).thenReturn(new ByteArrayInputStream(new 
byte[] {}));
         when(pkg.getId()).thenReturn("myid");
         Map<String, Object> props = new HashMap<>();
@@ -77,25 +80,25 @@ public class DistributionPackageFactoryTest {
         when(pkg.getInfo()).thenReturn(info);
         when(packageBuilder.createPackage(Mockito.eq(resourceResolver), 
Mockito.eq(request))).thenReturn(pkg);
 
-        Messages.PackageMessage sent = publisher.create(packageBuilder, 
resourceResolver, "pub1agent1", request);
+        PackageMessage sent = publisher.create(packageBuilder, 
resourceResolver, "pub1agent1", request);
         
         assertThat(sent.getPkgBinary(), notNullValue());
         assertThat(sent.getPkgLength(), equalTo(0L));
-        assertThat(sent.getReqType(), 
equalTo(Messages.PackageMessage.ReqType.ADD));
+        assertThat(sent.getReqType(), equalTo(ReqType.ADD));
         assertThat(sent.getPkgType(), equalTo("journal"));
-        assertThat(sent.getPathsList(), contains("/test"));
-        assertThat(sent.getDeepPathsList(), contains("/test2"));
+        assertThat(sent.getPaths(), contains("/test"));
+        assertThat(sent.getDeepPaths(), contains("/test2"));
     }
     
     @Test
     public void testDelete() throws DistributionException, IOException {
         DistributionRequest request = new 
SimpleDistributionRequest(DistributionRequestType.DELETE, "/test");
 
-        Messages.PackageMessage sent = publisher.create(packageBuilder, 
resourceResolver, "pub1agent1", request);
-        assertThat(sent.getReqType(), 
equalTo(Messages.PackageMessage.ReqType.DELETE));
-        assertThat(sent.getPkgBinary(), notNullValue());
+        PackageMessage sent = publisher.create(packageBuilder, 
resourceResolver, "pub1agent1", request);
+        assertThat(sent.getReqType(), equalTo(ReqType.DELETE));
+        assertThat(sent.getPkgBinary(), nullValue());
         assertThat(sent.getPkgLength(), equalTo(0L));
-        assertThat(sent.getPathsList(), contains("/test"));
+        assertThat(sent.getPaths(), contains("/test"));
         
     }
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 7ab2bdf..32a8a11 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -40,11 +40,10 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.ByteString;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.commons.metrics.Histogram;
 import org.apache.sling.commons.metrics.Meter;
@@ -131,7 +130,7 @@ public class DistributionPublisherTest {
     private ResourceResolver resourceResolver;
 
     @Mock
-    private MessageSender<Messages.PackageMessage> sender;
+    private MessageSender<PackageMessage> sender;
     
     @Mock
     private PackageQueuedNotifier queuedNotifier;
@@ -140,7 +139,7 @@ public class DistributionPublisherTest {
     private TopologyView topology;
     
     @Captor
-    private ArgumentCaptor<Messages.PackageMessage> pkgCaptor;
+    private ArgumentCaptor<PackageMessage> pkgCaptor;
 
     @Spy
     private Topics topics = new Topics();
@@ -155,7 +154,7 @@ public class DistributionPublisherTest {
         when(slingSettings.getSlingId()).thenReturn("pub1sling");
         when(context.registerService(Mockito.eq(DistributionAgent.class), 
Mockito.eq(publisher),
                 Mockito.any(Dictionary.class))).thenReturn(serviceReg);
-        
when(messagingProvider.<Messages.PackageMessage>createSender()).thenReturn(sender);
+        
when(messagingProvider.<PackageMessage>createSender(Mockito.anyString())).thenReturn(sender);
         publisher.activate(config, context);
         when(timer.time()).thenReturn(timerContext);
     }
@@ -171,7 +170,7 @@ public class DistributionPublisherTest {
     public void testSend() throws DistributionException, IOException {
         DistributionRequest request = new 
SimpleDistributionRequest(DistributionRequestType.ADD, "/test");
 
-        Messages.PackageMessage pkg = mockPackage(request);
+        PackageMessage pkg = mockPackage(request);
         
when(factory.create(Matchers.any(DistributionPackageBuilder.class),Mockito.eq(resourceResolver),
 anyString(), Mockito.eq(request))).thenReturn(pkg);
         CompletableFuture<Void> callback = 
CompletableFuture.completedFuture(null);
         
when(queuedNotifier.registerWait(Mockito.eq(pkg.getPkgId()))).thenReturn(callback);
@@ -184,8 +183,8 @@ public class DistributionPublisherTest {
         DistributionResponse response = publisher.execute(resourceResolver, 
request);
         
         assertThat(response.getState(), 
equalTo(DistributionRequestState.ACCEPTED));
-        verify(sender).send(Mockito.eq(topics.getPackageTopic()), 
pkgCaptor.capture());
-        Messages.PackageMessage sent = pkgCaptor.getValue();
+        verify(sender).accept(pkgCaptor.capture());
+        PackageMessage sent = pkgCaptor.getValue();
         // Individual fields are checks in factory
         assertThat(sent, notNullValue());
         
@@ -252,15 +251,15 @@ public class DistributionPublisherTest {
         return new State(PUB1AGENT1, SUBAGENT1, 0, 1, 0, maxRetries, false);
     }
 
-    private Messages.PackageMessage mockPackage(DistributionRequest request) 
throws IOException {
-        return Messages.PackageMessage.newBuilder()
-                .setPkgId("myid")
-                .setPubSlingId("pub1sling")
-                .setPkgType("journal")
-                .setReqType(Messages.PackageMessage.ReqType.ADD)
-                .addAllPaths(Arrays.asList(request.getPaths()))
-                .addDeepPaths("/test2")
-                .setPkgBinary(ByteString.copyFrom(new byte[100]))
+    private PackageMessage mockPackage(DistributionRequest request) throws 
IOException {
+        return PackageMessage.builder()
+                .pkgId("myid")
+                .pubSlingId("pub1sling")
+                .pkgType("journal")
+                .reqType(PackageMessage.ReqType.ADD)
+                .paths(Arrays.asList(request.getPaths()))
+                .deepPaths(Arrays.asList("/test2"))
+                .pkgBinary(new byte[100])
                 .build();
     }
     
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
index 2444d21..6bf412f 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.distribution.journal.impl.publisher;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -28,8 +29,9 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.osgi.service.event.Event;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
+
 import static 
org.apache.sling.distribution.event.DistributionEventTopics.AGENT_PACKAGE_QUEUED;
 
 public class PackageQueuedNotifierTest {
@@ -66,12 +68,12 @@ public class PackageQueuedNotifierTest {
     }
 
     private PackageMessage pkgMsg(String packageId) {
-        return PackageMessage.newBuilder()
-            .addPaths("/test")
-            .setPkgId(packageId)
-            .setReqType(ReqType.ADD)
-            .setPkgType("journal")
-            .setPubSlingId("sling1")
+        return PackageMessage.builder()
+            .paths(Arrays.asList("/test"))
+            .pkgId(packageId)
+            .reqType(ReqType.ADD)
+            .pkgType("journal")
+            .pubSlingId("sling1")
             .build();
     }
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
index 43ed4f7..8ec5aae 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
@@ -39,7 +39,7 @@ import 
org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.packaging.DistributionPackage;
 import org.apache.sling.testing.mock.osgi.MockOsgi;
 import org.apache.sling.testing.mock.sling.MockSling;
@@ -79,7 +79,7 @@ public class PackageRepoTest {
     private DistributionMetricsService distributionMetricsService;
 
     @Captor
-    private ArgumentCaptor<Messages.PackageMessage> pkgCaptor;
+    private ArgumentCaptor<PackageMessage> pkgCaptor;
 
     @Spy
     private Topics topics = new Topics();
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
index 1bf6fd8..8097105 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
@@ -30,14 +30,16 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.Matchers.hasItemInArray;
 import static org.junit.Assert.assertThat;
 
+import java.util.Arrays;
+
 import org.apache.sling.distribution.DistributionRequestType;
 import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.hamcrest.Matcher;
 import org.junit.Test;
 
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.MessageInfo;
 
 public class QueueItemFactoryTest {
@@ -47,13 +49,13 @@ public class QueueItemFactoryTest {
        @Test
        public void test() {
                MessageInfo info = new TestMessageInfo("topic", 0, 1, 2);
-               PackageMessage message = PackageMessage.newBuilder()
-                               .setPubSlingId("sling1")
-                               .setPkgId("pkg1")
-                               .setPkgType("type")
-                               .addPaths("/")
-                               .addDeepPaths("/deep")
-                               .setReqType(ReqType.ADD)
+               PackageMessage message = PackageMessage.builder()
+                               .pubSlingId("sling1")
+                               .pkgId("pkg1")
+                               .pkgType("type")
+                               .paths(Arrays.asList("/"))
+                               .deepPaths(Arrays.asList("/deep"))
+                               .reqType(ReqType.ADD)
                                .build();
                
                item = QueueItemFactory.fromPackage(info, message, true);
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 3f0dc5e..e7f2444 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -20,9 +20,6 @@ package org.apache.sling.distribution.journal.impl.queue.impl;
 
 import static java.lang.System.currentTimeMillis;
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.ADD;
-import static 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.DELETE;
-import static 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.TEST;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
@@ -52,8 +49,8 @@ import 
org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.impl.subscriber.LocalStore;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.awaitility.Awaitility;
@@ -157,7 +154,7 @@ public class PubQueueCacheTest {
 
     @Test
     public void testSeedingFromNewPackageMessage() throws Exception {
-        simulateMessage(tailHandler, PUB_AGENT_NAME_1, ADD, 0);
+        simulateMessage(tailHandler, PUB_AGENT_NAME_1, ReqType.ADD, 0);
         OffsetQueue<DistributionQueueItem> queue = 
cache.getOffsetQueue(PUB_AGENT_NAME_1, 0);
         assertThat(queue.getSize(), greaterThan(0));
     }
@@ -204,12 +201,12 @@ public class PubQueueCacheTest {
 
     @Test
     public void testCacheSize() throws Exception {
-        simulateMessage(tailHandler, PUB_AGENT_NAME_3, ADD, 0);
-        simulateMessage(tailHandler, PUB_AGENT_NAME_3, DELETE, 1);
-        simulateMessage(tailHandler, PUB_AGENT_NAME_1, ADD, 2);
-        simulateMessage(tailHandler, PUB_AGENT_NAME_3, TEST, 3);    // TEST 
message does not increase the cache size
-        simulateMessage(tailHandler, PUB_AGENT_NAME_2, TEST, 4);    // TEST 
message does not increase the cache size
-        simulateMessage(tailHandler, PUB_AGENT_NAME_3, ADD, 5);
+        simulateMessage(tailHandler, PUB_AGENT_NAME_3, ReqType.ADD, 0);
+        simulateMessage(tailHandler, PUB_AGENT_NAME_3, ReqType.DELETE, 1);
+        simulateMessage(tailHandler, PUB_AGENT_NAME_1, ReqType.ADD, 2);
+        simulateMessage(tailHandler, PUB_AGENT_NAME_3, ReqType.TEST, 3);    // 
TEST message does not increase the cache size
+        simulateMessage(tailHandler, PUB_AGENT_NAME_2, ReqType.TEST, 4);    // 
TEST message does not increase the cache size
+        simulateMessage(tailHandler, PUB_AGENT_NAME_3, ReqType.ADD, 5);
         assertEquals(4, cache.size());
     }
 
@@ -220,16 +217,16 @@ public class PubQueueCacheTest {
     private void simulateMessage(MessageHandler<PackageMessage> handler, long 
offset) {
         simulateMessage(handler,
                 pickAny(PUB_AGENT_NAME_1, PUB_AGENT_NAME_2, PUB_AGENT_NAME_3),
-                pickAny(ADD, DELETE, TEST), offset);
+                pickAny(ReqType.ADD, ReqType.DELETE, ReqType.TEST), offset);
     }
 
     private void simulateMessage(MessageHandler<PackageMessage> handler, 
String pubAgentName, ReqType reqType, long offset) {
-        PackageMessage msg = PackageMessage.newBuilder()
-                .setPkgType("pkgType")
-                .setPkgId(UUID.randomUUID().toString())
-                .setPubSlingId("pubSlingId")
-                .setReqType(reqType)
-                .setPubAgentName(pubAgentName)
+        PackageMessage msg = PackageMessage.builder()
+                .pkgType("pkgType")
+                .pkgId(UUID.randomUUID().toString())
+                .pubSlingId("pubSlingId")
+                .reqType(reqType)
+                .pubAgentName(pubAgentName)
                 .build();
         simulateMessage(handler, msg, offset);
     }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index 009235c..2435c3a 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Set;
 import java.util.UUID;
@@ -41,9 +42,11 @@ import javax.management.ReflectionException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
 import org.apache.sling.distribution.journal.MessageSender;
-import com.google.protobuf.GeneratedMessage;
-
 import org.apache.sling.distribution.journal.impl.subscriber.LocalStore;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -59,16 +62,11 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
 
 import org.osgi.service.event.EventAdmin;
 
@@ -88,7 +86,7 @@ public class PubQueueProviderTest {
     private SlingSettingsService slingSettings;
     
     @Captor
-    private ArgumentCaptor<HandlerAdapter<Messages.PackageMessage>> 
handlerCaptor;
+    private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
 
     @Captor
     private ArgumentCaptor<HandlerAdapter<PackageStatusMessage>> 
statHandlerCaptor;
@@ -103,13 +101,13 @@ public class PubQueueProviderTest {
     private EventAdmin eventAdmin;
 
     @Mock
-    private MessageSender<GeneratedMessage> sender;
+    private MessageSender<Object> sender;
 
     private ResourceResolverFactory resolverFactory = new 
MockResourceResolverFactory();
 
     private PubQueueCacheService pubQueueCacheService;
 
-    private MessageHandler<Messages.PackageMessage> handler;
+    private MessageHandler<PackageMessage> handler;
     private MessageHandler<PackageStatusMessage> statHandler;
 
     private PubQueueProviderImpl queueProvider;
@@ -129,7 +127,7 @@ public class PubQueueProviderTest {
                 Mockito.any(Reset.class),
                 statHandlerCaptor.capture()))
         .thenReturn(statPoller);
-        when(clientProvider.createSender())
+        when(clientProvider.createSender(Mockito.anyString()))
         .thenReturn(sender);
         Topics topics = new Topics();
         String slingId = UUID.randomUUID().toString();
@@ -209,12 +207,12 @@ public class PubQueueProviderTest {
     }
 
     private PackageStatusMessage statusMessage(long offset, PackageMessage 
pkgMsg1) {
-        return PackageStatusMessage.newBuilder()
-            .setOffset(offset)
-            .setPubAgentName(PUB1_AGENT_NAME)
-            .setStatus(Status.REMOVED_FAILED)
-            .setSubAgentName(SUB_AGENT_NAME)
-            .setSubSlingId(SUB_SLING_ID)
+        return PackageStatusMessage.builder()
+            .offset(offset)
+            .pubAgentName(PUB1_AGENT_NAME)
+            .status(Status.REMOVED_FAILED)
+            .subAgentName(SUB_AGENT_NAME)
+            .subSlingId(SUB_SLING_ID)
             .build();
     }
 
@@ -224,13 +222,13 @@ public class PubQueueProviderTest {
     }
 
     private PackageMessage packageMessage(String packageId, String 
pubAgentName) {
-        return Messages.PackageMessage.newBuilder()
-                .setPubAgentName(pubAgentName)
-                .setPubSlingId("pub1SlingId")
-                .setPkgId(packageId)
-                .setReqType(ReqType.ADD)
-                .setPkgType("journal")
-                .addPaths("path")
+        return PackageMessage.builder()
+                .pubAgentName(pubAgentName)
+                .pubSlingId("pub1SlingId")
+                .pkgId(packageId)
+                .reqType(ReqType.ADD)
+                .pkgType("journal")
+                .paths(Arrays.asList("path"))
                 .build();
     }
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
index eeebb7a..69b42ca 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
@@ -27,7 +27,8 @@ import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,7 +41,6 @@ import org.mockito.runners.MockitoJUnitRunner;
 
 import static java.lang.System.currentTimeMillis;
 import static 
org.apache.sling.distribution.journal.impl.shared.Topics.PACKAGE_TOPIC;
-import static 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.TEST;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
@@ -81,8 +81,8 @@ public class QueueCacheSeederTest {
                 any(Reset.class),
                 pkgHandlerCaptor.capture()))
                 .thenReturn(poller);
-        doNothing().when(sender).send(eq(PACKAGE_TOPIC), 
pkgMsgCaptor.capture());
-        when(clientProvider.<PackageMessage>createSender())
+        doNothing().when(sender).send(pkgMsgCaptor.capture());
+        when(clientProvider.<PackageMessage>createSender(eq(PACKAGE_TOPIC)))
                 .thenReturn(sender);
         seeder = new QueueCacheSeeder(clientProvider, PACKAGE_TOPIC);
     }
@@ -99,10 +99,10 @@ public class QueueCacheSeederTest {
     @Test
     public void testSendingSeeds() {
         seeder.seed(callback);
-        verify(sender, timeout(5000).atLeastOnce()).send(eq(PACKAGE_TOPIC), 
pkgMsgCaptor.capture());
+        verify(sender, 
timeout(5000).atLeastOnce()).send(pkgMsgCaptor.capture());
         PackageMessage seedMsg = pkgMsgCaptor.getValue();
         assertNotNull(seedMsg);
-        assertEquals(TEST, seedMsg.getReqType());
+        assertEquals(ReqType.TEST, seedMsg.getReqType());
     }
 
     @After
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
index 9a597f8..a1d66fb 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.sling.distribution.journal.FullMessage;
@@ -36,9 +37,8 @@ import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -58,7 +58,7 @@ public class RangePollerTest {
     private MessagingProvider clientProvider;
     
     @Captor
-    private ArgumentCaptor<HandlerAdapter<Messages.PackageMessage>> 
handlerCaptor;
+    private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
     
     @Mock
     private Closeable poller;
@@ -107,15 +107,15 @@ public class RangePollerTest {
         }
     }
 
-    private FullMessage<Messages.PackageMessage> createMessage(ReqType 
reqType, int offset) {
+    private FullMessage<PackageMessage> createMessage(ReqType reqType, int 
offset) {
         MessageInfo info = new TestMessageInfo(TOPIC, 0, offset, 
System.currentTimeMillis());
-        PackageMessage message = Messages.PackageMessage.newBuilder()
-                .setPubAgentName("agent1")
-                .setPubSlingId("pub1SlingId")
-                .setPkgId("package-" + offset)
-                .setReqType(reqType)
-                .setPkgType("journal")
-                .addPaths("path")
+        PackageMessage message = PackageMessage.builder()
+                .pubAgentName("agent1")
+                .pubSlingId("pub1SlingId")
+                .pkgId("package-" + offset)
+                .reqType(reqType)
+                .pkgType("journal")
+                .paths(Arrays.asList("path"))
                 .build();
         return new FullMessage<>(info, message);
     }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
index 5546b50..8f7c262 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
@@ -27,9 +27,11 @@ import static org.mockito.Mockito.when;
 import java.io.Closeable;
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.List;
 
-import org.apache.sling.distribution.journal.impl.shared.LimitPoller;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -39,9 +41,6 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
@@ -60,7 +59,7 @@ public class LimitPollerTest {
     private MessagingProvider clientProvider;
     
     @Captor
-    private ArgumentCaptor<HandlerAdapter<Messages.PackageMessage>> 
handlerCaptor;
+    private 
ArgumentCaptor<HandlerAdapter<org.apache.sling.distribution.journal.messages.PackageMessage>>
 handlerCaptor;
     
     @Mock
     private Closeable poller;
@@ -112,15 +111,15 @@ public class LimitPollerTest {
         return message;
     }
 
-    private FullMessage<Messages.PackageMessage> createMessage(int offset) {
+    private FullMessage<PackageMessage> createMessage(int offset) {
         MessageInfo info = new TestMessageInfo(TOPIC, 0, offset, 
System.currentTimeMillis());
-        PackageMessage message = Messages.PackageMessage.newBuilder()
-                .setPubAgentName("agent1")
-                .setPubSlingId("pub1SlingId")
-                .setPkgId("package-" + offset)
-                .setReqType(ReqType.ADD)
-                .setPkgType("journal")
-                .addPaths("path")
+        PackageMessage message = PackageMessage.builder()
+                .pubAgentName("agent1")
+                .pubSlingId("pub1SlingId")
+                .pkgId("package-" + offset)
+                .reqType(ReqType.ADD)
+                .pkgType("journal")
+                .paths(Arrays.asList("path"))
                 .build();
         return new FullMessage<>(info, message);
     }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
index 659f69e..c19f633 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 
 import javax.jcr.Binary;
 import javax.jcr.Session;
@@ -34,8 +35,8 @@ import javax.jcr.ValueFactory;
 import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
@@ -85,10 +86,12 @@ public class PackageBrowserTest {
     }
 
     private PackageMessage createPackageMsg(long offset) throws Exception {
-        return 
PackageMessage.newBuilder().setPubSlingId("").setReqType(ReqType.ADD)
-                .addPaths("/content")
-                .setPkgId("pkgid")
-                .setPkgType("some_type")
-                .setPkgBinaryRef("myref").build();
+        return PackageMessage.builder()
+                .pubSlingId("")
+                .reqType(ReqType.ADD)
+                .paths(Arrays.asList("/content"))
+                .pkgId("pkgid")
+                .pkgType("some_type")
+                .pkgBinaryRef("myref").build();
     }
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
index 1a46360..36009ba 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.nio.charset.Charset;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -41,8 +42,8 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -52,8 +53,6 @@ import org.mockito.Mockito;
 import org.mockito.Spy;
 import org.mockito.runners.MockitoJUnitRunner;
 
-import com.google.protobuf.ByteString;
-
 @RunWith(MockitoJUnitRunner.class)
 public class PackageViewerPluginTest {
 
@@ -147,13 +146,13 @@ public class PackageViewerPluginTest {
 
     private FullMessage<PackageMessage> createPackageMsg(long offset) {
         MessageInfo info = new TestMessageInfo("topic", 0 , offset, 0L);
-        PackageMessage message = PackageMessage.newBuilder()
-                .setPubSlingId("")
-                .setReqType(ReqType.ADD)
-                .addPaths("/content")
-                .setPkgId("pkgid")
-                .setPkgType("some_type")
-                .setPkgBinary(ByteString.copyFrom("package content", 
Charset.defaultCharset()))
+        PackageMessage message = PackageMessage.builder()
+                .pubSlingId("")
+                .reqType(ReqType.ADD)
+                .paths(Arrays.asList("/content"))
+                .pkgId("pkgid")
+                .pkgType("some_type")
+                .pkgBinary("package 
content".getBytes(Charset.defaultCharset()))
                 .build();
         return new FullMessage<>(info, message);
     }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
index a2105e3..a5112f9 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
@@ -18,6 +18,8 @@
  */
 package org.apache.sling.distribution.journal.impl.shared;
 
+import java.util.Map;
+
 import org.apache.sling.distribution.journal.MessageInfo;
 
 public class TestMessageInfo implements MessageInfo {
@@ -50,4 +52,9 @@ public class TestMessageInfo implements MessageInfo {
         return createTime;
     }
 
+    @Override
+    public Map<String, String> getProps() {
+        return null;
+    }
+
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
index 5746dc7..e380f5e 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
@@ -27,7 +27,8 @@ import static org.mockito.Mockito.when;
 import java.util.Collections;
 import java.util.function.Consumer;
 
-import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.SubscriberState;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
@@ -41,15 +42,15 @@ public class AnnouncerTest {
     @Test
     @SuppressWarnings("unchecked")
     public void testDiscoveryMessage() throws InterruptedException {
-        Consumer<Messages.DiscoveryMessage> sender = 
Mockito.mock(Consumer.class);
+        Consumer<DiscoveryMessage> sender = Mockito.mock(Consumer.class);
         BookKeeper bookKeeper = Mockito.mock(BookKeeper.class);
         when(bookKeeper.loadOffset()).thenReturn(1L);
         Announcer announcer = new Announcer(SUB1_SLING_ID, SUB1_AGENT_NAME, 
Collections.singleton(PUB1_AGENT_NAME), sender, bookKeeper, -1, false, 10000);
         Thread.sleep(200);
-        ArgumentCaptor<Messages.DiscoveryMessage> msg = 
forClass(Messages.DiscoveryMessage.class);
+        ArgumentCaptor<DiscoveryMessage> msg = 
forClass(DiscoveryMessage.class);
         verify(sender).accept(msg.capture());
-        Messages.DiscoveryMessage message = msg.getValue();
-        Messages.SubscriberState offset = 
message.getSubscriberStateList().iterator().next();
+        DiscoveryMessage message = msg.getValue();
+        SubscriberState offset = 
message.getSubscriberStates().iterator().next();
         assertThat(message.getSubSlingId(), equalTo(SUB1_SLING_ID));
         assertThat(offset.getPubAgentName(), equalTo(PUB1_AGENT_NAME));
         assertThat(offset.getOffset(), equalTo(1L));
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
index 1345895..3afa938 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
@@ -27,7 +27,7 @@ import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.junit.Before;
 import org.junit.Test;
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
index 72d0257..9cc3503 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
@@ -35,8 +35,7 @@ import 
org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages.ClearCommand;
-import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
+import org.apache.sling.distribution.journal.messages.ClearCommand;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.junit.Before;
@@ -66,9 +65,9 @@ public class CommandPollerTest {
     CommandPoller commandPoller;
     
     @Captor
-    private ArgumentCaptor<HandlerAdapter<CommandMessage>> handlerCaptor;
+    private ArgumentCaptor<HandlerAdapter<ClearCommand>> handlerCaptor;
     
-    private MessageHandler<CommandMessage> commandHandler;
+    private MessageHandler<ClearCommand> commandHandler;
 
     private Topics topics = new Topics();
 
@@ -120,15 +119,6 @@ public class CommandPollerTest {
     }
 
     @Test
-    public void testIgnoreInvalidCommand() throws DistributionException, 
InterruptedException, IOException {
-        createCommandPoller(true);
-        
-        CommandMessage message = 
CommandMessage.newBuilder(commandMessage(10L)).clearClearCommand().build();
-        commandHandler.handle(info, message);
-        assertClearedUpTo(-1);
-    }
-    
-    @Test
     public void testEditable() throws DistributionException, 
InterruptedException, IOException {
         createCommandPoller(true);
         
@@ -150,18 +140,15 @@ public class CommandPollerTest {
         assertThat(commandPoller.isCleared(1), equalTo(false));
     }
 
-    private CommandMessage commandMessage(long offset) {
+    private ClearCommand commandMessage(long offset) {
         return commandMessage(SUB_SLING_ID, SUB_AGENT_NAME, offset);
     }
     
-    private CommandMessage commandMessage(String subSlingId, String 
subAgentName, long offset) {
-        ClearCommand command = ClearCommand.newBuilder()
-                .setOffset(offset)
-                .build();
-        return CommandMessage.newBuilder()
-                .setClearCommand(command)
-                .setSubAgentName(subAgentName)
-                .setSubSlingId(subSlingId)
+    private ClearCommand commandMessage(String subSlingId, String 
subAgentName, long offset) {
+        return ClearCommand.builder()
+                .subAgentName(subAgentName)
+                .subSlingId(subSlingId)
+                .offset(offset)
                 .build();
     }
 
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 6ee55f8..a54a26b 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -52,6 +52,10 @@ import 
org.apache.sling.distribution.journal.impl.precondition.Precondition;
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
@@ -97,18 +101,12 @@ import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.util.converter.Converters;
 
-import 
org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import 
org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.ByteString;
 
 @SuppressWarnings("unchecked")
 public class SubscriberTest {
@@ -119,23 +117,23 @@ public class SubscriberTest {
     private static final String PUB1_SLING_ID = "pub1sling";
     private static final String PUB1_AGENT_NAME = "pub1agent";
 
-    private static final PackageMessage BASIC_ADD_PACKAGE = 
PackageMessage.newBuilder()
-            .setPkgId("myid")
-            .setPubSlingId(PUB1_SLING_ID)
-            .setPubAgentName(PUB1_AGENT_NAME)
-            .setReqType(ReqType.ADD)
-            .setPkgType("journal")
-            .addAllPaths(Arrays.asList("/test"))
-            .setPkgBinary(ByteString.copyFrom(new byte[100]))
+    private static final PackageMessage BASIC_ADD_PACKAGE = 
PackageMessage.builder()
+            .pkgId("myid")
+            .pubSlingId(PUB1_SLING_ID)
+            .pubAgentName(PUB1_AGENT_NAME)
+            .reqType(ReqType.ADD)
+            .pkgType("journal")
+            .paths(Arrays.asList("/test"))
+            .pkgBinary(new byte[100])
             .build();
 
-    private static final PackageMessage BASIC_DEL_PACKAGE = 
PackageMessage.newBuilder()
-            .setPkgId("myid")
-            .setPubSlingId(PUB1_SLING_ID)
-            .setPubAgentName(PUB1_AGENT_NAME)
-            .setReqType(ReqType.DELETE)
-            .setPkgType("journal")
-            .addAllPaths(Arrays.asList("/test"))
+    private static final PackageMessage BASIC_DEL_PACKAGE = 
PackageMessage.builder()
+            .pkgId("myid")
+            .pubSlingId(PUB1_SLING_ID)
+            .pubAgentName(PUB1_AGENT_NAME)
+            .reqType(ReqType.DELETE)
+            .pkgType("journal")
+            .paths(Arrays.asList("/test"))
             .build();
 
     
@@ -193,7 +191,6 @@ public class SubscriberTest {
     private MessageHandler<PackageMessage> packageHandler;
 
 
-    @SuppressWarnings("rawtypes")
     @Before
     public void before() {
         DistributionSubscriber.QUEUE_FETCH_DELAY = 100;
@@ -207,7 +204,9 @@ public class SubscriberTest {
 
         mockMetrics();
 
-        
when(clientProvider.<PackageStatusMessage>createSender()).thenReturn(statusSender,
 (MessageSender) discoverySender);
+        
when(clientProvider.<PackageStatusMessage>createSender(Mockito.eq(topics.getStatusTopic()))).thenReturn(statusSender);
+        
when(clientProvider.<DiscoveryMessage>createSender(Mockito.eq(topics.getDiscoveryTopic()))).thenReturn(discoverySender);
+
         when(clientProvider.createPoller(
                 Mockito.anyString(),
                 Mockito.eq(Reset.earliest), 
@@ -251,8 +250,7 @@ public class SubscriberTest {
         
         sem.release();
         waitSubscriber(IDLE);
-        verify(statusSender, times(0)).send(eq(topics.getStatusTopic()),
-                anyObject());
+        verify(statusSender, times(0)).accept(anyObject());
         List<String> log = subscriber.getLog().getLines();
         // We do not use the DistributionLog anymore
         assertThat(log.size(), equalTo(0));
@@ -302,8 +300,7 @@ public class SubscriberTest {
         ).thenThrow(new RuntimeException("Expected"));
 
         packageHandler.handle(info, message);
-        verify(statusSender, 
timeout(10000).times(1)).send(eq(topics.getStatusTopic()),
-                anyObject());
+        verify(statusSender, timeout(10000).times(1)).accept(anyObject());
     }
 
     @Test
@@ -317,8 +314,7 @@ public class SubscriberTest {
         packageHandler.handle(info, message);
         waitSubscriber(IDLE);
 
-        verify(statusSender, 
timeout(10000).times(1)).send(eq(topics.getStatusTopic()),
-                anyObject());
+        verify(statusSender, timeout(10000).times(1)).accept(anyObject());
     }
 
     @Test

Reply via email to