Repository: samza
Updated Branches:
  refs/heads/master cfbb9c6eb -> 03410b80c
SAMZA-1914: fix out of range starting offset in EH consumer

Author: Hai Lu <[email protected]>

Reviewers: Srinivasulu <[email protected]>

Closes #664 from lhaiesp/master

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/03410b80
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/03410b80
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/03410b80

Branch: refs/heads/master
Commit: 03410b80c674fc5001ff02db13b269df76dd0fae
Parents: cfbb9c6
Author: Hai Lu <[email protected]>
Authored: Wed Sep 26 12:42:06 2018 -0700
Committer: Srinivasulu Punuru <[email protected]>
Committed: Wed Sep 26 12:42:06 2018 -0700

----------------------------------------------------------------------
 .../eventhub/admin/EventHubSystemAdmin.java     | 15 ++++-----------
 .../consumer/EventHubSystemConsumer.java        |  6 ++----
 .../eventhub/admin/TestEventHubSystemAdmin.java | 19 -------------------
 3 files changed, 6 insertions(+), 34 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/03410b80/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
index 2141ebd..27abe07 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
@@ -65,19 +65,12 @@ public class EventHubSystemAdmin implements SystemAdmin {
     this.eventHubClientManagerFactory = eventHubClientManagerFactory;
   }
 
-  private String getNextOffset(String currentOffset) {
-    // EventHub will return the first message AFTER the offset
-    // that was specified in the fetch request.
-    // If no such offset exists Eventhub will return an error.
-    return String.valueOf(Long.parseLong(currentOffset) + 1);
-  }
-
   @Override
   public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
-    Map<SystemStreamPartition, String> results = new HashMap<>();
-
-    offsets.forEach((partition, offset) -> results.put(partition, getNextOffset(offset)));
-    return results;
+    // In EventHubSystemConsumer#initializeEventHubsManagers, we exclude the offset that we specify. i.e.
+    // we will only get the message after the checkpoint offset. Hence, by returning the same offset as the
+    // "next" offset, we won't be reprocessing the same event.
+    return offsets;
   }
 
   // EventHubRuntimeInformation does not implement toString()

http://git-wip-us.apache.org/repos/asf/samza/blob/03410b80/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
index 454fc57..a05b5e2 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -276,13 +276,11 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
           receiver = eventHubClientManager.getEventHubClient()
               .createReceiverSync(consumerGroup, partitionId.toString(), EventPosition.fromEnqueuedTime(Instant.now()));
         } else {
-          // If the offset is less or equal to the newest offset in the system, it can be
-          // used as the starting offset to receive from. EventHub will return the first
-          // message AFTER the offset that was specified in the fetch request.
+          // EventHub will return the first message AFTER the offset that was specified in the fetch request.
           // If no such offset exists Eventhub will return an error.
           receiver = eventHubClientManager.getEventHubClient()
               .createReceiverSync(consumerGroup, partitionId.toString(),
-                  EventPosition.fromOffset(offset, !offset.equals(EventHubSystemConsumer.START_OF_STREAM)));
+                  EventPosition.fromOffset(offset, /* inclusiveFlag */false));
         }
 
         receiver.setPrefetchCount(prefetchCount);

http://git-wip-us.apache.org/repos/asf/samza/blob/03410b80/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
index 8861152..e45d3f4 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
@@ -22,7 +22,6 @@ package org.apache.samza.system.eventhub.admin;
 import org.apache.samza.Partition;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.eventhub.EventHubSystemFactory;
 import org.apache.samza.system.eventhub.MockEventHubConfigFactory;
 import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer;
@@ -31,7 +30,6 @@ import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -53,23 +51,6 @@ public class TestEventHubSystemAdmin {
     Assert.assertNull(eventHubSystemAdmin.offsetComparator(EventHubSystemConsumer.END_OF_STREAM, EventHubSystemConsumer.END_OF_STREAM));
   }
 
-  @Test
-  public void testGetNextOffset() {
-    EventHubSystemFactory eventHubSystemFactory = new EventHubSystemFactory();
-    SystemAdmin eventHubSystemAdmin = eventHubSystemFactory.getAdmin(SYSTEM_NAME,
-            MockEventHubConfigFactory.getEventHubConfig(EventHubSystemProducer.PartitioningMethod.EVENT_HUB_HASHING));
-    Map<SystemStreamPartition, String> offsets = new HashMap<>();
-    SystemStreamPartition ssp0 = new SystemStreamPartition(SYSTEM_NAME, STREAM_NAME1, new Partition(0));
-    SystemStreamPartition ssp2 = new SystemStreamPartition(SYSTEM_NAME, STREAM_NAME1, new Partition(2));
-    offsets.put(ssp0, Integer.toString(0));
-    offsets.put(ssp2, EventHubSystemConsumer.START_OF_STREAM);
-
-    Map<SystemStreamPartition, String> updatedOffsets = eventHubSystemAdmin.getOffsetsAfter(offsets);
-    Assert.assertEquals(offsets.size(), updatedOffsets.size());
-    Assert.assertEquals("1", updatedOffsets.get(ssp0));
-    Assert.assertEquals("0", updatedOffsets.get(ssp2));
-  }
-
   @Ignore("Integration Test")
   @Test
   public void testGetStreamMetadata() {
