[ 
https://issues.apache.org/jira/browse/SAMZA-142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13898322#comment-13898322
 ] 

Chris Riccomini commented on SAMZA-142:
---------------------------------------

Thinking about this a bit more, another option that comes to mind is to add a 
getEarliestOffset method to the SystemAdmin interface:

{code}
Map<SystemStreamPartition, String> getEarliestOffset(Set<String> streams);
{code}

We already have a SystemAdmin.getLatestOffset method, which the DefaultChooser 
uses to get offsets for bootstrap streams. The "earliest offset for a changelog 
stream" use case is almost identical, and I think this pattern works really 
well.

Once we have getEarliestOffset, we can have TaskStorageManager take a 
systemAdmins variable and use it to get the earliest offset for each changelog 
stream. We can then pass that offset to SystemConsumer.register(), rather than 
the null we give it now.

> Kafka changelog starts from latest offset instead of earliest
> -------------------------------------------------------------
>
>                 Key: SAMZA-142
>                 URL: https://issues.apache.org/jira/browse/SAMZA-142
>             Project: Samza
>          Issue Type: Bug
>          Components: container, kv
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>
> SamzaContainer and TaskStorageManager improperly begin reading changelog 
> streams from the "latest" offset in the default case, when using a 
> KafkaSystemFactory for the changelog stream.
> In the logs you can see:
> {noformat}
> 2014-02-11 02:38:33 SamzaContainer$ [INFO] Got change log system streams: 
> Map(realtime-state-store -> SystemStream [system=kafka, 
> stream=realtime-state-store])
> ...
> 2014-02-11 02:38:36 SamzaContainer [INFO] Starting task instance stores.
> ...
> 2014-02-11 02:38:36 BrokerProxy [INFO] Creating new SimpleConsumer for host 
> localhost:10251 for system kafka
> 2014-02-11 02:38:36 GetOffset [INFO] Checking if auto.offset.reset is defined 
> for topic realtime-state-store
> 2014-02-11 02:38:36 GetOffset [INFO] Got reset of type largest.
> 2014-02-11 02:38:36 GetOffset [INFO] Final offset to be returned for Topic 
> and Partition [realtime-state-store,96] = 3358324
> {noformat}
> This is wrong, as it means we never read any of the messages already in the 
> changelog. In Kafka's case (and in general), we want to start reading from the 
> EARLIEST offset in the log (usually zero for Kafka). This is caused by 
> TaskStorageManager's startConsumers method, which calls register with a null 
> offset. With a KafkaSystemConsumer, a null offset means deferring to the 
> auto.offset.reset setting, which defaults to "largest" if it isn't defined.
> Some possible ways that I can think of to fix this:
> 1. Introduce some kind of EARLIEST/LATEST constants for 
> SystemConsumer.register, have TaskStorageManager use it, and 
> KafkaSystemConsumer honor it.
> 2. Default KafkaSystemFactory.getConsumer to force all changelog stream 
> auto-offset-resets to be EARLIEST when not defined.
> 3. Change KafkaSystemFactory auto-offset-reset to EARLIEST.
> 4. Add some kind of alternative "getChangelogConsumer" method to 
> SystemConsumer.
> 5. Change the meaning of SystemConsumer.register(..., null) to always mean 
> "earliest". Right now it means "defer to the consumer to decide what an 
> appropriate offset is, since I don't have one."
> There are many more tweaks we could make as well. In general, I think the 
> question is whether we want to fix this at the Kafka level or at the 
> framework level. (2) and (3) are Kafka-specific. (1), (4), and (5) are 
> framework-wide, and would apply to all SystemConsumers.
> I think I'd rather fix this at the framework level. Right now, something 
> like (1) seems like the best solution. It does add yet another constraint for 
> SystemConsumer implementations, though, which is annoying.
> What do you think about having a SystemConsumer.START_FROM_EARLIEST_OFFSET = 
> "__samza_EARLIEST" constant, and using that in 
> TaskStorageManager.startConsumers? SystemConsumers that don't honor this 
> constant couldn't be used to consume changelog streams.
> In the meantime, as a workaround, you can explicitly set auto.offset.reset 
> to "smallest" for all changelog streams.
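To make the current null-offset behavior, the workaround, and option (1) concrete, here is a minimal Java sketch. KafkaLikeConsumer, its constructor parameters, and the partition name are hypothetical; only the "__samza_EARLIEST" value and the rule that a null offset defers to auto.offset.reset (defaulting to "largest") come from the description above. It assumes an earliest offset of 0 and reuses the latest offset 3358324 from the log excerpt. It is not Samza's actual KafkaSystemConsumer.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of option (1): a sentinel offset honored by a Kafka-like consumer.
class EarliestSentinelSketch {
    // Constant proposed in the issue description.
    static final String START_FROM_EARLIEST_OFFSET = "__samza_EARLIEST";

    static final class KafkaLikeConsumer {
        private final long earliestOffset;    // first offset still in the log
        private final long latestOffset;      // offset of the next message to be written
        private final String autoOffsetReset; // "smallest" or "largest"
        private final Map<String, Long> startOffsets = new HashMap<>();

        KafkaLikeConsumer(long earliestOffset, long latestOffset, String autoOffsetReset) {
            this.earliestOffset = earliestOffset;
            this.latestOffset = latestOffset;
            // Mirror the behavior described above: default to "largest" when unset.
            this.autoOffsetReset = autoOffsetReset == null ? "largest" : autoOffsetReset;
        }

        void register(String partition, String offset) {
            long start;
            if (START_FROM_EARLIEST_OFFSET.equals(offset)) {
                // Sentinel: always replay from the beginning, ignoring auto.offset.reset.
                start = earliestOffset;
            } else if (offset == null) {
                // Current behavior: no offset given, so defer to auto.offset.reset.
                start = "smallest".equals(autoOffsetReset) ? earliestOffset : latestOffset;
            } else {
                // Simplification: treat an explicit value as the offset to start from.
                start = Long.parseLong(offset);
            }
            startOffsets.put(partition, start);
        }

        long startOffset(String partition) {
            return startOffsets.get(partition);
        }
    }

    public static void main(String[] args) {
        KafkaLikeConsumer defaults = new KafkaLikeConsumer(0L, 3358324L, null);
        defaults.register("realtime-state-store-96", null);
        System.out.println(defaults.startOffset("realtime-state-store-96")); // 3358324 (the bug)

        KafkaLikeConsumer workaround = new KafkaLikeConsumer(0L, 3358324L, "smallest");
        workaround.register("realtime-state-store-96", null);
        System.out.println(workaround.startOffset("realtime-state-store-96")); // 0

        defaults.register("realtime-state-store-96", START_FROM_EARLIEST_OFFSET);
        System.out.println(defaults.startOffset("realtime-state-store-96")); // 0
    }
}
{code}

With the sentinel, TaskStorageManager no longer depends on a Kafka-specific setting, which is the framework-level property preferred above. The cost is the one already noted: every SystemConsumer used for changelogs must recognize the constant.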



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
