Chris Riccomini created SAMZA-142:
-------------------------------------

             Summary: Kafka changelog starts from latest offset instead of earliest
                 Key: SAMZA-142
                 URL: https://issues.apache.org/jira/browse/SAMZA-142
             Project: Samza
          Issue Type: Bug
          Components: container, kv
    Affects Versions: 0.6.0
            Reporter: Chris Riccomini


SamzaContainer and TaskStorageManager improperly begin reading changelog 
streams from the "latest" offset in the default case, when using a 
KafkaSystemFactory for the changelog stream.

In the logs you can see:

{noformat}
2014-02-11 02:38:33 SamzaContainer$ [INFO] Got change log system streams: Map(realtime-state-store -> SystemStream [system=kafka, stream=realtime-state-store])
...
2014-02-11 02:38:36 SamzaContainer [INFO] Starting task instance stores.
...
2014-02-11 02:38:36 BrokerProxy [INFO] Creating new SimpleConsumer for host localhost:10251 for system kafka
2014-02-11 02:38:36 GetOffset [INFO] Checking if auto.offset.reset is defined for topic realtime-state-store
2014-02-11 02:38:36 GetOffset [INFO] Got reset of type largest.
2014-02-11 02:38:36 GetOffset [INFO] Final offset to be returned for Topic and Partition [realtime-state-store,96] = 3358324
{noformat}

This is wrong: starting at the latest offset means we skip every message already
in the changelog, so none of the store's state is restored. In Kafka's case (and
in general), we want to start reading from the EARLIEST offset in the log (zero
for a Kafka topic whose old segments haven't been deleted). The cause is
TaskStorageManager's startConsumers method, which calls register with a null
offset. For a KafkaSystemConsumer, a null offset means deferring to the
auto.offset.reset setting, which defaults to "largest" if it isn't defined.
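To make the failure mode concrete, here is a simplified model of the offset choice described above. It is a sketch under stated assumptions, not Samza's or Kafka's actual code: the class and method names are hypothetical, a non-null offset is treated as the first offset to read, and "largest"/"smallest" map to the log-end and earliest offsets. With no offset and no auto.offset.reset, the consumer lands on the log-end offset (3358324 in the log above) and replays nothing.

```java
/**
 * Hypothetical model (not Samza's or Kafka's code) of how a Kafka-backed
 * consumer picks a starting offset when register(...) gets a null offset.
 */
class OffsetResolutionModel {
    // Kafka 0.8 consumer default when auto.offset.reset is not configured.
    static final String DEFAULT_AUTO_OFFSET_RESET = "largest";

    /**
     * @param requestedOffset offset passed to register(); null means "consumer decides"
     * @param autoOffsetReset per-topic auto.offset.reset value, or null if undefined
     * @param earliest        earliest offset still retained in the partition
     * @param latest          log-end offset (next offset to be written)
     */
    static long resolveStartingOffset(String requestedOffset, String autoOffsetReset,
                                      long earliest, long latest) {
        if (requestedOffset != null) {
            // Simplification: treat an explicit offset as the first one to read.
            return Long.parseLong(requestedOffset);
        }
        String reset = autoOffsetReset != null ? autoOffsetReset : DEFAULT_AUTO_OFFSET_RESET;
        return "smallest".equals(reset) ? earliest : latest;
    }

    /** Number of messages a restore would replay when starting at startOffset. */
    static long messagesReplayed(long startOffset, long latest) {
        return latest - startOffset;
    }

    public static void main(String[] args) {
        long earliest = 0L;
        long latest = 3358324L; // log-end offset taken from the GetOffset log line
        long start = resolveStartingOffset(null, null, earliest, latest);
        System.out.println(start + " -> replays " + messagesReplayed(start, latest));
        // prints "3358324 -> replays 0"
        long fixed = resolveStartingOffset(null, "smallest", earliest, latest);
        System.out.println(fixed + " -> replays " + messagesReplayed(fixed, latest));
        // prints "0 -> replays 3358324"
    }
}
```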

Some possible fixes I can think of:

1. Introduce some kind of EARLIEST/LATEST constants for 
SystemConsumer.register, have TaskStorageManager use it, and 
KafkaSystemConsumer honor it.
2. Have KafkaSystemFactory.getConsumer default auto.offset.reset to EARLIEST
(Kafka's "smallest") for every changelog stream that doesn't define it.
3. Change KafkaSystemFactory's default auto.offset.reset to EARLIEST for all
streams.
4. Add some kind of alternative "getChangelogConsumer" method to SystemConsumer.
5. Change the meaning of SystemConsumer.register(..., null) to always mean 
"earliest". Right now it means "defer to the consumer to decide what an 
appropriate offset is, since I don't have one."

There are many more tweaks we could make as well. In general, I think the
question is whether we want to fix this at the Kafka level or at the framework
level. (2) and (3) are Kafka-specific; (1), (4), and (5) are framework-wide and
would apply to all SystemConsumers.

I'd rather fix this at the framework level, and right now something like (1)
seems like the best solution. It does add yet another constraint for
SystemConsumer implementations, though, which is annoying.

What do you think about having a SystemConsumer.START_FROM_EARLIEST_OFFSET = 
"__samza_EARLIEST" constant, and using that in 
TaskStorageManager.startConsumers? SystemConsumers that don't honor this 
constant couldn't be used as changelog streams.
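A rough sketch of how proposal (1) could look. Only the constant name and value come from the proposal above; SentinelAwareConsumer, ToyLogConsumer, and ChangelogRestorer are hypothetical stand-ins for SystemConsumer, KafkaSystemConsumer, and TaskStorageManager.startConsumers, with partitions reduced to plain strings. The toy consumer resolves the sentinel to the earliest retained offset (which need not be zero) and keeps the current null behavior for everything else.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical consumer interface carrying the proposed sentinel constant. */
interface SentinelAwareConsumer {
    String START_FROM_EARLIEST_OFFSET = "__samza_EARLIEST";

    void register(String partition, String offset);

    /** Offset the consumer will read first for this partition. */
    long startingOffset(String partition);
}

/** Toy in-memory consumer that honors the sentinel; bounds are fixed per partition. */
class ToyLogConsumer implements SentinelAwareConsumer {
    private final Map<String, long[]> bounds = new HashMap<>(); // partition -> {earliest, latest}
    private final Map<String, Long> starts = new HashMap<>();

    ToyLogConsumer addPartition(String partition, long earliest, long latest) {
        bounds.put(partition, new long[] {earliest, latest});
        return this;
    }

    @Override
    public void register(String partition, String offset) {
        long[] b = bounds.get(partition);
        if (b == null) throw new IllegalArgumentException("unknown partition " + partition);
        long start;
        if (START_FROM_EARLIEST_OFFSET.equals(offset)) {
            start = b[0];                   // changelog restore: replay everything retained
        } else if (offset == null) {
            start = b[1];                   // unchanged default: consumer decides (here, latest)
        } else {
            start = Long.parseLong(offset); // simplification: explicit offset is read first
        }
        starts.put(partition, start);
    }

    @Override
    public long startingOffset(String partition) {
        return starts.get(partition);
    }
}

/** Hypothetical stand-in for TaskStorageManager.startConsumers using the sentinel. */
class ChangelogRestorer {
    static void startConsumers(SentinelAwareConsumer consumer, Iterable<String> changelogPartitions) {
        for (String p : changelogPartitions) {
            consumer.register(p, SentinelAwareConsumer.START_FROM_EARLIEST_OFFSET);
        }
    }

    public static void main(String[] args) {
        ToyLogConsumer consumer = new ToyLogConsumer()
            .addPartition("realtime-state-store-96", 0L, 3358324L);
        startConsumers(consumer, Collections.singletonList("realtime-state-store-96"));
        System.out.println(consumer.startingOffset("realtime-state-store-96")); // prints 0
    }
}
```

Passing the sentinel explicitly leaves the meaning of a null offset unchanged for regular input streams, which is the main difference from option (5).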

In the meantime, as a workaround, you can explicitly set auto.offset.reset to
smallest for every changelog stream.
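For the stream in the log above, the workaround amounts to one override per changelog stream. The helper below generates those overrides from store-to-changelog mappings. It assumes the per-stream key pattern systems.<system>.streams.<stream>.consumer.auto.offset.reset and that each changelog is named as "system.stream" (as in stores.<store>.changelog); both are assumptions to check against your Samza version, and the class name is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical helper that builds the workaround overrides:
 * auto.offset.reset=smallest for every changelog stream.
 * ASSUMPTION: the per-stream key pattern below matches your Samza version.
 */
class ChangelogResetOverrides {
    static final String KEY_PATTERN = "systems.%s.streams.%s.consumer.auto.offset.reset";

    /** @param changelogs store name -> changelog "system.stream" */
    static Map<String, String> overrides(Map<String, String> changelogs) {
        Map<String, String> result = new TreeMap<>(); // sorted for stable output
        for (String spec : changelogs.values()) {
            int dot = spec.indexOf('.');
            if (dot <= 0 || dot == spec.length() - 1) {
                throw new IllegalArgumentException("expected system.stream but got: " + spec);
            }
            String system = spec.substring(0, dot);  // text before the first dot
            String stream = spec.substring(dot + 1); // everything after it
            result.put(String.format(KEY_PATTERN, system, stream), "smallest");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> changelogs = new TreeMap<>();
        changelogs.put("realtime-state-store", "kafka.realtime-state-store");
        overrides(changelogs).forEach((k, v) -> System.out.println(k + "=" + v));
        // prints "systems.kafka.streams.realtime-state-store.consumer.auto.offset.reset=smallest"
    }
}
```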



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
