[ 
https://issues.apache.org/jira/browse/SAMZA-157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13924369#comment-13924369
 ] 

Chris Riccomini commented on SAMZA-157:
---------------------------------------

Feedback from [~jghoman] was that IncomingMessageEnvelope.getNextOffset 
seemed a bit strange. The other concern was committing to a new method in the 
public-facing API.

Alternatives I see are:

1. SystemAdmin.getNextOffset(Map<SystemStreamPartition, String> currentOffsets)
2. IncomingMessageEnvelope.getNextOffset
3. IncomingMessageEnvelope.getSystemStreamPartitionMetadata
4. Write a ContainerMessageEnvelope extends IncomingMessageEnvelope, and has 
.getNextOffset

(1) is how I originally implemented things.

(2) is how things are currently implemented.

(3) is basically a variation on (2), but provides a bit more information, and 
seems a bit less hacky.

(4) also seems kind of hacky, but it is how the ReadableCollector, 
ReadableMetricsRegistry, and ReadableCoordinator work already. There's an 
argument to be made for following this style.
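
To make (4) concrete, here is a rough sketch. The IncomingMessageEnvelope 
below is a cut-down stand-in, not the real Samza class, and the constructor 
arguments are assumptions made only for illustration. The point is that 
StreamTask code keeps seeing the base type, while the container can read the 
extra field:

```java
// Cut-down stand-in for the public envelope class (assumption: the real class
// carries more fields, such as the SystemStreamPartition and the key).
class IncomingMessageEnvelope {
    private final String offset;
    private final Object message;

    IncomingMessageEnvelope(String offset, Object message) {
        this.offset = offset;
        this.message = message;
    }

    String getOffset() { return offset; }

    Object getMessage() { return message; }
}

// Hypothetical container-only subclass, as in option (4). Tasks are handed the
// base type, so getNextOffset never appears in the public-facing API.
class ContainerMessageEnvelope extends IncomingMessageEnvelope {
    private final String nextOffset;

    ContainerMessageEnvelope(String offset, Object message, String nextOffset) {
        super(offset, message);
        this.nextOffset = nextOffset;
    }

    String getNextOffset() { return nextOffset; }
}
```

The container would downcast when it checkpoints. That downcast is the hacky 
part.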

The problem with (3) is that it contains oldest offset, which changes over 
time. In Kafka's case, since the oldest offset is not part of the message, we'd 
have to make a call to the Kafka broker every time a message arrived in order 
to find out the "current oldest offset". This is prohibitively slow. 
Alternatively, we could refresh the oldest offset only periodically, and 
sometimes give inaccurate offset information. Neither of these seems ideal.

There is no real problem with (2) that I can tell, except for the problems that 
Jakob raised.

(1) also works well. In this scenario, you continue checkpointing current 
offsets, and simply use the getNextOffset API when you are restoring 
checkpoints. The reason that I ditched it in favor of (2) is that (1) felt a 
bit hacky. Having to implement a specific system admin method just so we can do 
checkpoints felt strange.
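
A rough sketch of what (1) could look like follows. Everything in it is an 
assumption made for illustration: the SystemStreamPartition class is a minimal 
stand-in, OffsetAdmin is a hypothetical slice of SystemAdmin, the return type 
is a guess, and the implementation assumes numeric offsets where the next 
position is the current offset plus one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Minimal stand-in for SystemStreamPartition (assumption: three identifying fields).
final class SystemStreamPartition {
    final String system;
    final String stream;
    final int partition;

    SystemStreamPartition(String system, String stream, int partition) {
        this.system = system;
        this.stream = stream;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SystemStreamPartition)) return false;
        SystemStreamPartition other = (SystemStreamPartition) o;
        return system.equals(other.system) && stream.equals(other.stream)
            && partition == other.partition;
    }

    @Override
    public int hashCode() { return Objects.hash(system, stream, partition); }
}

// Hypothetical slice of SystemAdmin holding only the method from option (1).
interface OffsetAdmin {
    Map<SystemStreamPartition, String> getNextOffset(
        Map<SystemStreamPartition, String> currentOffsets);
}

// Assumes offsets are decimal numbers and that "next" means current + 1.
class IncrementingOffsetAdmin implements OffsetAdmin {
    @Override
    public Map<SystemStreamPartition, String> getNextOffset(
            Map<SystemStreamPartition, String> currentOffsets) {
        Map<SystemStreamPartition, String> next = new HashMap<>();
        for (Map.Entry<SystemStreamPartition, String> e : currentOffsets.entrySet()) {
            next.put(e.getKey(), Long.toString(Long.parseLong(e.getValue()) + 1));
        }
        return next;
    }
}
```

With this shape, the checkpoint keeps storing the offset of the last processed 
message, and the translation to "where to resume" happens once, at restore 
time.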

The advantage that (1) has over (2) is that it keeps the API change out of the 
StreamTask's IncomingMessageEnvelope. I think we've arrived at the conclusion 
that, since we don't know exactly what the API should look like, it's best not 
to expose it to the end user. For that reason, we're going to eliminate (2) as 
an option.

This leaves (1) and (4).

> Offset default behavior for streams
> -----------------------------------
>
>                 Key: SAMZA-157
>                 URL: https://issues.apache.org/jira/browse/SAMZA-157
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>         Attachments: SAMZA-157.0.patch
>
>
> Introduce a systems.<system name>.streams.<stream name>.samza.offset.default 
> configuration, which specifies what to do when no checkpoint exists for an 
> input topic. This is similar to Kafka's auto.offset.reset setting. 
> Developers will be able to specify "oldest", "latest", or "fail".
> We should also add the ability to override offsets for specific stream 
> partitions. Something like:
> {noformat}
> systems.<system name>.streams.<stream name>.samza.force.offsets=0:123,1:123,2:123
> {noformat}
> The format I'm proposing is:
> {noformat}
> <partition string>:<force offset>,...
> {noformat}
> This is obviously dependent on offsets not having ':' or ',' in them, which I 
> think is a safe assumption.
> This setting would force the system consumer to be registered with the 
> specified offset for the given SSP (ignoring both the checkpoint, if it 
> exists, and the samza.reset.offset setting).
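
As a rough illustration of the proposed <partition string>:<force offset>,... 
format, the value could be parsed along these lines. ForcedOffsets and its 
parse method are hypothetical names, not part of Samza, and the sketch relies 
on the assumption stated in the description that offsets never contain ':' or 
','.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: turns "0:123,1:123,2:123" into partition -> offset.
final class ForcedOffsets {
    private ForcedOffsets() {}

    static Map<String, String> parse(String value) {
        Map<String, String> offsets = new LinkedHashMap<>();
        if (value == null || value.trim().isEmpty()) {
            return offsets; // nothing forced
        }
        for (String entry : value.split(",")) {
            String[] parts = entry.trim().split(":");
            // Exactly one ':' with text on both sides; anything else is rejected.
            if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
                throw new IllegalArgumentException(
                    "Expected <partition>:<offset> but got: " + entry);
            }
            offsets.put(parts[0], parts[1]);
        }
        return offsets;
    }
}
```

Both sides are kept as strings, since the description calls the first part a 
"partition string" and offsets are opaque to the container.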



--
This message was sent by Atlassian JIRA
(v6.2#6252)