[ https://issues.apache.org/jira/browse/SAMZA-142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13911783#comment-13911783 ]

Chris Riccomini commented on SAMZA-142:
---------------------------------------

As part of (3), we should also add the ability to override offsets for specific 
stream partitions. Something like:

{noformat}
systems.<system name>.streams.<stream name>.samza.force.offsets=0:123,1:123,2:123
{noformat}

The format I'm proposing is:

{noformat}
<partition string>:<force offset>,...
{noformat}

This obviously depends on neither the partition strings nor the offsets
containing ':' or ',', which I think is a safe assumption.
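Below is a minimal sketch of parsing the proposed value. It assumes partition strings and offsets are plain tokens without ':' or ','. ForceOffsetsParser and parse are hypothetical names, not existing Samza code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ForceOffsetsParser {
    /**
     * Parses a value such as "0:123,1:123,2:123" into partition string -> offset,
     * preserving the order the entries were written in. Rejects malformed
     * entries and partitions listed more than once.
     */
    public static Map<String, String> parse(String value) {
        Map<String, String> offsets = new LinkedHashMap<>();
        if (value == null || value.trim().isEmpty()) {
            return offsets; // no forced offsets configured
        }
        for (String entry : value.split(",")) {
            // limit -1 keeps empty trailing fields, so "0:" is caught as malformed
            String[] parts = entry.trim().split(":", -1);
            if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
                throw new IllegalArgumentException("Malformed force offset entry: '" + entry + "'");
            }
            if (offsets.put(parts[0], parts[1]) != null) {
                throw new IllegalArgumentException("Partition listed twice: " + parts[0]);
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        System.out.println(parse("0:123,1:123,2:123")); // {0=123, 1=123, 2=123}
    }
}
```

Failing fast on a malformed entry seems safer than silently skipping it, since a skipped partition would quietly fall back to its checkpoint.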

This setting would force the system consumer to be registered with the
specified offset for the given SSP (SystemStreamPartition), ignoring both the
checkpoint, if it exists, and the samza.reset.offset setting.
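The precedence described above can be sketched for a single partition. This is an illustration, not Samza's actual checkpoint logic. StartingOffsetResolver and its parameters are hypothetical. The fallback is deliberately simplified: it returns null so the consumer decides, as a null offset does today.

```java
import java.util.Collections;
import java.util.Map;

public class StartingOffsetResolver {
    /**
     * Picks the offset to register for one partition (simplified):
     * 1. a forced offset always wins, ignoring the checkpoint and samza.reset.offset;
     * 2. otherwise, unless samza.reset.offset is set, use the checkpoint if one exists;
     * 3. otherwise return null, i.e. let the consumer decide (today's behavior).
     */
    public static String resolve(String partition,
                                 Map<String, String> forcedOffsets,
                                 String checkpointedOffset,
                                 boolean resetOffset) {
        String forced = forcedOffsets.get(partition);
        if (forced != null) {
            return forced;
        }
        if (!resetOffset && checkpointedOffset != null) {
            return checkpointedOffset;
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, String> forced = Collections.singletonMap("0", "123");
        System.out.println(resolve("0", forced, "500", false)); // 123: force beats the checkpoint
        System.out.println(resolve("0", forced, "500", true));  // 123: force beats samza.reset.offset
        System.out.println(resolve("1", forced, "500", false)); // 500: no force, checkpoint used
        System.out.println(resolve("1", forced, "500", true));  // null: reset, defer to consumer
    }
}
```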

> Kafka changelog starts from latest offset instead of earliest
> -------------------------------------------------------------
>
>                 Key: SAMZA-142
>                 URL: https://issues.apache.org/jira/browse/SAMZA-142
>             Project: Samza
>          Issue Type: Bug
>          Components: container, kv
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>
> SamzaContainer and TaskStorageManager improperly begin reading changelog 
> streams from the "latest" offset in the default case, when using a 
> KafkaSystemFactory for the changelog stream.
> In the logs you can see:
> {noformat}
> 2014-02-11 02:38:33 SamzaContainer$ [INFO] Got change log system streams: Map(realtime-state-store -> SystemStream [system=kafka, stream=realtime-state-store])
> ...
> 2014-02-11 02:38:36 SamzaContainer [INFO] Starting task instance stores.
> ...
> 2014-02-11 02:38:36 BrokerProxy [INFO] Creating new SimpleConsumer for host localhost:10251 for system kafka
> 2014-02-11 02:38:36 GetOffset [INFO] Checking if auto.offset.reset is defined for topic realtime-state-store
> 2014-02-11 02:38:36 GetOffset [INFO] Got reset of type largest.
> 2014-02-11 02:38:36 GetOffset [INFO] Final offset to be returned for Topic and Partition [realtime-state-store,96] = 3358324
> {noformat}
> This is wrong, as it means we never read any of the messages already in the
> changelog. In Kafka's case (and in general), we want to start reading from the
> EARLIEST offset in the log (usually zero for Kafka, unless retention has
> deleted older segments). This is caused by TaskStorageManager's
> startConsumers method, which calls register with a null offset. For a
> KafkaSystemConsumer, a null offset means "defer to the auto.offset.reset
> setting", which defaults to "largest" if it isn't defined.
> Some possible ways that I can think of to fix this:
> 1. Introduce some kind of EARLIEST/LATEST constants for 
> SystemConsumer.register, have TaskStorageManager use it, and 
> KafkaSystemConsumer honor it.
> 2. Default KafkaSystemFactory.getConsumer to force all changelog stream 
> auto-offset-resets to be EARLIEST when not defined.
> 3. Change KafkaSystemFactory auto-offset-reset to EARLIEST.
> 4. Add some kind of alternative "getChangelogConsumer" method to 
> SystemConsumer.
> 5. Change the meaning of SystemConsumer.register(..., null) to always mean 
> "earliest". Right now it means "defer to the consumer to decide what an 
> appropriate offset is, since I don't have one."
> There are many more tweaks we could make as well. In general, I think the
> question is whether we want to fix this at the Kafka level or at the
> framework-wide level. (2) and (3) are Kafka-specific; (1), (4), and (5) are
> framework-wide and would apply to all SystemConsumers.
> I'd rather fix this at the framework level, and right now something like (1)
> seems like the best solution. It does add yet another constraint for
> SystemConsumer implementations, though, which is annoying.
> What do you think about having a SystemConsumer.START_FROM_EARLIEST_OFFSET = 
> "__samza_EARLIEST" constant, and using that in 
> TaskStorageManager.startConsumers? SystemConsumers that don't honor this 
> constant couldn't be used as changelog streams.
> In the meantime, as a workaround, you can forcibly configure the 
> auto.offset.reset setting to be smallest for all changelog streams.
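
The failure in the quoted report can be reproduced with a toy model. A null offset defers to auto.offset.reset, which falls back to "largest" when unset. The log end offset comes from the log excerpt for partition 96 of realtime-state-store. Treating the earliest offset as 0 is an assumption. NullOffsetDemo and PartitionRange are hypothetical and do not reproduce KafkaSystemConsumer's real code.

```java
public class NullOffsetDemo {
    /** Valid offsets for one partition: earliest is the first offset still in the log,
     *  latest is the log end offset (the next offset to be written). */
    static final class PartitionRange {
        final long earliest;
        final long latest;

        PartitionRange(long earliest, long latest) {
            this.earliest = earliest;
            this.latest = latest;
        }
    }

    /**
     * Simplified model of register(partition, offset): a non-null offset is used as-is;
     * a null offset defers to auto.offset.reset, which falls back to "largest" when unset.
     */
    static long startingOffset(String requestedOffset, String autoOffsetReset, PartitionRange range) {
        if (requestedOffset != null) {
            return Long.parseLong(requestedOffset);
        }
        String reset = (autoOffsetReset == null) ? "largest" : autoOffsetReset;
        switch (reset) {
            case "smallest":
                return range.earliest;
            case "largest":
                return range.latest;
            default:
                throw new IllegalArgumentException("Unknown auto.offset.reset: " + reset);
        }
    }

    public static void main(String[] args) {
        // Log end offset from the excerpt; an earliest offset of 0 is assumed.
        PartitionRange range = new PartitionRange(0L, 3358324L);

        long start = startingOffset(null, null, range);
        // Prints "start=3358324, offsets left to replay=0": the restore reads nothing.
        System.out.println("start=" + start + ", offsets left to replay=" + (range.latest - start));

        long fixed = startingOffset(null, "smallest", range);
        // Prints "start=0, offsets left to replay=3358324": the workaround replays the log.
        System.out.println("start=" + fixed + ", offsets left to replay=" + (range.latest - fixed));
    }
}
```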

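Option (1) can be sketched with the proposed START_FROM_EARLIEST_OFFSET = "__samza_EARLIEST" sentinel. ToyConsumer, ToyKafkaConsumer, supportsEarliestSentinel, and startChangelogConsumer are hypothetical stand-ins, not Samza's SystemConsumer or TaskStorageManager APIs. The capability check is one possible way to reject consumers that can't honor the constant.

```java
import java.util.HashMap;
import java.util.Map;

public class EarliestOffsetSketch {
    /** Sentinel proposed in the issue; compared by value, never parsed as a number. */
    public static final String START_FROM_EARLIEST_OFFSET = "__samza_EARLIEST";

    /** Hypothetical, stripped-down stand-in for a system consumer. */
    interface ToyConsumer {
        /** Whether this consumer understands START_FROM_EARLIEST_OFFSET. */
        boolean supportsEarliestSentinel();

        void register(String partition, String offset);
    }

    /** Toy Kafka-like consumer that knows each partition's earliest offset. */
    static final class ToyKafkaConsumer implements ToyConsumer {
        final Map<String, Long> earliestOffsets;
        final Map<String, Long> startingOffsets = new HashMap<>();

        ToyKafkaConsumer(Map<String, Long> earliestOffsets) {
            this.earliestOffsets = earliestOffsets;
        }

        public boolean supportsEarliestSentinel() {
            return true;
        }

        public void register(String partition, String offset) {
            // The sentinel maps to the partition's earliest offset; anything else
            // is treated as a numeric offset.
            long start = START_FROM_EARLIEST_OFFSET.equals(offset)
                    ? earliestOffsets.get(partition)
                    : Long.parseLong(offset);
            startingOffsets.put(partition, start);
        }
    }

    /** Changelog restore path in the spirit of TaskStorageManager.startConsumers. */
    static void startChangelogConsumer(ToyConsumer consumer, String partition) {
        if (!consumer.supportsEarliestSentinel()) {
            throw new IllegalStateException("This consumer can't be used for a changelog stream");
        }
        consumer.register(partition, START_FROM_EARLIEST_OFFSET);
    }

    public static void main(String[] args) {
        Map<String, Long> earliest = new HashMap<>();
        earliest.put("96", 0L);
        ToyKafkaConsumer consumer = new ToyKafkaConsumer(earliest);
        startChangelogConsumer(consumer, "96");
        System.out.println(consumer.startingOffsets); // {96=0}
    }
}
```

The sentinel keeps register's signature unchanged. The cost is that every consumer used for changelogs must recognize a magic string, which is the extra constraint the quoted report mentions.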


--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
