This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 26fe564  SLING-10066 - Align package id logging
26fe564 is described below

commit 26fe5646c42fc71b07740aa5de8b3403549c1e80
Author: tmaret <[email protected]>
AuthorDate: Sat Mar 13 23:17:22 2021 +0100

    SLING-10066 - Align package id logging
---
 pom.xml                                                    |  2 +-
 .../sling/distribution/journal/bookkeeper/BookKeeper.java  |  9 ++++-----
 .../journal/impl/discovery/DiscoveryService.java           |  2 +-
 .../journal/impl/precondition/PackageStatusWatcher.java    |  2 +-
 .../journal/impl/publisher/DistributionPublisher.java      |  6 +++---
 .../journal/impl/publisher/MessagingCacheCallback.java     |  6 +++---
 .../journal/impl/publisher/PackageDistributedNotifier.java |  6 +++---
 .../journal/impl/publisher/PackageMessageFactory.java      | 11 ++++++-----
 .../journal/impl/publisher/PackageQueuedNotifier.java      |  8 ++++----
 .../distribution/journal/impl/publisher/RangePoller.java   |  2 +-
 .../distribution/journal/impl/subscriber/Announcer.java    |  2 +-
 .../journal/impl/subscriber/CommandPoller.java             | 14 +++++++-------
 .../journal/impl/subscriber/DistributionSubscriber.java    | 10 +++++-----
 .../journal/impl/publisher/DistributionPublisherTest.java  |  4 ++--
 14 files changed, 42 insertions(+), 42 deletions(-)

diff --git a/pom.xml b/pom.xml
index 587b7fb..b3c2d1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
     <!-- P R O J E C T                                                         
  -->
     <!-- 
======================================================================= -->
     <artifactId>org.apache.sling.distribution.journal</artifactId>
-    <version>0.1.17-SNAPSHOT</version>
+    <version>0.1.17-T202103131049-479dcb4</version>
 
     <name>Apache Sling Journal based Content Distribution - Core bundle</name>
     <description>Implementation of Apache Sling Content Distribution 
