This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 58ec625d1e91f208e2dd05178384b1ff05d3d56d Author: Christian Schneider <cschn...@adobe.com> AuthorDate: Wed Jan 15 10:43:42 2020 +0100 SLING-8996 - Fix error in clear callback. Improve test and logging --- .../journal/impl/queue/impl/PubQueue.java | 10 +- .../impl/queue/impl/PubQueueProviderImpl.java | 3 +- .../journal/impl/subscriber/CommandPoller.java | 5 +- .../journal/impl/queue/impl/PubQueueTest.java | 198 ++++++++++++--------- 4 files changed, 127 insertions(+), 89 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java index 93f0dca..9bfd4fd 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java @@ -47,9 +47,13 @@ import org.apache.sling.distribution.queue.DistributionQueueState; import org.apache.sling.distribution.queue.DistributionQueueStatus; import org.apache.sling.distribution.queue.DistributionQueueType; import org.apache.sling.distribution.queue.spi.DistributionQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @ParametersAreNonnullByDefault public class PubQueue implements DistributionQueue { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); private final String queueName; @@ -134,6 +138,7 @@ public class PubQueue implements DistributionQueue { * Until then, the REMOVABLE capability is provided * but only allows to remove the head of the queue. */ + log.info("Removing queue entry {}", entryId); DistributionQueueEntry headEntry = getHead(); if (headEntry != null) { if (headEntry.getId().equals(entryId)) { @@ -159,6 +164,7 @@ public class PubQueue implements DistributionQueue { * which clears from the head entry to the entry * provided with the max offset (tailEntry). */ + log.info("Removing queue entries {}", entryIds); Optional<String> tailEntryId = entryIds.stream() .max((e1, e2) -> compare(EntryUtil.entryOffset(e1), EntryUtil.entryOffset(e2))); return (tailEntryId.isPresent()) @@ -214,6 +220,7 @@ public class PubQueue implements DistributionQueue { } private Iterable<DistributionQueueEntry> clear(String tailEntryId) { + log.info("Clearing up to tail queue entry {}", tailEntryId); List<DistributionQueueEntry> removed = new ArrayList<>(); for (DistributionQueueEntry entry : getEntries(0, -1)) { removed.add(entry); @@ -229,7 +236,8 @@ public class PubQueue implements DistributionQueue { if (clearCallback == null) { throw new UnsupportedOperationException(); } - clearCallback.clear(EntryUtil.entryOffset(tailEntry.getId())); + long offset = EntryUtil.entryOffset(tailEntry.getId()); + clearCallback.clear(offset); } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java index 8b253b7..3b7feab 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java @@ -108,7 +108,7 @@ public class PubQueueProviderImpl implements PubQueueProvider { @Override public DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable) { OffsetQueue<DistributionQueueItem> agentQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset); - ClearCallback editableCallback = offset -> sendClearCommand(subSlingId, subAgentName, minOffset); + ClearCallback editableCallback = offset -> sendClearCommand(subSlingId, subAgentName, offset); ClearCallback callback = editable ? editableCallback : null; return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), headRetries, callback); } @@ -152,6 +152,7 @@ public class PubQueueProviderImpl implements PubQueueProvider { .setSubAgentName(subAgentName) .setClearCommand(clearCommand) .build(); + LOG.info("Sending clear command to subSlingId: {}, subAgentName: {} with offset {}.", subSlingId, subAgentName, offset); sender.send(topics.getCommandTopic(), commandMessage); } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java index e56828f..f9d047b 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java @@ -81,8 +81,9 @@ public class CommandPoller implements Closeable { } private void handleClearCommand(long offset) { - updateClearOffsetIfLarger(offset); - LOG.info("Handled clear command for offset {}", offset); + long oldOffset = clearOffset.get(); + long newOffset = updateClearOffsetIfLarger(offset); + LOG.info("Handled clear command for offset {}. Old clear offset was {}, new clear offset is {}.", offset, oldOffset, newOffset); } private long updateClearOffsetIfLarger(long offset) { diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java index 25c15bf..9872ad9 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java @@ -18,134 +18,130 @@ */ package org.apache.sling.distribution.journal.impl.queue.impl; +import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET; +import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_PARTITION; +import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP; +import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TOPIC; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.apache.sling.distribution.journal.impl.queue.OffsetQueue; import org.apache.sling.distribution.queue.DistributionQueueEntry; import org.apache.sling.distribution.queue.DistributionQueueItem; import org.apache.sling.distribution.queue.DistributionQueueType; -import org.apache.sling.distribution.queue.spi.DistributionQueue; +import org.junit.Before; import org.junit.Test; - -import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.*; -import static java.util.Collections.*; -import static org.junit.Assert.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @SuppressWarnings("serial") public class PubQueueTest { - private static final String TOPIC = "topic"; - private static final String PARTITION = "0"; - private static final String QUEUE_NAME = "queueName"; - - private static final String PACKAGE_ID_1 = "package-1"; - - private static final String PACKAGE_ID_2 = "package-2"; - - private static final String PACKAGE_ID_3 = "package-3"; - - private static final ClearCallback NO_OP = (offset) -> {}; - - private static final OffsetQueue<DistributionQueueItem> EMPTY_QUEUE = new OffsetQueueImpl<>(); - - private static final OffsetQueue<DistributionQueueItem> THREE_ENTRY_QUEUE = new OffsetQueueImpl<>(); - - static { - - THREE_ENTRY_QUEUE.putItem(100, new DistributionQueueItem(PACKAGE_ID_1, new HashMap<String, Object>(){{ - put(RECORD_TOPIC, TOPIC); - put(RECORD_PARTITION, PARTITION); - put(RECORD_OFFSET, 100); - put(RECORD_TIMESTAMP, 1541538150582L); - }})); - - THREE_ENTRY_QUEUE.putItem(200, new DistributionQueueItem(PACKAGE_ID_2, new HashMap<String, Object>(){{ - put(RECORD_TOPIC, TOPIC); - put(RECORD_PARTITION, PARTITION); - put(RECORD_OFFSET, 200); - put(RECORD_TIMESTAMP, 1541538150584L); - }})); - - THREE_ENTRY_QUEUE.putItem(300, new DistributionQueueItem(PACKAGE_ID_3, new HashMap<String, Object>(){{ - put(RECORD_TOPIC, TOPIC); - put(RECORD_PARTITION, PARTITION); - put(RECORD_OFFSET, 300); - put(RECORD_TIMESTAMP, 1541538150586L); - }})); + private static final String PACKAGE_ID_PREFIX = "package-"; + private final Logger log = LoggerFactory.getLogger(this.getClass()); + private Semaphore invoked = new Semaphore(0); + private long lastClearOffset = 0l; + private OffsetQueue<DistributionQueueItem> offsetQueue; + private PubQueue queue; + + @Before + public void before () { + offsetQueue = new OffsetQueueImpl<>(); + queue = pubQueue(offsetQueue); + addEntries(); } @Test public void testGetName() throws Exception { - assertEquals(QUEUE_NAME, new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, NO_OP).getName()); + assertEquals(QUEUE_NAME, queue.getName()); } @Test(expected = UnsupportedOperationException.class) public void testAdd() throws Exception { - DistributionQueueItem queueItem = new DistributionQueueItem(PACKAGE_ID_1, emptyMap()); - new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, NO_OP).add(queueItem); + queue.add(queueItem(1)); + } + + @Test + public void testGetHeadEmpty() throws Exception { + assertNull(queue.getHead()); } @Test public void testGetHead() throws Exception { - assertNull(pubQueue(EMPTY_QUEUE).getHead()); - DistributionQueueEntry headEntry = pubQueue(THREE_ENTRY_QUEUE).getHead(); + addEntries(); + + DistributionQueueEntry headEntry = queue.getHead(); + assertNotNull(headEntry); - assertEquals(PACKAGE_ID_1, headEntry.getItem().getPackageId()); + assertEquals(packageId(1), headEntry.getItem().getPackageId()); } @Test public void testGetItems() throws Exception { - Iterator<DistributionQueueEntry> entries = pubQueue(THREE_ENTRY_QUEUE).getEntries(1, 2).iterator(); + addEntries(); + + Iterator<DistributionQueueEntry> entries = queue.getEntries(1, 2).iterator(); + assertNotNull(entries); DistributionQueueEntry entry1 = entries.next(); assertNotNull(entry1); - assertEquals(PACKAGE_ID_2, entry1.getItem().getPackageId()); + assertEquals(packageId(2), entry1.getItem().getPackageId()); DistributionQueueEntry entry2 = entries.next(); - assertEquals(PACKAGE_ID_3, entry2.getItem().getPackageId()); + assertEquals(packageId(3), entry2.getItem().getPackageId()); } @Test public void testGetItem() throws Exception { + addEntries(); + String entryId = TOPIC + "-" + PARTITION + "@" + 200; - DistributionQueueEntry queueEntry = pubQueue(THREE_ENTRY_QUEUE).getEntry(entryId); + DistributionQueueEntry queueEntry = queue.getEntry(entryId); + assertNotNull(queueEntry); - assertEquals(PACKAGE_ID_2, queueEntry.getItem().getPackageId()); + assertEquals(packageId(2), queueEntry.getItem().getPackageId()); } @Test public void testRemoveHead() throws Exception { - final Semaphore invoked = new Semaphore(1); - DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE, (offset) -> invoked.release()); - String headEntryId = EntryUtil.entryId(THREE_ENTRY_QUEUE.getHeadItem()); + addEntries(); + + String headEntryId = EntryUtil.entryId(offsetQueue.getHeadItem()); DistributionQueueEntry removed = queue.remove(headEntryId); - assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS)); + + assertClearCallbackInvoked(); assertEquals(headEntryId, removed.getId()); } @Test(expected = UnsupportedOperationException.class) public void testRemoveRandomItemFails() throws Exception { - DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE); - String randomEntryId = EntryUtil.entryId(THREE_ENTRY_QUEUE.getItem(200)); + addEntries(); + + String randomEntryId = EntryUtil.entryId(offsetQueue.getItem(200)); queue.remove(randomEntryId); } @Test public void testRemoveSetOfRandomItemsWillClear() throws Exception { - final Semaphore invoked = new Semaphore(2); - DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE, (offset) -> invoked.release()); - String headEntryId = EntryUtil.entryId(THREE_ENTRY_QUEUE.getHeadItem()); - String randomEntryId = EntryUtil.entryId(THREE_ENTRY_QUEUE.getItem(200)); + addEntries(); + String headEntryId = EntryUtil.entryId(offsetQueue.getHeadItem()); + String randomEntryId = EntryUtil.entryId(offsetQueue.getItem(offset(2))); Iterator<DistributionQueueEntry> removed = queue.remove(Collections.singleton(randomEntryId)).iterator(); - assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS)); - assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS)); + + assertClearCallbackInvoked(); assertEquals(headEntryId, removed.next().getId()); assertEquals(randomEntryId, removed.next().getId()); assertFalse(removed.hasNext()); @@ -153,47 +149,79 @@ public class PubQueueTest { @Test public void testRemoveSetOfNonExistingItem() throws Exception { - DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE); - + addEntries(); + Iterable<DistributionQueueEntry> removed = queue.remove(Collections.singleton("nonexisting-0@99999")); + assertFalse(removed.iterator().hasNext()); } @Test public void testClearAll() throws Exception { - final Semaphore invoked = new Semaphore(3); - DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE, (offset) -> invoked.release()); + addEntries(); Iterable<DistributionQueueEntry> removed = queue.clear(-1); - assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS)); - assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS)); - assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS)); - assertEquals(3, StreamSupport.stream(removed.spliterator(), false).count()); + + assertClearCallbackInvoked(); + assertEquals(3, streamOf(removed).count()); + assertEquals(offset(3), lastClearOffset); } @Test public void testClearPartial() throws Exception { - final Semaphore invoked = new Semaphore(2); - DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE, (offset) -> invoked.release()); - + addEntries(); + Iterable<DistributionQueueEntry> removed = queue.clear(2); - assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS)); - assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS)); - assertEquals(2, StreamSupport.stream(removed.spliterator(), false).count()); + + assertClearCallbackInvoked(); + assertEquals(2, streamOf(removed).count()); + assertEquals(offset(2), lastClearOffset); } @Test public void testGetType() throws Exception { - assertEquals(DistributionQueueType.ORDERED, new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, NO_OP).getType()); + assertEquals(DistributionQueueType.ORDERED, queue.getType()); } - private PubQueue pubQueue(OffsetQueue<DistributionQueueItem> offsetQueue) { - return pubQueue(offsetQueue, NO_OP); + private void assertClearCallbackInvoked() throws InterruptedException { + assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS)); } - private PubQueue pubQueue(OffsetQueue<DistributionQueueItem> offsetQueue, ClearCallback clearCallback) { - return new PubQueue(QUEUE_NAME, offsetQueue, 0, clearCallback); + private void addEntries() { + offsetQueue.putItem(offset(1), queueItem(1)); + offsetQueue.putItem(offset(2), queueItem(2)); + offsetQueue.putItem(offset(3), queueItem(3)); } + private DistributionQueueItem queueItem(int nr) { + HashMap<String, Object> data = new HashMap<String, Object>(){{ + put(RECORD_TOPIC, TOPIC); + put(RECORD_PARTITION, PARTITION); + put(RECORD_OFFSET, offset(nr)); + put(RECORD_TIMESTAMP, 1541538150580L + nr * 2); + }}; + return new DistributionQueueItem(packageId(nr), data); + } + private long offset(int nr) { + return nr * 100; + }; + + private static String packageId(int nr) { + return PACKAGE_ID_PREFIX + new Integer(nr).toString(); + } + + private Stream<DistributionQueueEntry> streamOf(Iterable<DistributionQueueEntry> entries) { + return StreamSupport.stream(entries.spliterator(), false); + } + + private PubQueue pubQueue(OffsetQueue<DistributionQueueItem> offsetQueue) { + return new PubQueue(QUEUE_NAME, offsetQueue, 0, this::clearCallback); + } + + private void clearCallback(long offset) { + log.info("Clearcallback with offset {}", offset); + lastClearOffset = offset; + invoked.release(); + } } \ No newline at end of file