This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-9583 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit dac592ab7a227b6b05fd0955d853f67280ca6432 Author: Christian Schneider <[email protected]> AuthorDate: Fri Jul 10 17:03:18 2020 +0200 SLING-9583 - Combine parameters into value object --- .../impl/publisher/DistributionPublisher.java | 9 ++-- .../journal/impl/queue/PubQueueProvider.java | 4 +- .../distribution/journal/impl/queue/QueueId.java | 53 ++++++++++++++++++++++ .../impl/queue/impl/PubQueueProviderImpl.java | 25 +++++----- .../impl/publisher/DistributionPublisherTest.java | 3 +- .../impl/queue/impl/PubQueueProviderTest.java | 12 +++-- 6 files changed, 80 insertions(+), 26 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java index 6b9e77a..2c01d7b 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java @@ -47,6 +47,7 @@ import javax.management.NotCompliantMBeanException; import org.apache.commons.io.IOUtils; import org.apache.sling.distribution.journal.impl.event.DistributionEvent; import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider; +import org.apache.sling.distribution.journal.impl.queue.QueueId; import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.shared.AgentState; import org.apache.sling.distribution.journal.shared.DefaultDistributionLog; @@ -250,7 +251,8 @@ public class DistributionPublisher implements DistributionAgent { @Nonnull private DistributionQueue getErrorQueue(String queueName) { AgentId subAgentId = new AgentId(StringUtils.substringBeforeLast(queueName, "-error")); - return pubQueueProvider.getErrorQueue(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName); + QueueId queueId = new QueueId(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName); + return pubQueueProvider.getErrorQueue(queueId); } @CheckForNull @@ -259,13 +261,12 @@ public class DistributionPublisher implements DistributionAgent { AgentId subAgentId = new AgentId(queueName); State state = view.getState(subAgentId.getAgentId(), pubAgentName); if (state != null) { - return pubQueueProvider.getQueue(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName, state.getOffset() + 1, state.getRetries(), state.isEditable()); + QueueId queueId = new QueueId(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName); + return pubQueueProvider.getQueue(queueId, state.getOffset() + 1, state.getRetries(), state.isEditable()); } return null; } - - @Nonnull @Override public DistributionLog getLog() { diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java index 178f6f2..f3bac6b 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java @@ -27,9 +27,9 @@ import org.apache.sling.distribution.queue.spi.DistributionQueue; public interface PubQueueProvider { @Nonnull - DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable); + DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, boolean editable); @Nonnull - DistributionQueue getErrorQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName); + DistributionQueue getErrorQueue(QueueId queueId); } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueId.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueId.java new file mode 100644 index 0000000..61f33d3 --- /dev/null +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueId.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.impl.queue; + +public class QueueId { + private final String pubAgentName; + private final String subSlingId; + private final String subAgentName; + private final String queueName; + + public QueueId(String pubAgentName, String subSlingId, String subAgentName, String queueName) { + this.pubAgentName = pubAgentName; + this.subSlingId = subSlingId; + this.subAgentName = subAgentName; + this.queueName = queueName; + } + + public String getPubAgentName() { + return pubAgentName; + } + + public String getSubSlingId() { + return subSlingId; + } + + public String getSubAgentName() { + return subAgentName; + } + + public String getQueueName() { + return queueName; + } + + public String getErrorQueueKey() { + return String.format("%s#%s#%s", pubAgentName, subSlingId, subAgentName); + } +} diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java index c019c09..f9dcc45 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java @@ -34,6 +34,7 @@ import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.impl.queue.OffsetQueue; import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider; +import org.apache.sling.distribution.journal.impl.queue.QueueId; import org.apache.sling.distribution.journal.messages.ClearCommand; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status; @@ -105,17 +106,17 @@ public class PubQueueProviderImpl implements PubQueueProvider { @Nonnull @Override - public DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable) { - OffsetQueue<DistributionQueueItem> agentQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset); - ClearCallback editableCallback = offset -> sendClearCommand(subSlingId, subAgentName, offset); + public DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, boolean editable) { + OffsetQueue<DistributionQueueItem> agentQueue = pubQueueCacheService.getOffsetQueue(queueId.getPubAgentName(), minOffset); + ClearCallback editableCallback = offset -> sendClearCommand(queueId.getSubSlingId(), queueId.getSubAgentName(), offset); ClearCallback callback = editable ? editableCallback : null; - return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), headRetries, callback); + return new PubQueue(queueId.getQueueName(), agentQueue.getMinOffsetQueue(minOffset), headRetries, callback); } @Nonnull @Override - public DistributionQueue getErrorQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName) { - String errorQueueKey = errorQueueKey(pubAgentName, subSlingId, subAgentName); + public DistributionQueue getErrorQueue(QueueId queueId) { + String errorQueueKey = queueId.getErrorQueueKey(); OffsetQueue<Long> errorQueue = errorQueues.getOrDefault(errorQueueKey, new OffsetQueueImpl<>()); long headOffset = errorQueue.getHeadOffset(); final OffsetQueue<DistributionQueueItem> agentQueue; @@ -123,25 +124,21 @@ public class PubQueueProviderImpl implements PubQueueProvider { agentQueue = new OffsetQueueImpl<>(); } else { long minReferencedOffset = errorQueue.getItem(headOffset); - agentQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minReferencedOffset); + agentQueue = pubQueueCacheService.getOffsetQueue(queueId.getPubAgentName(), minReferencedOffset); } - return new PubErrQueue(queueName, agentQueue, errorQueue); + return new PubErrQueue(queueId.getQueueName(), agentQueue, errorQueue); } public void handleStatus(MessageInfo info, PackageStatusMessage message) { if (message.getStatus() == Status.REMOVED_FAILED) { - String errorQueueKey = errorQueueKey(message.getPubAgentName(), message.getSubSlingId(), message.getSubAgentName()); + QueueId queueId = new QueueId(message.getPubAgentName(), message.getSubSlingId(), message.getSubAgentName(), ""); + String errorQueueKey = queueId.getErrorQueueKey(); OffsetQueue<Long> errorQueue = errorQueues.computeIfAbsent(errorQueueKey, key -> new OffsetQueueImpl<>()); errorQueue.putItem(info.getOffset(), message.getOffset()); } } - @Nonnull - private String errorQueueKey(String pubAgentName, String subSlingId, String subAgentName) { - return String.format("%s#%s#%s", pubAgentName, subSlingId, subAgentName); - } - private void sendClearCommand(String subSlingId, String subAgentName, long offset) { ClearCommand commandMessage = ClearCommand.builder() .subSlingId(subSlingId) diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java index 9f9de96..0a66ad7 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java @@ -262,8 +262,7 @@ public class DistributionPublisherTest { when(topology.getSubscribedAgentIds(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME)); State state = stateWithMaxRetries(1); when(topology.getState(QUEUE_NAME, PUB1AGENT1)).thenReturn(state); - AgentId subAgentId = new AgentId(QUEUE_NAME); - when(pubQueueProvider.getQueue(PUB1AGENT1, subAgentId.getSlingId(), subAgentId.getAgentName(), QUEUE_NAME, 2, 0, false)) + when(pubQueueProvider.getQueue(Mockito.any(), Mockito.eq(2l), Mockito.eq(0), Mockito.eq(false))) .thenThrow(new RuntimeException("Error")); Counter counter = new TestCounter(); diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java index da13e6b..00eae27 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java @@ -45,6 +45,7 @@ import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; +import org.apache.sling.distribution.journal.impl.queue.QueueId; import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; @@ -140,13 +141,14 @@ public class PubQueueProviderTest { handler.handle(info(2L), packageMessage("packageid3", PUB1_AGENT_NAME)); // Full pub1 queue contains all packages from pub1 - DistributionQueue queue = queueProvider.getQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID, 0, -1, false); + QueueId queueId = new QueueId(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID); + DistributionQueue queue = queueProvider.getQueue(queueId, 0, -1, false); Iterator<DistributionQueueEntry> it1 = queue.getEntries(0, -1).iterator(); assertThat(it1.next().getItem().getPackageId(), equalTo("packageid1")); assertThat(it1.next().getItem().getPackageId(), equalTo("packageid3")); // With offset 1 first package is removed - DistributionQueue queue2 = queueProvider.getQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID, 1, -1, false); + DistributionQueue queue2 = queueProvider.getQueue(queueId, 1, -1, false); Iterator<DistributionQueueEntry> it2 = queue2.getEntries(0, 20).iterator(); assertThat(it2.next().getItem().getPackageId(), equalTo("packageid3")); assertThat(it2.hasNext(), equalTo(false)); @@ -161,7 +163,8 @@ public class PubQueueProviderTest { @Test public void testEmptyErrorQueue() throws Exception { - DistributionQueue queue = queueProvider.getErrorQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID); + QueueId queueId = new QueueId(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID); + DistributionQueue queue = queueProvider.getErrorQueue(queueId); assertThat(queue.getStatus().getItemsCount(), equalTo(0)); } @@ -176,7 +179,8 @@ public class PubQueueProviderTest { PackageStatusMessage statusMsg1 = statusMessage(info.getOffset(), pkgMsg1); statHandler.handle(info, statusMsg1); - DistributionQueue queue = queueProvider.getErrorQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID); + QueueId queueId = new QueueId(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID); + DistributionQueue queue = queueProvider.getErrorQueue(queueId); assertThat(queue.getStatus().getItemsCount(), equalTo(1)); DistributionQueueEntry head = queue.getHead(); DistributionQueueItem item = head.getItem();
