shanthoosh commented on a change in pull request #1030: SAMZA-2192: Add StartpointVisitor implementation for EventHub.
URL: https://github.com/apache/samza/pull/1030#discussion_r284455548
##########
File path: samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
##########
@@ -205,4 +234,75 @@ public static Integer compareOffsets(String offset1, String offset2) {
   public Integer offsetComparator(String offset1, String offset2) {
     return compareOffsets(offset1, offset2);
   }
+
+  /**
+   * Offers an EventHub-specific implementation of {@link StartpointVisitor} that resolves
+   * different types of {@link Startpoint} to a Samza offset.
+   */
+  @VisibleForTesting
+  static class EventHubSamzaOffsetResolver implements StartpointVisitor<SystemStreamPartition, String> {
+
+    private final EventHubSystemAdmin eventHubSystemAdmin;
+    private final EventHubConfig eventHubConfig;
+
+    EventHubSamzaOffsetResolver(EventHubSystemAdmin eventHubSystemAdmin, EventHubConfig eventHubConfig) {
+      this.eventHubSystemAdmin = eventHubSystemAdmin;
+      this.eventHubConfig = eventHubConfig;
+    }
+
+    @Override
+    public String visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+      return startpointSpecific.getSpecificOffset();
+    }
+
+    @Override
+    public String visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
+      String streamName = systemStreamPartition.getStream();
+      EventHubClientManager eventHubClientManager = eventHubSystemAdmin.getOrCreateStreamEventHubClient(streamName);
+      EventHubClient eventHubClient = eventHubClientManager.getEventHubClient();
+
+      PartitionReceiver partitionReceiver = null;
+      try {
+        // 1. Initialize the arguments required for creating the partition receiver.
+        String partitionId = String.valueOf(systemStreamPartition.getPartition().getPartitionId());
+        Instant epochInMillisInstant = Instant.ofEpochMilli(startpointTimestamp.getTimestampOffset());
+        EventPosition eventPosition = EventPosition.fromEnqueuedTime(epochInMillisInstant);
+        String consumerGroup = eventHubConfig.getStreamConsumerGroup(systemStreamPartition.getSystem(), streamName);
+
+        // 2. Create a partition receiver with event position defined by the timestamp.
+        partitionReceiver = eventHubClient.createReceiverSync(consumerGroup, partitionId, eventPosition);
+
+        // 3. Read a single message from the partition receiver.
+        Iterable<EventData> eventHubMessagesIterator = partitionReceiver.receiveSync(1);
+        ArrayList<EventData> eventHubMessageList = Lists.newArrayList(eventHubMessagesIterator);
+
+        // 4. Validate that a single message was fetched from the broker.
+        Preconditions.checkState(eventHubMessageList.size() == 1, "Failed to read messages from EventHub system.");
Review comment:
1. My initial idea was to fall back to either the oldest or the newest offset upon
startpoint resolution failure. However, depending on the timestamp the user wants
to seek to in a partition, either choice could be incorrect. Say the user wants to
seek to a timestamp close to the beginning offset and no offset exists for it. If
we default to newest, we do the exact opposite of what the user wants (and vice
versa).
2. Currently, on startpoint resolution failure, we fall back to the checkpointed
offset in `OffsetManager`, essentially ignoring the startpoint.
3. Ideally, the user should be able to define a default.offset policy for
startpoint resolution failures, which `OffsetManager` would then apply. But
building that before we have concrete use cases or requests might be premature.
What do you think?
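To make points 2 and 3 concrete, here is a minimal, self-contained sketch (not Samza code). Everything in it is hypothetical: `StartpointFallbackSketch`, `ResolutionFailurePolicy`, `resolveTimestamp` and `startingOffset` are illustrative names, an in-memory list ordered by enqueued time stands in for an EventHub partition, and the only failure modeled is "no event enqueued at or after the timestamp" (the exact boundary semantics of `EventPosition.fromEnqueuedTime` are not reproduced). Resolution returns `Optional.empty()` instead of failing a `Preconditions` check, so the caller picks the fallback; `CHECKPOINT` mirrors today's behaviour from point 2.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: none of these names exist in Samza or the EventHub SDK.
final class StartpointFallbackSketch {

  // Hypothetical policy applied when a timestamp startpoint cannot be resolved.
  enum ResolutionFailurePolicy { CHECKPOINT, OLDEST, NEWEST }

  // One event in an in-memory stand-in for an EventHub partition.
  static final class Event {
    final long enqueuedTimeMillis;
    final String offset;

    Event(long enqueuedTimeMillis, String offset) {
      this.enqueuedTimeMillis = enqueuedTimeMillis;
      this.offset = offset;
    }
  }

  // Returns the offset of the first event enqueued at or after the timestamp,
  // or empty if there is none. Events are assumed to be ordered by enqueued time.
  static Optional<String> resolveTimestamp(List<Event> partition, long timestampMillis) {
    for (Event event : partition) {
      if (event.enqueuedTimeMillis >= timestampMillis) {
        return Optional.of(event.offset);
      }
    }
    return Optional.empty();
  }

  // Chooses the starting offset, applying the policy only when resolution fails.
  static String startingOffset(List<Event> partition, long timestampMillis,
                               String checkpointedOffset, ResolutionFailurePolicy policy) {
    Optional<String> resolved = resolveTimestamp(partition, timestampMillis);
    if (resolved.isPresent()) {
      return resolved.get();
    }
    // OLDEST/NEWEST simply mean the first/last event currently in the list;
    // with an empty partition both fall back to the checkpoint.
    switch (policy) {
      case OLDEST:
        return partition.isEmpty() ? checkpointedOffset : partition.get(0).offset;
      case NEWEST:
        return partition.isEmpty() ? checkpointedOffset : partition.get(partition.size() - 1).offset;
      case CHECKPOINT:
      default:
        // Current behaviour (point 2): ignore the startpoint, keep the checkpoint.
        return checkpointedOffset;
    }
  }
}
```

Keeping `CHECKPOINT` as the default would preserve current behaviour, so a configurable policy could be added later without changing existing jobs.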