components on top of an append-only persisted log</description>
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index f8a6b18..c3aa6f8 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -139,7 +139,7 @@ public class BookKeeper implements Closeable {
      * once, thanks to the order in which the content updates are applied.
      */
     public void importPackage(PackageMessage pkgMsg, long offset, long 
createdTime) throws DistributionException {
-        log.info("Importing distribution package {} at offset {}", pkgMsg, 
offset);
+        log.info("Importing distribution package {} at offset={}", pkgMsg, 
offset);
         addPackageMDC(pkgMsg);
         try (Timer.Context context = 
distributionMetricsService.getImportedPackageDuration().time();
                 ResourceResolver importerResolver = 
getServiceResolver(SUBSERVICE_IMPORTER)) {
@@ -192,7 +192,7 @@ public class BookKeeper implements Closeable {
         boolean giveUp = errorQueueEnabled && retries >= 
config.getMaxRetries();
         String retriesSt = errorQueueEnabled ? 
Integer.toString(config.getMaxRetries()) : "infinite";
         String action = giveUp ? "skip the package" : "retry later";
-        String msg = format("Failed attempt (%s/%s) to import the distribution 
package %s at offset %d because of '%s', the importer will %s", retries, 
retriesSt, pkgMsg, offset, e.getMessage(), action);
+        String msg = format("Failed attempt (%s/%s) to import the distribution 
package %s at offset=%d because of '%s', the importer will %s", retries, 
retriesSt, pkgMsg, offset, e.getMessage(), action);
         try {
             LogMessage logMessage = getLogMessage(pubAgentName, msg, e);
             logSender.accept(logMessage);
@@ -237,7 +237,7 @@ public class BookKeeper implements Closeable {
     }
     
     public void skipPackage(long offset) throws LoginException, 
PersistenceException {
-        log.info("Skipping package at offset {}", offset);
+        log.info("Skipping package at offset={}", offset);
         if (shouldCommitSkipped()) {
             try (ResourceResolver resolver = 
getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
                 storeOffset(resolver, offset);
@@ -316,8 +316,7 @@ public class BookKeeper implements Closeable {
     }
     
     private void removeFailedPackage(PackageMessage pkgMsg, long offset) 
throws DistributionException {
-        log.info("Removing failed distribution package {} of type {} at offset 
{}", 
-                pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
+        log.info("Removing failed distribution package {} at offset={}", 
pkgMsg, offset);
         Timer.Context context = 
distributionMetricsService.getRemovedFailedPackageDuration().time();
         try (ResourceResolver resolver = 
getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             storeStatus(resolver, new PackageStatus(Status.REMOVED_FAILED, 
offset, pkgMsg.getPubAgentName()));
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
index 222cc2b..7b5e9e3 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
@@ -182,7 +182,7 @@ public class DiscoveryService implements Runnable {
     }
     
     public void handleLog(MessageInfo info, LogMessage logMsg) {
-        /**
+        /*
          * We only have one DiscoveryService but possibly more than one 
DistributionPublisher. 
          * So we send an event for each log message and let them listen to 
these.
          */
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
index 5d42ca3..619641c 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
@@ -65,7 +65,7 @@ public class PackageStatusWatcher implements Closeable {
         Map<Long, Status> statusPerAgent = getAgentStatus(subAgentName);
         Status status = statusPerAgent.get(pkgOffset);
         if (status == null && statusCanNotArriveAnymore(pkgOffset)) {
-            log.info("Considering offset {} as imported as status for this 
package can not arrive anymore.", pkgOffset);
+            log.info("Considering offset={} imported as status for this 
package can not arrive anymore.", pkgOffset);
             return Status.IMPORTED;
         }
         return status;
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 4645474..9a9387d 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -278,12 +278,12 @@ public class DistributionPublisher implements 
DistributionAgent {
             timed(distributionMetricsService.getEnqueuePackageDuration(), () 
-> sender.accept(pkg));
             
distributionMetricsService.getExportedPackageSize().update(pkg.getPkgLength());
             distributionMetricsService.getAcceptedRequests().mark();
-            String msg = String.format("Distribution request accepted with 
%s", pkg);
+            String msg = String.format("Request accepted with distribution 
package %s", pkg);
             log.info(msg);
             return new SimpleDistributionResponse(ACCEPTED, msg);
         } catch (Throwable e) {
             distributionMetricsService.getDroppedRequests().mark();
-            String msg = String.format("Failed to append %s to the journal", 
pkg);
+            String msg = String.format("Failed to append distribution package 
%s to the journal", pkg);
             log.error(msg, e);
             if (e instanceof Error) {
                 throw (Error) e;
@@ -312,7 +312,7 @@ public class DistributionPublisher implements 
DistributionAgent {
 
     @Nonnull
     private DistributionResponse executeUnsupported(DistributionRequest 
request) {
-        String msg = String.format("Request type %s is not supported by this 
agent, expected one of %s",
+        String msg = String.format("Request requestType=%s not supported by 
this agent, expected one of %s",
                 request.getRequestType(), REQ_TYPES.keySet());
         log.info(msg);
         return new 
SimpleDistributionResponse(DistributionRequestState.DROPPED, msg);
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
index 5bd9e87..f7d4b93 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
@@ -105,14 +105,14 @@ public class MessagingCacheCallback implements 
CacheCallback {
     }
     
     private void sendClearCommand(String pubAgentName, AgentId subAgentId, 
long offset) {
-        ClearCommand commandMessage = ClearCommand.builder()
+        ClearCommand command = ClearCommand.builder()
                 .pubAgentName(pubAgentName)
                 .subSlingId(subAgentId.getSlingId())
                 .subAgentName(subAgentId.getAgentName())
                 .offset(offset)
                 .build();
-        log.info("Sending clear command to subSlingId: {}, subAgentName: {} 
with offset {}.", subAgentId.getSlingId(), subAgentId.getAgentName(), offset);
-        commandSender.accept(commandMessage);
+        log.info("Sending clear command {}", command);
+        commandSender.accept(command);
     }
 
     @Override
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index b6cc54d..b3e54c3 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -100,7 +100,7 @@ public class PackageDistributedNotifier implements 
TopologyChangeHandler {
     }
 
     protected void notifyDistributed(String pubAgentName, 
DistributionQueueItem queueItem) {
-        LOG.debug("Sending distributed notifications for pub agent {} queue 
item {}", pubAgentName, queueItem.getPackageId());
+        LOG.debug("Sending distributed notifications for pubAgentName={}, 
pkgId={}", pubAgentName, queueItem.getPackageId());
         sendEvt(pubAgentName, queueItem);
         if (sendMsg) {
             sendMsg(pubAgentName, queueItem);
@@ -112,7 +112,7 @@ public class PackageDistributedNotifier implements 
TopologyChangeHandler {
             PackageDistributedMessage msg = 
createDistributedMessage(pubAgentName, queueItem);
             sender.accept(msg);
         } catch (Exception e) {
-            LOG.warn("Exception when sending package distributed message for 
pub agent {} queue item {}", pubAgentName, queueItem.getPackageId(), e);
+            LOG.warn("Exception when sending package distributed message for 
pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId(), e);
         }
     }
 
@@ -131,7 +131,7 @@ public class PackageDistributedNotifier implements 
TopologyChangeHandler {
             Event distributed = 
DistributionEvent.eventPackageDistributed(queueItem, pubAgentName);
             eventAdmin.sendEvent(distributed);
         } catch (Exception e) {
-            LOG.warn("Exception when sending package distributed event for pub 
agent {} queue item {}", pubAgentName, queueItem.getPackageId(), e);
+            LOG.warn("Exception when sending package distributed event for 
pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId(), e);
         }
     }
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
index 03335ea..230ee6f 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
@@ -68,12 +68,12 @@ public class PackageMessageFactory {
     @Activate
     public void activate() {
         pubSlingId = slingSettings.getSlingId();
-        LOG.info("Started package message factory for pubSlingId {}", 
pubSlingId);
+        LOG.info("Started package message factory for pubSlingId={}", 
pubSlingId);
     }
 
     @Deactivate
     public void deactivate() {
-        LOG.info("Stopped package message factory for pubSlingId {}", 
pubSlingId);
+        LOG.info("Stopped package message factory for pubSlingId={}", 
pubSlingId);
     }
 
     public PackageMessage create(
@@ -86,7 +86,7 @@ public class PackageMessageFactory {
             case ADD: return createAdd(packageBuilder, resourceResolver, 
pubAgentName, request);
             case DELETE: return createDelete(packageBuilder, resourceResolver, 
request, pubAgentName);
             case TEST: return createTest(packageBuilder, resourceResolver, 
pubAgentName);
-            default: throw new 
IllegalArgumentException(String.format("Unsupported request type %s", 
request.getRequestType()));
+            default: throw new 
IllegalArgumentException(String.format("Unsupported request with 
requestType=%s", request.getRequestType()));
         }
     }
 
@@ -111,9 +111,9 @@ public class PackageMessageFactory {
                 .pkgType(packageBuilder.getType());
 
         String storeRef;
+        String id;
         try {
-            String id = UUID.randomUUID().toString();
-            LOG.debug("Creating package binary with id [{}] for package [{}], 
length [{}]", id, disPkg.getId(), pkgLength);
+            id = UUID.randomUUID().toString();
             storeRef =  binaryStore.put(id, disPkg.createInputStream(), 
pkgLength);
         } catch (IOException e) {
             throw new DistributionException(e.getMessage(), e);
@@ -125,6 +125,7 @@ public class PackageMessageFactory {
             pkgBuilder.pkgBinary(pkgBinary);
         }
         PackageMessage pipePackage = pkgBuilder.build();
+        LOG.debug("Created distribution package {} with binary id={}", 
pipePackage, id);
         disPkg.delete();
         return pipePackage;
     }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
index 8899397..b51b252 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
@@ -53,7 +53,7 @@ public class PackageQueuedNotifier implements EventHandler {
     @Deactivate
     public void deactivate() {
         receiveCallbacks.forEach((packageId, callback) -> {
-            LOG.debug("Cancel wait condition for package {}", packageId);
+            LOG.debug("Cancel wait condition for distribution package with 
pkgId={}", packageId);
             callback.cancel(true);
         });
         LOG.info("Package queue notifier service stopped");
@@ -62,7 +62,7 @@ public class PackageQueuedNotifier implements EventHandler {
     @Override
     public void handleEvent(Event event) {
         String packageId = (String) 
event.getProperty(DistributionEvent.PACKAGE_ID);
-        LOG.debug("Handling event for packageId {}", packageId);
+        LOG.debug("Handling event for pkgId={}", packageId);
         CompletableFuture<Void> callback = null;
         if (packageId != null) {
             callback = receiveCallbacks.remove(packageId);
@@ -73,14 +73,14 @@ public class PackageQueuedNotifier implements EventHandler {
     }
 
     public CompletableFuture<Void> registerWait(String packageId) {
-        LOG.debug("Registering wait condition for packageId {}", packageId);
+        LOG.debug("Registering wait condition for pkgId={}", packageId);
         CompletableFuture<Void> packageReceived = new CompletableFuture<>();
         receiveCallbacks.put(packageId, packageReceived);
         return packageReceived;
     }
 
     public void unRegisterWait(String packageId) {
-        LOG.debug("Un-registering wait condition for packageId {}", packageId);
+        LOG.debug("Un-registering wait condition for pkgId={}", packageId);
         receiveCallbacks.remove(packageId);
     }
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java
index 040db85..04f8d0a 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java
@@ -79,7 +79,7 @@ public class RangePoller {
 
     private void handlePackage(MessageInfo info, PackageMessage message) {
         long offset = info.getOffset();
-        LOG.debug("Reading offset {}", offset);
+        LOG.debug("Consuming distribution package {} at offset={}", message, 
offset);
         if (offset < maxOffset) {
             if (isNotTestMessage(message)) {
                 messages.add(new FullMessage<>(info, message));
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
index b3a3414..08d516d 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
@@ -79,9 +79,9 @@ class Announcer implements Runnable, Closeable {
 
     @Override
     public void run() {
-        LOG.debug("Sending discovery message for agent {}", subAgentName);
         try {
             DiscoveryMessage msg = createDiscoveryMessage();
+            LOG.debug("Sending discovery message {}", msg);
             sender.accept(msg);
         } catch (Exception e) {
             LOG.info("Failed to send discovery message for agent {}, {}", 
subAgentName, e.getMessage(), e);
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index 063c185..de8295d 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -56,21 +56,21 @@ public class CommandPoller implements Closeable {
         return offset <= clearOffset.longValue();
     }
 
-    private void handleCommandMessage(MessageInfo info, ClearCommand message) {
+    private void handleCommandMessage(MessageInfo info, ClearCommand command) {
         idleCheck.busy(0);
-        if (!subSlingId.equals(message.getSubSlingId()) || 
!subAgentName.equals(message.getSubAgentName())) {
-            LOG.debug("Skip command for subSlingId {}", 
message.getSubSlingId());
+        if (!subSlingId.equals(command.getSubSlingId()) || 
!subAgentName.equals(command.getSubAgentName())) {
+            LOG.debug("Skip command for subSlingId {}", 
command.getSubSlingId());
             return;
         }
 
-        handleClearCommand(message.getOffset());
+        handleClearCommand(command);
         idleCheck.idle();
     }
 
-    private void handleClearCommand(long offset) {
+    private void handleClearCommand(ClearCommand command) {
         long oldOffset = clearOffset.get();
-        long newOffset = updateClearOffsetIfLarger(offset);
-        LOG.info("Handled clear command for offset {}. Old clear offset was 
{}, new clear offset is {}.", offset, oldOffset, newOffset);
+        long newOffset = updateClearOffsetIfLarger(command.getOffset());
+        LOG.info("Handled clear command {}. Old clear offset was {}, new clear 
offset is {}.", command, oldOffset, newOffset);
     }
 
     private long updateClearOffsetIfLarger(long offset) {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index b306cf9..53ee052 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -238,23 +238,23 @@ public class DistributionSubscriber {
 
     private void handlePackageMessage(MessageInfo info, PackageMessage 
message) {
         if (shouldEnqueue(info, message)) {
-            enqueue(new FullMessage<PackageMessage>(info, message));
+            enqueue(new FullMessage<>(info, message));
         } else {
             try {
                 bookKeeper.skipPackage(info.getOffset());
             } catch (PersistenceException | LoginException e) {
-                LOG.info("Error marking message at offset {} as skipped", 
info.getOffset(), e);
+                LOG.warn("Error marking distribution package {} at offset={} 
as skipped", message, info.getOffset(), e);
             }
         }
     }
 
     private boolean shouldEnqueue(MessageInfo info, PackageMessage message) {
         if (!queueNames.contains(message.getPubAgentName())) {
-            LOG.info("Skipping package for Publisher agent {} at offset {} 
(not subscribed)", message.getPubAgentName(), info.getOffset());
+            LOG.info("Skipping distribution package {} at offset={} (not 
subscribed)", message, info.getOffset());
             return false;
         }
         if (!pkgType.equals(message.getPkgType())) {
-            LOG.warn("Skipping package with type {} at offset {}", 
message.getPkgType(), info.getOffset());
+            LOG.warn("Skipping distribution package {} at offset={} (bad 
pkgType)", message, info.getOffset());
             return false;
         }
         return true;
@@ -382,7 +382,7 @@ public class DistributionSubscriber {
                 return decision;
             }
         }
-        throw new PreConditionTimeoutException("Timeout waiting for package 
offset " + offset + " on status topic.");
+        throw new PreConditionTimeoutException("Timeout waiting for 
distribution package at offset=" + offset + " on status topic");
     }
 
     private static void delay(long delayInMs) {
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index faed812..7d3e980 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -200,7 +200,7 @@ public class DistributionPublisherTest {
         List<String> log = publisher.getLog().getLines();
         assertThat(log, contains(
                 containsString("Started Publisher agent pub1agent1"),
-                containsString("Request type PULL is not supported by this 
agent, expected one of")));
+                containsString("not supported by this agent")));
     }
     
     @Test
@@ -282,7 +282,7 @@ public class DistributionPublisherTest {
         List<String> log = publisher.getLog().getLines();
         assertThat(log, contains(
                 containsString("Started Publisher agent pub1agent1"),
-                containsString("Distribution request accepted")));
+                containsString("Request accepted")));
     }
 
     private PackageMessage mockPackage(DistributionRequest request) throws 
IOException {

Reply via email to