[ https://issues.apache.org/jira/browse/SAMZA-142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13904385#comment-13904385 ]

Chris Riccomini commented on SAMZA-142:
---------------------------------------

Agreed on the terminology.

In the code on SAMZA-147, I'm using "oldest", "newest", and "future".

> Kafka changelog starts from latest offset instead of earliest
> -------------------------------------------------------------
>
>                 Key: SAMZA-142
>                 URL: https://issues.apache.org/jira/browse/SAMZA-142
>             Project: Samza
>          Issue Type: Bug
>          Components: container, kv
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>
> SamzaContainer and TaskStorageManager improperly begin reading changelog 
> streams from the "latest" offset in the default case, when using a 
> KafkaSystemFactory for the changelog stream.
> In the logs you can see:
> {noformat}
> 2014-02-11 02:38:33 SamzaContainer$ [INFO] Got change log system streams: 
> Map(realtime-state-store -> SystemStream [system=kafka, 
> stream=realtime-state-store])
> ...
> 2014-02-11 02:38:36 SamzaContainer [INFO] Starting task instance stores.
> ...
> 2014-02-11 02:38:36 BrokerProxy [INFO] Creating new SimpleConsumer for host 
> localhost:10251 for system kafka
> 2014-02-11 02:38:36 GetOffset [INFO] Checking if auto.offset.reset is defined 
> for topic realtime-state-store
> 2014-02-11 02:38:36 GetOffset [INFO] Got reset of type largest.
> 2014-02-11 02:38:36 GetOffset [INFO] Final offset to be returned for Topic 
> and Partition [realtime-state-store,96] = 3358324
> {noformat}
> This is wrong, as it means we never read any messages from the changelog. In 
> Kafka's case (and in general), we want to start reading from the EARLIEST 
> offset in the log (zero for Kafka unless retention has deleted old segments). This is caused by 
> TaskStorageManager's startConsumers method, which calls register with a null 
> offset. With a KafkaSystemConsumer, this means to defer to the 
> auto.offset.reset setting, which defaults to "largest" if it isn't defined.
> Some possible ways that I can think of to fix this:
> 1. Introduce some kind of EARLIEST/LATEST constants for 
> SystemConsumer.register, have TaskStorageManager use it, and 
> KafkaSystemConsumer honor it.
> 2. Default KafkaSystemFactory.getConsumer to force all changelog stream 
> auto-offset-resets to be EARLIEST when not defined.
> 3. Change KafkaSystemFactory auto-offset-reset to EARLIEST.
> 4. Add some kind of alternative "getChangelogConsumer" method to 
> SystemConsumer.
> 5. Change the meaning of SystemConsumer.register(..., null) to always mean 
> "earliest". Right now it means "defer to the consumer to decide what an 
> appropriate offset is, since I don't have one."
> There are many more tweaks we could make as well. In general, I think the 
> question is, do we want to fix this at the Kafka-level, or at the 
> framework-wide level. (2) and (3) are Kafka-specific. (1), (4), and (5) are 
> framework-wide, and would apply to all SystemConsumers.
> I think I'd rather fix this at the framework level. Right now, I think 
> something like (1) seems like the best solution. It does add yet another 
> constraint for SystemConsumer implementations, though, which is annoying.
> What do you think about having a SystemConsumer.START_FROM_EARLIEST_OFFSET = 
> "__samza_EARLIEST" constant, and using that in 
> TaskStorageManager.startConsumers? SystemConsumers that don't honor this 
> constant couldn't be used as changelog streams.
> In the meantime, as a workaround, you can forcibly configure the 
> auto.offset.reset setting to be smallest for all changelog streams.
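The default-case behavior described in the issue (a null offset passed to register defers to auto.offset.reset, which falls back to "largest") can be illustrated with a small model. This is a hypothetical sketch, not KafkaSystemConsumer's actual code: the class `OffsetResolver`, its `resolve` method, and the idea of passing the partition's earliest and latest offsets in as plain numbers are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of how a Kafka-backed consumer picks its
// starting offset. NOT Samza's real KafkaSystemConsumer implementation.
class OffsetResolver {
    // topic -> configured auto.offset.reset value ("smallest" or "largest")
    private final Map<String, String> autoOffsetReset;

    OffsetResolver(Map<String, String> autoOffsetReset) {
        this.autoOffsetReset = autoOffsetReset;
    }

    /**
     * @param topic     the changelog topic being registered
     * @param requested offset passed to register(), or null if the caller has none
     * @param earliest  oldest offset still available in the partition
     * @param latest    offset that the next appended message will receive
     */
    long resolve(String topic, String requested, long earliest, long latest) {
        if (requested != null) {
            // The caller supplied an offset, so use it directly (simplification).
            return Long.parseLong(requested);
        }
        // null means "defer to auto.offset.reset", which defaults to "largest".
        String reset = autoOffsetReset.getOrDefault(topic, "largest");
        return "smallest".equals(reset) ? earliest : latest;
    }
}
```

Assuming the earliest available offset is 0, `new OffsetResolver(new HashMap<>()).resolve("realtime-state-store", null, 0L, 3358324L)` returns 3358324, the same offset reported in the log above, so a store restore would skip every message already in the changelog. Mapping the topic to "smallest", as in the suggested workaround, makes the same call return 0.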

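Proposal (1), a sentinel such as "__samza_EARLIEST" that TaskStorageManager passes when registering changelog streams, might look roughly like the sketch below. Everything here is hypothetical: `EarliestAwareConsumer` stands in for SystemConsumer but is not Samza's real interface, `register` is simplified to take a stream name instead of a partition, and `ToyLogConsumer` and `StoreRestorer` are toy classes invented for illustration. Only the constant's value comes from the issue text.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for SystemConsumer; NOT Samza's actual API.
interface EarliestAwareConsumer {
    // Sentinel value taken from proposal (1) in the issue description.
    String START_FROM_EARLIEST_OFFSET = "__samza_EARLIEST";

    // offset: an offset to start from, null if the caller has none, or the sentinel.
    void register(String stream, String offset);
}

// Toy consumer over a single log whose offsets run from earliest to latest.
class ToyLogConsumer implements EarliestAwareConsumer {
    private final long earliest;
    private final long latest;
    final Map<String, Long> startOffsets = new HashMap<>();

    ToyLogConsumer(long earliest, long latest) {
        this.earliest = earliest;
        this.latest = latest;
    }

    @Override
    public void register(String stream, String offset) {
        long start;
        if (START_FROM_EARLIEST_OFFSET.equals(offset)) {
            start = earliest;                 // honor the sentinel: replay the whole log
        } else if (offset == null) {
            start = latest;                   // consumer-defined default, like Kafka's "largest"
        } else {
            start = Long.parseLong(offset);   // start from the caller's offset (simplification)
        }
        startOffsets.put(stream, start);
    }
}

// Simplified version of what TaskStorageManager.startConsumers would do under proposal (1).
class StoreRestorer {
    static void startConsumers(EarliestAwareConsumer consumer, List<String> changelogStreams) {
        for (String stream : changelogStreams) {
            consumer.register(stream, EarliestAwareConsumer.START_FROM_EARLIEST_OFFSET);
        }
    }
}
```

The design cost mentioned in the issue shows up directly: a consumer that does not recognize the sentinel would fall through to `Long.parseLong("__samza_EARLIEST")` and throw a NumberFormatException, which is why such consumers could not back changelog streams.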


--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
