This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master in repository:
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 26fe564 SLING-10066 - Align package id logging
26fe564 is described below
commit 26fe5646c42fc71b07740aa5de8b3403549c1e80
Author: tmaret <[email protected]>
AuthorDate: Sat Mar 13 23:17:22 2021 +0100
SLING-10066 - Align package id logging
---
pom.xml | 2 +-
.../sling/distribution/journal/bookkeeper/BookKeeper.java | 9 ++++-----
.../journal/impl/discovery/DiscoveryService.java | 2 +-
.../journal/impl/precondition/PackageStatusWatcher.java | 2 +-
.../journal/impl/publisher/DistributionPublisher.java | 6 +++---
.../journal/impl/publisher/MessagingCacheCallback.java | 6 +++---
.../journal/impl/publisher/PackageDistributedNotifier.java | 6 +++---
.../journal/impl/publisher/PackageMessageFactory.java | 11 ++++++-----
.../journal/impl/publisher/PackageQueuedNotifier.java | 8 ++++----
.../distribution/journal/impl/publisher/RangePoller.java | 2 +-
.../distribution/journal/impl/subscriber/Announcer.java | 2 +-
.../journal/impl/subscriber/CommandPoller.java | 14 +++++++-------
.../journal/impl/subscriber/DistributionSubscriber.java | 10 +++++-----
.../journal/impl/publisher/DistributionPublisherTest.java | 4 ++--
14 files changed, 42 insertions(+), 42 deletions(-)
diff --git a/pom.xml b/pom.xml
index 587b7fb..b3c2d1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
<!-- P R O J E C T
-->
<!--
======================================================================= -->
<artifactId>org.apache.sling.distribution.journal</artifactId>
- <version>0.1.17-SNAPSHOT</version>
+ <version>0.1.17-T202103131049-479dcb4</version>
<name>Apache Sling Journal based Content Distribution - Core bundle</name>
<description>Implementation of Apache Sling Content Distribution
components on top of an append-only persisted log</description>
diff --git
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index f8a6b18..c3aa6f8 100644
---
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -139,7 +139,7 @@ public class BookKeeper implements Closeable {
* once, thanks to the order in which the content updates are applied.
*/
public void importPackage(PackageMessage pkgMsg, long offset, long
createdTime) throws DistributionException {
- log.info("Importing distribution package {} at offset {}", pkgMsg,
offset);
+ log.info("Importing distribution package {} at offset={}", pkgMsg,
offset);
addPackageMDC(pkgMsg);
try (Timer.Context context =
distributionMetricsService.getImportedPackageDuration().time();
ResourceResolver importerResolver =
getServiceResolver(SUBSERVICE_IMPORTER)) {
@@ -192,7 +192,7 @@ public class BookKeeper implements Closeable {
boolean giveUp = errorQueueEnabled && retries >=
config.getMaxRetries();
String retriesSt = errorQueueEnabled ?
Integer.toString(config.getMaxRetries()) : "infinite";
String action = giveUp ? "skip the package" : "retry later";
- String msg = format("Failed attempt (%s/%s) to import the distribution
package %s at offset %d because of '%s', the importer will %s", retries,
retriesSt, pkgMsg, offset, e.getMessage(), action);
+ String msg = format("Failed attempt (%s/%s) to import the distribution
package %s at offset=%d because of '%s', the importer will %s", retries,
retriesSt, pkgMsg, offset, e.getMessage(), action);
try {
LogMessage logMessage = getLogMessage(pubAgentName, msg, e);
logSender.accept(logMessage);
@@ -237,7 +237,7 @@ public class BookKeeper implements Closeable {
}
public void skipPackage(long offset) throws LoginException,
PersistenceException {
- log.info("Skipping package at offset {}", offset);
+ log.info("Skipping package at offset={}", offset);
if (shouldCommitSkipped()) {
try (ResourceResolver resolver =
getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
storeOffset(resolver, offset);
@@ -316,8 +316,7 @@ public class BookKeeper implements Closeable {
}
private void removeFailedPackage(PackageMessage pkgMsg, long offset)
throws DistributionException {
- log.info("Removing failed distribution package {} of type {} at offset
{}",
- pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
+ log.info("Removing failed distribution package {} at offset={}",
pkgMsg, offset);
Timer.Context context =
distributionMetricsService.getRemovedFailedPackageDuration().time();
try (ResourceResolver resolver =
getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
storeStatus(resolver, new PackageStatus(Status.REMOVED_FAILED,
offset, pkgMsg.getPubAgentName()));
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
index 222cc2b..7b5e9e3 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
@@ -182,7 +182,7 @@ public class DiscoveryService implements Runnable {
}
public void handleLog(MessageInfo info, LogMessage logMsg) {
- /**
+ /*
* We only have one DiscoveryService but possibly more than one
DistributionPublisher.
* So we send an event for each log message and let them listen to
these.
*/
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
index 5d42ca3..619641c 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
@@ -65,7 +65,7 @@ public class PackageStatusWatcher implements Closeable {
Map<Long, Status> statusPerAgent = getAgentStatus(subAgentName);
Status status = statusPerAgent.get(pkgOffset);
if (status == null && statusCanNotArriveAnymore(pkgOffset)) {
- log.info("Considering offset {} as imported as status for this
package can not arrive anymore.", pkgOffset);
+ log.info("Considering offset={} imported as status for this
package can not arrive anymore.", pkgOffset);
return Status.IMPORTED;
}
return status;
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 4645474..9a9387d 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -278,12 +278,12 @@ public class DistributionPublisher implements
DistributionAgent {
timed(distributionMetricsService.getEnqueuePackageDuration(), ()
-> sender.accept(pkg));
distributionMetricsService.getExportedPackageSize().update(pkg.getPkgLength());
distributionMetricsService.getAcceptedRequests().mark();
- String msg = String.format("Distribution request accepted with
%s", pkg);
+ String msg = String.format("Request accepted with distribution
package %s", pkg);
log.info(msg);
return new SimpleDistributionResponse(ACCEPTED, msg);
} catch (Throwable e) {
distributionMetricsService.getDroppedRequests().mark();
- String msg = String.format("Failed to append %s to the journal",
pkg);
+ String msg = String.format("Failed to append distribution package
%s to the journal", pkg);
log.error(msg, e);
if (e instanceof Error) {
throw (Error) e;
@@ -312,7 +312,7 @@ public class DistributionPublisher implements
DistributionAgent {
@Nonnull
private DistributionResponse executeUnsupported(DistributionRequest
request) {
- String msg = String.format("Request type %s is not supported by this
agent, expected one of %s",
+ String msg = String.format("Request requestType=%s not supported by
this agent, expected one of %s",
request.getRequestType(), REQ_TYPES.keySet());
log.info(msg);
return new
SimpleDistributionResponse(DistributionRequestState.DROPPED, msg);
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
index 5bd9e87..f7d4b93 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
@@ -105,14 +105,14 @@ public class MessagingCacheCallback implements
CacheCallback {
}
private void sendClearCommand(String pubAgentName, AgentId subAgentId,
long offset) {
- ClearCommand commandMessage = ClearCommand.builder()
+ ClearCommand command = ClearCommand.builder()
.pubAgentName(pubAgentName)
.subSlingId(subAgentId.getSlingId())
.subAgentName(subAgentId.getAgentName())
.offset(offset)
.build();
- log.info("Sending clear command to subSlingId: {}, subAgentName: {}
with offset {}.", subAgentId.getSlingId(), subAgentId.getAgentName(), offset);
- commandSender.accept(commandMessage);
+ log.info("Sending clear command {}", command);
+ commandSender.accept(command);
}
@Override
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index b6cc54d..b3e54c3 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -100,7 +100,7 @@ public class PackageDistributedNotifier implements
TopologyChangeHandler {
}
protected void notifyDistributed(String pubAgentName,
DistributionQueueItem queueItem) {
- LOG.debug("Sending distributed notifications for pub agent {} queue
item {}", pubAgentName, queueItem.getPackageId());
+ LOG.debug("Sending distributed notifications for pubAgentName={},
pkgId={}", pubAgentName, queueItem.getPackageId());
sendEvt(pubAgentName, queueItem);
if (sendMsg) {
sendMsg(pubAgentName, queueItem);
@@ -112,7 +112,7 @@ public class PackageDistributedNotifier implements
TopologyChangeHandler {
PackageDistributedMessage msg =
createDistributedMessage(pubAgentName, queueItem);
sender.accept(msg);
} catch (Exception e) {
- LOG.warn("Exception when sending package distributed message for
pub agent {} queue item {}", pubAgentName, queueItem.getPackageId(), e);
+ LOG.warn("Exception when sending package distributed message for
pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId(), e);
}
}
@@ -131,7 +131,7 @@ public class PackageDistributedNotifier implements
TopologyChangeHandler {
Event distributed =
DistributionEvent.eventPackageDistributed(queueItem, pubAgentName);
eventAdmin.sendEvent(distributed);
} catch (Exception e) {
- LOG.warn("Exception when sending package distributed event for pub
agent {} queue item {}", pubAgentName, queueItem.getPackageId(), e);
+ LOG.warn("Exception when sending package distributed event for
pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId(), e);
}
}
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
index 03335ea..230ee6f 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
@@ -68,12 +68,12 @@ public class PackageMessageFactory {
@Activate
public void activate() {
pubSlingId = slingSettings.getSlingId();
- LOG.info("Started package message factory for pubSlingId {}",
pubSlingId);
+ LOG.info("Started package message factory for pubSlingId={}",
pubSlingId);
}
@Deactivate
public void deactivate() {
- LOG.info("Stopped package message factory for pubSlingId {}",
pubSlingId);
+ LOG.info("Stopped package message factory for pubSlingId={}",
pubSlingId);
}
public PackageMessage create(
@@ -86,7 +86,7 @@ public class PackageMessageFactory {
case ADD: return createAdd(packageBuilder, resourceResolver,
pubAgentName, request);
case DELETE: return createDelete(packageBuilder, resourceResolver,
request, pubAgentName);
case TEST: return createTest(packageBuilder, resourceResolver,
pubAgentName);
- default: throw new
IllegalArgumentException(String.format("Unsupported request type %s",
request.getRequestType()));
+ default: throw new
IllegalArgumentException(String.format("Unsupported request with
requestType=%s", request.getRequestType()));
}
}
@@ -111,9 +111,9 @@ public class PackageMessageFactory {
.pkgType(packageBuilder.getType());
String storeRef;
+ String id;
try {
- String id = UUID.randomUUID().toString();
- LOG.debug("Creating package binary with id [{}] for package [{}],
length [{}]", id, disPkg.getId(), pkgLength);
+ id = UUID.randomUUID().toString();
storeRef = binaryStore.put(id, disPkg.createInputStream(),
pkgLength);
} catch (IOException e) {
throw new DistributionException(e.getMessage(), e);
@@ -125,6 +125,7 @@ public class PackageMessageFactory {
pkgBuilder.pkgBinary(pkgBinary);
}
PackageMessage pipePackage = pkgBuilder.build();
+ LOG.debug("Created distribution package {} with binary id={}",
pipePackage, id);
disPkg.delete();
return pipePackage;
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
index 8899397..b51b252 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
@@ -53,7 +53,7 @@ public class PackageQueuedNotifier implements EventHandler {
@Deactivate
public void deactivate() {
receiveCallbacks.forEach((packageId, callback) -> {
- LOG.debug("Cancel wait condition for package {}", packageId);
+ LOG.debug("Cancel wait condition for distribution package with
pkgId={}", packageId);
callback.cancel(true);
});
LOG.info("Package queue notifier service stopped");
@@ -62,7 +62,7 @@ public class PackageQueuedNotifier implements EventHandler {
@Override
public void handleEvent(Event event) {
String packageId = (String)
event.getProperty(DistributionEvent.PACKAGE_ID);
- LOG.debug("Handling event for packageId {}", packageId);
+ LOG.debug("Handling event for pkgId={}", packageId);
CompletableFuture<Void> callback = null;
if (packageId != null) {
callback = receiveCallbacks.remove(packageId);
@@ -73,14 +73,14 @@ public class PackageQueuedNotifier implements EventHandler {
}
public CompletableFuture<Void> registerWait(String packageId) {
- LOG.debug("Registering wait condition for packageId {}", packageId);
+ LOG.debug("Registering wait condition for pkgId={}", packageId);
CompletableFuture<Void> packageReceived = new CompletableFuture<>();
receiveCallbacks.put(packageId, packageReceived);
return packageReceived;
}
public void unRegisterWait(String packageId) {
- LOG.debug("Un-registering wait condition for packageId {}", packageId);
+ LOG.debug("Un-registering wait condition for pkgId={}", packageId);
receiveCallbacks.remove(packageId);
}
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java
index 040db85..04f8d0a 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java
@@ -79,7 +79,7 @@ public class RangePoller {
private void handlePackage(MessageInfo info, PackageMessage message) {
long offset = info.getOffset();
- LOG.debug("Reading offset {}", offset);
+ LOG.debug("Consuming distribution package {} at offset={}", message,
offset);
if (offset < maxOffset) {
if (isNotTestMessage(message)) {
messages.add(new FullMessage<>(info, message));
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
index b3a3414..08d516d 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
@@ -79,9 +79,9 @@ class Announcer implements Runnable, Closeable {
@Override
public void run() {
- LOG.debug("Sending discovery message for agent {}", subAgentName);
try {
DiscoveryMessage msg = createDiscoveryMessage();
+ LOG.debug("Sending discovery message {}", msg);
sender.accept(msg);
} catch (Exception e) {
LOG.info("Failed to send discovery message for agent {}, {}",
subAgentName, e.getMessage(), e);
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index 063c185..de8295d 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -56,21 +56,21 @@ public class CommandPoller implements Closeable {
return offset <= clearOffset.longValue();
}
- private void handleCommandMessage(MessageInfo info, ClearCommand message) {
+ private void handleCommandMessage(MessageInfo info, ClearCommand command) {
idleCheck.busy(0);
- if (!subSlingId.equals(message.getSubSlingId()) ||
!subAgentName.equals(message.getSubAgentName())) {
- LOG.debug("Skip command for subSlingId {}",
message.getSubSlingId());
+ if (!subSlingId.equals(command.getSubSlingId()) ||
!subAgentName.equals(command.getSubAgentName())) {
+ LOG.debug("Skip command for subSlingId {}",
command.getSubSlingId());
return;
}
- handleClearCommand(message.getOffset());
+ handleClearCommand(command);
idleCheck.idle();
}
- private void handleClearCommand(long offset) {
+ private void handleClearCommand(ClearCommand command) {
long oldOffset = clearOffset.get();
- long newOffset = updateClearOffsetIfLarger(offset);
- LOG.info("Handled clear command for offset {}. Old clear offset was
{}, new clear offset is {}.", offset, oldOffset, newOffset);
+ long newOffset = updateClearOffsetIfLarger(command.getOffset());
+ LOG.info("Handled clear command {}. Old clear offset was {}, new clear
offset is {}.", command, oldOffset, newOffset);
}
private long updateClearOffsetIfLarger(long offset) {
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index b306cf9..53ee052 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -238,23 +238,23 @@ public class DistributionSubscriber {
private void handlePackageMessage(MessageInfo info, PackageMessage
message) {
if (shouldEnqueue(info, message)) {
- enqueue(new FullMessage<PackageMessage>(info, message));
+ enqueue(new FullMessage<>(info, message));
} else {
try {
bookKeeper.skipPackage(info.getOffset());
} catch (PersistenceException | LoginException e) {
- LOG.info("Error marking message at offset {} as skipped",
info.getOffset(), e);
+ LOG.warn("Error marking distribution package {} at offset={}
as skipped", message, info.getOffset(), e);
}
}
}
private boolean shouldEnqueue(MessageInfo info, PackageMessage message) {
if (!queueNames.contains(message.getPubAgentName())) {
- LOG.info("Skipping package for Publisher agent {} at offset {}
(not subscribed)", message.getPubAgentName(), info.getOffset());
+ LOG.info("Skipping distribution package {} at offset={} (not
subscribed)", message, info.getOffset());
return false;
}
if (!pkgType.equals(message.getPkgType())) {
- LOG.warn("Skipping package with type {} at offset {}",
message.getPkgType(), info.getOffset());
+ LOG.warn("Skipping distribution package {} at offset={} (bad
pkgType)", message, info.getOffset());
return false;
}
return true;
@@ -382,7 +382,7 @@ public class DistributionSubscriber {
return decision;
}
}
- throw new PreConditionTimeoutException("Timeout waiting for package
offset " + offset + " on status topic.");
+ throw new PreConditionTimeoutException("Timeout waiting for
distribution package at offset=" + offset + " on status topic");
}
private static void delay(long delayInMs) {
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index faed812..7d3e980 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -200,7 +200,7 @@ public class DistributionPublisherTest {
List<String> log = publisher.getLog().getLines();
assertThat(log, contains(
containsString("Started Publisher agent pub1agent1"),
- containsString("Request type PULL is not supported by this
agent, expected one of")));
+ containsString("not supported by this agent")));
}
@Test
@@ -282,7 +282,7 @@ public class DistributionPublisherTest {
List<String> log = publisher.getLog().getLines();
assertThat(log, contains(
containsString("Started Publisher agent pub1agent1"),
- containsString("Distribution request accepted")));
+ containsString("Request accepted")));
}
private PackageMessage mockPackage(DistributionRequest request) throws
IOException {