This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-11028 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git
commit 3d82acfe3c1fa4e6b506980fdf53080ac8d39595 Author: Christian Schneider <[email protected]> AuthorDate: Mon Dec 20 14:06:10 2021 +0100 SLING-11028 - Allow to assign to relative offset --- pom.xml | 2 +- .../journal/kafka/KafkaClientProvider.java | 47 +++++++++++++++++----- .../journal/kafka/KafkaClientProviderTest.java | 22 +++++++++- .../journal/kafka/KafkaMessageInfoTest.java | 2 +- .../distribution/journal/kafka/MessagingTest.java | 35 +++++++++++++++- 5 files changed, 95 insertions(+), 13 deletions(-) diff --git a/pom.xml b/pom.xml index bb39376..0b7e2c3 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,7 @@ <dependency> <groupId>org.apache.sling</groupId> <artifactId>org.apache.sling.distribution.journal.messages</artifactId> - <version>0.3.0</version> + <version>0.3.1-SNAPSHOT</version> </dependency> <!-- OSGi --> <dependency> diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java index ec31888..3c16c6a 100644 --- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java +++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java @@ -138,7 +138,9 @@ public class KafkaClientProvider implements MessagingProvider, Closeable { Collection<TopicPartition> topicPartitions = singleton(topicPartition); consumer.assign(topicPartitions); if (assign != null) { - consumer.seek(topicPartition, offset(assign)); + AssignDetails assignDetails = new AssignDetails(assign); + long offset = assignDetails.getOffset(consumer, topicPartition); + consumer.seek(topicPartition, offset); } else if (reset == Reset.earliest) { consumer.seekToBeginning(topicPartitions); } else { @@ -181,6 +183,11 @@ public class KafkaClientProvider implements MessagingProvider, Closeable { public String assignTo(long offset) { return format("%s:%s", PARTITION, offset); } + + @Override + public String assignTo(Reset reset, long relativeOffset) { + return format("%s:%s:%d", PARTITION, reset.name(), relativeOffset); + } protected <T> KafkaConsumer<String, T> createConsumer(Class<? extends Deserializer<?>> deserializer, Reset reset) { String groupId = UUID.randomUUID().toString(); @@ -236,16 +243,38 @@ public class KafkaClientProvider implements MessagingProvider, Closeable { return config; } - private long offset(String assign) { - String[] chunks = assign.split(":"); - if (chunks.length != 2) { - throw new IllegalArgumentException(format("Illegal assign %s", assign)); - } - return Long.parseLong(chunks[1]); - } - @Override public URI getServerUri() { return serverUri; } + + static class AssignDetails { + private final Reset reset; + private final long offset; + + AssignDetails(String assign) { + String[] chunks = assign.split(":"); + if (chunks.length == 3) { + String reset = chunks[1]; + this.reset = Reset.valueOf(reset); + offset = Long.parseLong(chunks[2]); + } else if (chunks.length == 2) { + reset = null; + offset = Long.parseLong(chunks[1]); + } else { + throw new IllegalArgumentException(format("Illegal assign %s", assign)); + } + } + + long getOffset(KafkaConsumer<String, String> consumer, TopicPartition topicPartition) { + Collection<TopicPartition> partitions = singleton(topicPartition); + if (reset == Reset.earliest) { + return consumer.beginningOffsets(partitions).get(topicPartition) + offset; + } else if (reset == Reset.latest) { + return consumer.endOffsets(partitions).get(topicPartition) + offset; + } else { + return offset; + } + } + } } diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java index 358a760..738c175 100644 --- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java @@ -21,10 +21,11 @@ package org.apache.sling.distribution.journal.kafka; import static java.util.Collections.emptyMap; import static org.apache.sling.distribution.journal.kafka.util.KafkaEndpointBuilder.buildKafkaEndpoint; import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertThat; +import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.when; +import java.net.URI; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -33,6 +34,7 @@ import java.util.Map; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.sling.distribution.journal.HandlerAdapter; import org.apache.sling.distribution.journal.MessagingException; import org.apache.sling.distribution.journal.Reset; import org.junit.Assert; @@ -116,4 +118,22 @@ public class KafkaClientProviderTest { String assign = provider.assignTo(1l); assertThat(assign, equalTo("0:1")); } + + @Test + public void testAssignToRelative() throws Exception { + String assign = provider.assignTo(Reset.latest, -1l); + assertThat(assign, equalTo("0:latest:-1")); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidAssign() throws Exception { + HandlerAdapter<?> handler = Mockito.mock(HandlerAdapter.class); + provider.createPoller(TOPIC, Reset.latest, "", handler); + } + + @Test + public void testGeServerURI() throws Exception { + URI serverUri = provider.getServerUri(); + assertThat(serverUri.toString(), equalTo("localhost:9092")); + } } diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java index 097919e..a3dec00 100644 --- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java @@ -19,7 +19,7 @@ package org.apache.sling.distribution.journal.kafka; import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertThat; +import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Mockito.when; import org.apache.kafka.clients.consumer.ConsumerRecord; diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java index 0989667..e12c420 100644 --- a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java @@ -18,9 +18,9 @@ */ package org.apache.sling.distribution.journal.kafka; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import java.io.Closeable; @@ -112,6 +112,39 @@ public class MessagingTest { assertReceived("Should see message as we fall back to earliest"); } } + + + @Test + public void testAssignRelativeLatest() throws Exception { + DiscoveryMessage msg = createMessage(); + MessageSender<DiscoveryMessage> messageSender = provider.createSender(topicName); + messageSender.send(msg); + + String assign1 = provider.assignTo(Reset.latest, -1); + try (Closeable poller = provider.createPoller(topicName, Reset.latest, assign1, handler)) { + assertReceived("Starting from latest:-1 .. should see our message"); + } + String assign2 = provider.assignTo(Reset.latest, 0); + try (Closeable poller1 = provider.createPoller(topicName, Reset.latest, assign2, handler)) { + assertNotReceived("Should not see message as we fall back to latest"); + } + } + + @Test + public void testAssignRelativeEarliest() throws Exception { + DiscoveryMessage msg = createMessage(); + MessageSender<DiscoveryMessage> messageSender = provider.createSender(topicName); + messageSender.send(msg); + + String assign1 = provider.assignTo(Reset.earliest, 0); + try (Closeable poller = provider.createPoller(topicName, Reset.latest, assign1, handler)) { + assertReceived("Starting from latest:-1 .. should see our message"); + } + String assign2 = provider.assignTo(Reset.earliest, 1); + try (Closeable poller1 = provider.createPoller(topicName, Reset.latest, assign2, handler)) { + assertNotReceived("Should not see message as we fall back to latest"); + } + } private DiscoveryMessage createMessage() { return DiscoveryMessage.builder()
