[
https://issues.apache.org/jira/browse/SAMZA-157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13924369#comment-13924369
]
Chris Riccomini commented on SAMZA-157:
---------------------------------------
Feedback received from [~jghoman] was that
IncomingMessageEnvelope.getNextOffset seemed a bit strange. He was also
concerned about committing to a new method in the public-facing API.
Alternatives I see are:
1. SystemAdmin.getNextOffset(Map<SystemStreamPartition, String> currentOffsets)
2. IncomingMessageEnvelope.getNextOffset
3. IncomingMessageEnvelope.getSystemStreamPartitionMetadata
4. Write a ContainerMessageEnvelope that extends IncomingMessageEnvelope and
has .getNextOffset
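To make the shapes of options (1) and (4) concrete, here is a minimal Java sketch. Every type in it is a hypothetical stand-in written only for illustration, including the simplified SystemStreamPartition and LongOffsetSystemAdmin, which assumes offsets are consecutive longs. It is not Samza's actual API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Minimal stand-in for a (system, stream, partition) key, so the sketch compiles alone.
final class SystemStreamPartition {
    final String system;
    final String stream;
    final int partition;

    SystemStreamPartition(String system, String stream, int partition) {
        this.system = system;
        this.stream = stream;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SystemStreamPartition)) {
            return false;
        }
        SystemStreamPartition other = (SystemStreamPartition) o;
        return system.equals(other.system) && stream.equals(other.stream)
                && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(system, stream, partition);
    }
}

// Option (1): the system's admin, not the envelope, knows how to advance an offset.
interface SystemAdmin {
    Map<SystemStreamPartition, String> getNextOffset(
            Map<SystemStreamPartition, String> currentOffsets);
}

// Hypothetical admin for a system whose offsets are consecutive longs.
final class LongOffsetSystemAdmin implements SystemAdmin {
    @Override
    public Map<SystemStreamPartition, String> getNextOffset(
            Map<SystemStreamPartition, String> currentOffsets) {
        Map<SystemStreamPartition, String> next = new HashMap<>();
        for (Map.Entry<SystemStreamPartition, String> e : currentOffsets.entrySet()) {
            next.put(e.getKey(), Long.toString(Long.parseLong(e.getValue()) + 1));
        }
        return next;
    }
}

// Stand-in for the task-facing envelope; its API is left unchanged.
class IncomingMessageEnvelope {
    private final SystemStreamPartition ssp;
    private final String offset;
    private final Object message;

    IncomingMessageEnvelope(SystemStreamPartition ssp, String offset, Object message) {
        this.ssp = ssp;
        this.offset = offset;
        this.message = message;
    }

    SystemStreamPartition getSystemStreamPartition() { return ssp; }
    String getOffset() { return offset; }
    Object getMessage() { return message; }
}

// Option (4): a container-internal subclass carries the next offset, in the same
// spirit as ReadableCollector, ReadableMetricsRegistry, and ReadableCoordinator.
final class ContainerMessageEnvelope extends IncomingMessageEnvelope {
    private final String nextOffset;

    ContainerMessageEnvelope(SystemStreamPartition ssp, String offset, Object message,
                             String nextOffset) {
        super(ssp, offset, message);
        this.nextOffset = nextOffset;
    }

    String getNextOffset() { return nextOffset; }
}
```

The difference is in who sees the new method: in (1) only system implementers do, while in (4) only the container does, because StreamTask code keeps working against the plain IncomingMessageEnvelope type.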
(1) is how I originally implemented things.
(2) is how things are currently implemented.
(3) is basically a variation on (2), but provides a bit more information, and
seems a bit less hacky.
(4) also seems kind of hacky, but it is how the ReadableCollector,
ReadableMetricsRegistry, and ReadableCoordinator already work. There's an
argument to be made for following this style.
The problem with (3) is that it contains the oldest offset, which changes over
time. In Kafka's case, since the oldest offset is not part of the message, we'd
have to make a call to the Kafka broker every time a message arrived in order
to find out the "current oldest offset". This is prohibitively slow.
Alternatively, we could refresh the oldest offset only periodically, and
sometimes give inaccurate offset information. Neither of these seems ideal.
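The periodic-refresh alternative could look roughly like this minimal sketch. CachedOldestOffset, fetchOldest, and the refresh interval are hypothetical names; a real Kafka consumer would replace fetchOldest with an actual broker request. The injected clock is only there so the staleness window is easy to observe.

```java
import java.util.function.LongSupplier;

// Hypothetical cache for one partition's oldest offset. fetchOldest stands in for a
// broker round trip; nowMillis is injected so the staleness window can be tested.
final class CachedOldestOffset {
    private final LongSupplier fetchOldest;
    private final LongSupplier nowMillis;
    private final long refreshIntervalMs;
    private long cachedOldest;
    private long lastRefreshMs;
    private boolean initialized;

    CachedOldestOffset(LongSupplier fetchOldest, LongSupplier nowMillis, long refreshIntervalMs) {
        this.fetchOldest = fetchOldest;
        this.nowMillis = nowMillis;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    // Contacts the broker at most once per refresh interval instead of once per message.
    // Between refreshes the returned value can lag behind the broker (for example,
    // after retention deletes old segments): that lag is the inaccuracy being traded off.
    long get() {
        long now = nowMillis.getAsLong();
        if (!initialized || now - lastRefreshMs >= refreshIntervalMs) {
            cachedOldest = fetchOldest.getAsLong();
            lastRefreshMs = now;
            initialized = true;
        }
        return cachedOldest;
    }
}
```

Shrinking the interval reduces staleness but moves the cost back toward a broker call per message, which is why neither end of the dial is attractive.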
There is no real problem with (2) that I can tell, except for the problems that
Jakob raised.
(1) also works well. In this scenario, you continue checkpointing current
offsets, and simply use the getNextOffset API when you are restoring
checkpoints. The reason that I ditched it in favor of (2) is that (1) felt a
bit hacky. Having to implement a specific system admin method just so we can do
checkpoints felt strange.
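Under option (1), the checkpoint and restore flow might look like this minimal sketch. The class and method names are hypothetical, partitions are plain strings, and incrementLongOffsets assumes consecutive long offsets. The point is only that checkpoints keep current offsets and the translation to a next offset happens once, at restore time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical sketch of option (1). Partitions are keyed by plain strings here,
// and nextOffset stands in for SystemAdmin.getNextOffset.
final class CheckpointRestoreSketch {
    private final Map<String, String> lastProcessed = new HashMap<>();

    // As each message is processed, remember its (current) offset per partition.
    void onProcessed(String partition, String offset) {
        lastProcessed.put(partition, offset);
    }

    // The checkpoint stores current offsets unchanged; no next-offset logic is needed here.
    Map<String, String> checkpoint() {
        return new HashMap<>(lastProcessed);
    }

    // On restore, the admin maps each checkpointed offset to the offset to resume from,
    // so the last processed message is not consumed a second time.
    static Map<String, String> restore(Map<String, String> checkpointed,
                                       UnaryOperator<Map<String, String>> nextOffset) {
        return nextOffset.apply(checkpointed);
    }

    // Example admin behavior, assuming offsets are consecutive longs.
    static Map<String, String> incrementLongOffsets(Map<String, String> current) {
        Map<String, String> next = new HashMap<>();
        for (Map.Entry<String, String> e : current.entrySet()) {
            next.put(e.getKey(), Long.toString(Long.parseLong(e.getValue()) + 1));
        }
        return next;
    }
}
```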
The advantage that (1) has over (2) is that it keeps the API change out of the
StreamTask's IncomingMessageEnvelope. I think we've arrived at the conclusion
that, since we don't know exactly what the API should look like, it's best not
to expose it to the end user. For that reason, we're going to eliminate (2) as
an option.
This leaves (1) and (4).
> Offset default behavior for streams
> -----------------------------------
>
> Key: SAMZA-157
> URL: https://issues.apache.org/jira/browse/SAMZA-157
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
> Attachments: SAMZA-157.0.patch
>
>
> Introduce a systems.<system name>.streams.<stream name>.samza.offset.default
> configuration, which specifies what to do when no checkpoint exists for an
> input topic. This is similar to Kafka's auto.offset.reset setting.
> Developers will be able to specify "oldest", "latest", or "fail".
> We should also add the ability to override offsets for specific stream
> partitions. Something like:
> {noformat}
> systems.<system name>.streams.<stream name>.samza.force.offsets=0:123,1:123,2:123
> {noformat}
> The format I'm proposing is:
> {noformat}
> <partition string>:<force offset>,...
> {noformat}
> This obviously depends on partition strings and offsets not containing ':'
> or ',', which I think is a safe assumption.
> This setting would force the system consumer to be registered with the
> specified offset for the given SSP (ignoring both the checkpoint, if it
> exists, and the samza.reset.offset setting).
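The proposed samza.offset.default values ("oldest", "latest", "fail") could be interpreted roughly as in this minimal sketch. OffsetDefault and startingOffset are hypothetical names, and the oldest and latest offsets are passed in rather than fetched from a broker.

```java
// Hypothetical model of the proposed samza.offset.default values, consulted only
// when no checkpoint exists for a partition.
enum OffsetDefault {
    OLDEST, LATEST, FAIL;

    // Maps the config string to a policy, rejecting anything outside the proposed values.
    static OffsetDefault fromConfig(String value) {
        switch (value) {
            case "oldest": return OLDEST;
            case "latest": return LATEST;
            case "fail": return FAIL;
            default:
                throw new IllegalArgumentException("Unknown samza.offset.default value: " + value);
        }
    }

    // oldestOffset and latestOffset would be looked up from the system (for Kafka,
    // the broker); they are plain parameters here so the sketch stays self-contained.
    String startingOffset(String partition, String oldestOffset, String latestOffset) {
        switch (this) {
            case OLDEST: return oldestOffset;
            case LATEST: return latestOffset;
            default:
                throw new IllegalStateException("No checkpoint for partition " + partition
                        + " and samza.offset.default is \"fail\"");
        }
    }
}
```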
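A possible parser for the proposed samza.force.offsets format, "<partition string>:<force offset>,...". ForceOffsets.parse is a hypothetical helper; rejecting malformed or duplicate entries instead of silently picking one is a design choice made for this sketch, not something the proposal specifies.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for the proposed samza.force.offsets value. It assumes that
// neither partition strings nor offsets contain ':' or ','.
final class ForceOffsets {
    static Map<String, String> parse(String value) {
        Map<String, String> forced = new LinkedHashMap<>();
        if (value == null || value.trim().isEmpty()) {
            return forced; // nothing forced
        }
        for (String entry : value.split(",")) {
            String[] parts = entry.trim().split(":");
            // Exactly one ':' with a non-empty partition and offset on either side.
            if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
                throw new IllegalArgumentException("Malformed force offset entry: '" + entry + "'");
            }
            if (forced.put(parts[0], parts[1]) != null) {
                throw new IllegalArgumentException("Partition listed twice: " + parts[0]);
            }
        }
        return forced;
    }
}
```

For the example value in the description, parse("0:123,1:123,2:123") yields three entries, each mapping a partition string to "123".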
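Taken together, the description implies a precedence for each partition's starting offset: a forced offset wins, then the checkpoint, then samza.offset.default. The following minimal sketch captures that order. StartingOffsetResolver is a hypothetical name; the default is passed as a Supplier so that a broker lookup, or a "fail" policy that throws, only runs when neither a forced nor a checkpointed offset exists.

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical resolution order for one partition's starting offset.
final class StartingOffsetResolver {
    // forced: parsed samza.force.offsets; checkpointed: offsets restored from the
    // checkpoint; defaultOffset: the samza.offset.default fallback (may throw for "fail").
    static String resolve(String partition,
                          Map<String, String> forced,
                          Map<String, String> checkpointed,
                          Supplier<String> defaultOffset) {
        String f = forced.get(partition);
        if (f != null) {
            return f; // a forced offset ignores both the checkpoint and the default
        }
        String c = checkpointed.get(partition);
        if (c != null) {
            return c; // normal restart path
        }
        return defaultOffset.get(); // no checkpoint: fall back to the default policy
    }
}
```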
--
This message was sent by Atlassian JIRA
(v6.2#6252)