[ https://issues.apache.org/jira/browse/SAMZA-142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13904382#comment-13904382 ]
Jakob Homan commented on SAMZA-142:
-----------------------------------
+1 on the above. It would be good to settle on consistent terminology and make
sure we stick to it everywhere (oldest vs. earliest; latest vs. caught up vs.
newest).
> Kafka changelog starts from latest offset instead of earliest
> -------------------------------------------------------------
>
> Key: SAMZA-142
> URL: https://issues.apache.org/jira/browse/SAMZA-142
> Project: Samza
> Issue Type: Bug
> Components: container, kv
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
>
> SamzaContainer and TaskStorageManager improperly begin reading changelog
> streams from the "latest" offset in the default case, when using a
> KafkaSystemFactory for the changelog stream.
> In the logs you can see:
> {noformat}
> 2014-02-11 02:38:33 SamzaContainer$ [INFO] Got change log system streams:
> Map(realtime-state-store -> SystemStream [system=kafka,
> stream=realtime-state-store])
> ...
> 2014-02-11 02:38:36 SamzaContainer [INFO] Starting task instance stores.
> ...
> 2014-02-11 02:38:36 BrokerProxy [INFO] Creating new SimpleConsumer for host
> localhost:10251 for system kafka
> 2014-02-11 02:38:36 GetOffset [INFO] Checking if auto.offset.reset is defined
> for topic realtime-state-store
> 2014-02-11 02:38:36 GetOffset [INFO] Got reset of type largest.
> 2014-02-11 02:38:36 GetOffset [INFO] Final offset to be returned for Topic
> and Partition [realtime-state-store,96] = 3358324
> {noformat}
> This is wrong, as it means we never read any of the messages already in the
> changelog. In Kafka's case (and in general), we want to start reading from the
> EARLIEST offset in the log (zero for Kafka, unless retention has already
> deleted the oldest segments). This is caused by TaskStorageManager's
> startConsumers method, which calls register with a null offset. With a
> KafkaSystemConsumer, a null offset means "defer to the auto.offset.reset
> setting", which defaults to "largest" if it isn't defined.
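The null-offset fallback described above can be sketched in Java. This is a minimal model, not KafkaSystemConsumer's real code. The class and method names are hypothetical. The sketch covers only the auto.offset.reset lookup with the Kafka 0.8-era values "smallest" and "largest". Rejecting any other value is a choice made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

public class NullOffsetResolution {
    /**
     * Hypothetical model of what happens when register(...) receives a null
     * offset: the consumer falls back to auto.offset.reset, which defaults
     * to "largest" when it is not configured.
     */
    static long startingOffsetForNull(Map<String, String> consumerConfig,
                                      long earliestOffset, long latestOffset) {
        String reset = consumerConfig.getOrDefault("auto.offset.reset", "largest");
        switch (reset) {
            case "smallest":
                return earliestOffset;
            case "largest":
                return latestOffset;
            default:
                // This sketch rejects anything other than the two 0.8-era values.
                throw new IllegalArgumentException("Unknown auto.offset.reset: " + reset);
        }
    }

    public static void main(String[] args) {
        // Nothing configured: start at the latest offset (3358324 in the log
        // above), so none of the existing changelog messages are read.
        Map<String, String> unset = new HashMap<>();
        System.out.println(startingOffsetForNull(unset, 0L, 3358324L)); // prints 3358324

        // Explicitly configuring "smallest" starts from the earliest offset.
        Map<String, String> smallest = new HashMap<>();
        smallest.put("auto.offset.reset", "smallest");
        System.out.println(startingOffsetForNull(smallest, 0L, 3358324L)); // prints 0
    }
}
```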
> Some possible fixes I can think of:
> 1. Introduce EARLIEST/LATEST constants for SystemConsumer.register, have
> TaskStorageManager use them, and have KafkaSystemConsumer honor them.
> 2. Make KafkaSystemFactory.getConsumer default auto.offset.reset to EARLIEST
> for all changelog streams that don't define it.
> 3. Change KafkaSystemFactory's default auto.offset.reset to EARLIEST for all
> streams.
> 4. Add an alternative "getChangelogConsumer" method to SystemConsumer.
> 5. Change the meaning of SystemConsumer.register(..., null) to always mean
> "earliest". Right now it means "defer to the consumer to decide what an
> appropriate offset is, since I don't have one."
> There are many more tweaks we could make as well. In general, I think the
> question is whether we want to fix this at the Kafka level or at the
> framework level. (2) and (3) are Kafka-specific; (1), (4), and (5) are
> framework-wide and would apply to all SystemConsumers.
> I'd rather fix this at the framework level, and right now something like (1)
> seems like the best solution. It does add yet another constraint for
> SystemConsumer implementations, though, which is annoying.
> What do you think about adding a SystemConsumer.START_FROM_EARLIEST_OFFSET =
> "__samza_EARLIEST" constant and using it in
> TaskStorageManager.startConsumers? SystemConsumers that don't honor this
> constant couldn't be used for changelog streams.
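Option (1) with the proposed sentinel can be sketched as follows. The constant name and value come from the proposal. SketchConsumer, its fields, and the partition names are hypothetical stand-ins. This is not the Samza 0.6.0 API. Treating a concrete offset as the literal starting offset is a simplification made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

public class EarliestSentinelSketch {
    // Sentinel proposed in the issue; hypothetical, not part of the Samza 0.6.0 API.
    static final String START_FROM_EARLIEST_OFFSET = "__samza_EARLIEST";

    // Hypothetical, heavily simplified consumer over a log whose valid
    // offsets run from earliestOffset to latestOffset.
    static class SketchConsumer {
        private final long earliestOffset;
        private final long latestOffset;
        private final String autoOffsetReset;
        final Map<String, Long> startingOffsets = new HashMap<>();

        SketchConsumer(long earliestOffset, long latestOffset, String autoOffsetReset) {
            this.earliestOffset = earliestOffset;
            this.latestOffset = latestOffset;
            this.autoOffsetReset = autoOffsetReset;
        }

        void register(String partition, String offset) {
            long start;
            if (START_FROM_EARLIEST_OFFSET.equals(offset)) {
                // Honor the sentinel regardless of auto.offset.reset.
                start = earliestOffset;
            } else if (offset == null) {
                // Unchanged behavior: defer to auto.offset.reset ("largest" by default).
                start = "smallest".equals(autoOffsetReset) ? earliestOffset : latestOffset;
            } else {
                // Simplification: treat a concrete offset as the literal starting offset.
                start = Long.parseLong(offset);
            }
            startingOffsets.put(partition, start);
        }
    }

    public static void main(String[] args) {
        SketchConsumer consumer = new SketchConsumer(0L, 3358324L, "largest");
        // What TaskStorageManager.startConsumers would pass for a changelog partition.
        consumer.register("realtime-state-store-96", START_FROM_EARLIEST_OFFSET);
        // Callers that still pass null keep today's behavior.
        consumer.register("some-input-96", null);
        System.out.println(consumer.startingOffsets.get("realtime-state-store-96")); // prints 0
        System.out.println(consumer.startingOffsets.get("some-input-96")); // prints 3358324
    }
}
```

Suppose a consumer in this sketch lacked the sentinel branch. It would then call Long.parseLong("__samza_EARLIEST") and throw NumberFormatException. At least a non-conforming consumer would fail loudly instead of silently skipping the changelog.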
> In the meantime, as a workaround, you can explicitly set auto.offset.reset to
> smallest for all changelog streams.
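The workaround could also be applied programmatically before the job config is used. This sketch assumes Samza's per-stream consumer override key, systems.<system>.streams.<stream>.consumer.auto.offset.reset. That key format is an assumption and is not confirmed by this issue. The helper names are also hypothetical. Setting the value per stream, rather than through a system-wide consumer setting, is meant to avoid moving input streams to "smallest" as well.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogResetWorkaround {
    // Assumed per-stream override key format (an assumption, not taken from this issue).
    static String resetKey(String system, String stream) {
        return "systems." + system + ".streams." + stream + ".consumer.auto.offset.reset";
    }

    // Hypothetical helper: returns a copy of the config with auto.offset.reset
    // forced to "smallest" for every changelog stream, overriding any existing
    // value. The input map is left unmodified.
    static Map<String, String> forceSmallestForChangelogs(Map<String, String> config,
                                                          String system,
                                                          List<String> changelogStreams) {
        Map<String, String> result = new HashMap<>(config);
        for (String stream : changelogStreams) {
            result.put(resetKey(system, stream), "smallest");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        Map<String, String> patched =
            forceSmallestForChangelogs(config, "kafka", Arrays.asList("realtime-state-store"));
        // prints systems.kafka.streams.realtime-state-store.consumer.auto.offset.reset=smallest
        patched.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```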
--
This message was sent by Atlassian JIRA
(v6.1.5#6160)