Chris Riccomini created SAMZA-142:
-------------------------------------
Summary: Kafka changelog starts from latest offset instead of earliest
Key: SAMZA-142
URL: https://issues.apache.org/jira/browse/SAMZA-142
Project: Samza
Issue Type: Bug
Components: container, kv
Affects Versions: 0.6.0
Reporter: Chris Riccomini
SamzaContainer and TaskStorageManager improperly begin reading changelog
streams from the "latest" offset in the default case, when using a
KafkaSystemFactory for the changelog stream.
In the logs you can see:
{noformat}
2014-02-11 02:38:33 SamzaContainer$ [INFO] Got change log system streams:
Map(realtime-state-store -> SystemStream [system=kafka,
stream=realtime-state-store])
...
2014-02-11 02:38:36 SamzaContainer [INFO] Starting task instance stores.
...
2014-02-11 02:38:36 BrokerProxy [INFO] Creating new SimpleConsumer for host
localhost:10251 for system kafka
2014-02-11 02:38:36 GetOffset [INFO] Checking if auto.offset.reset is defined
for topic realtime-state-store
2014-02-11 02:38:36 GetOffset [INFO] Got reset of type largest.
2014-02-11 02:38:36 GetOffset [INFO] Final offset to be returned for Topic and
Partition [realtime-state-store,96] = 3358324
{noformat}
This is wrong, as it means we never read any existing messages from the
changelog. In Kafka's case (and in general), we want to start reading from the
EARLIEST offset in the log (zero for a Kafka partition, unless retention has
already deleted older segments). This is caused by TaskStorageManager's
startConsumers method, which calls register with a null offset. With a
KafkaSystemConsumer, a null offset means deferring to the auto.offset.reset
setting, which defaults to "largest" if it isn't defined.
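To make the failure mode concrete, here is a minimal, self-contained sketch of
the resolution logic described above. It is a toy model, not Samza's or Kafka's
actual code: the class name, the resolveStartOffset helper, and the hard-coded
offsets are assumptions for illustration (the latest offset is taken from the
log excerpt).

```java
import java.util.HashMap;
import java.util.Map;

class NullOffsetDemo {
    // Hypothetical log bounds for one partition; LATEST mirrors the log excerpt.
    static final long EARLIEST_OFFSET = 0L;
    static final long LATEST_OFFSET = 3358324L;

    // Toy model: an explicit offset wins; a null offset defers to
    // auto.offset.reset, which falls back to "largest" when undefined.
    static long resolveStartOffset(String requestedOffset, Map<String, String> config) {
        if (requestedOffset != null) {
            return Long.parseLong(requestedOffset);
        }
        String reset = config.getOrDefault("auto.offset.reset", "largest");
        return "smallest".equals(reset) ? EARLIEST_OFFSET : LATEST_OFFSET;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        // Nothing configured: the changelog would be read from the end.
        System.out.println(resolveStartOffset(null, config));
        // With the reset forced to "smallest", reading starts at the beginning.
        config.put("auto.offset.reset", "smallest");
        System.out.println(resolveStartOffset(null, config));
    }
}
```

In this model, registering with null and no reset configured yields the latest
offset, so a restore would replay nothing.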
Some possible ways that I can think of to fix this:
1. Introduce some kind of EARLIEST/LATEST constants for
SystemConsumer.register, have TaskStorageManager use it, and
KafkaSystemConsumer honor it.
2. Default KafkaSystemFactory.getConsumer to force all changelog stream
auto-offset-resets to be EARLIEST when not defined.
3. Change KafkaSystemFactory auto-offset-reset to EARLIEST.
4. Add some kind of alternative "getChangelogConsumer" method to SystemConsumer.
5. Change the meaning of SystemConsumer.register(..., null) to always mean
"earliest". Right now it means "defer to the consumer to decide what an
appropriate offset is, since I don't have one."
There are many more tweaks we could make as well. In general, I think the
question is whether we want to fix this at the Kafka level or at the
framework-wide level. (2) and (3) are Kafka-specific. (1), (4), and (5) are
framework-wide, and would apply to all SystemConsumers.
I think I'd rather fix this at the framework level. Right now, something like
(1) seems like the best solution. It does add yet another constraint for
SystemConsumer implementations, though, which is annoying.
What do you think about having a SystemConsumer.START_FROM_EARLIEST_OFFSET =
"__samza_EARLIEST" constant, and using that in
TaskStorageManager.startConsumers? SystemConsumers that don't honor this
constant couldn't be used as changelog streams.
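A rough sketch of what honoring such a constant might look like is below. It
is only an illustration under stated assumptions: ChangelogConsumerSketch and
InMemoryConsumerSketch are hypothetical stand-ins, the partition is simplified
to a String, and the real SystemConsumer interface is not reproduced here. Only
the constant's name and value come from the proposal above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a consumer interface.
interface ChangelogConsumerSketch {
    // Sentinel proposed above; consumers must recognize it to serve changelogs.
    String START_FROM_EARLIEST_OFFSET = "__samza_EARLIEST";

    void register(String partition, String offset);
}

// Toy consumer that resolves the sentinel against fixed log bounds.
class InMemoryConsumerSketch implements ChangelogConsumerSketch {
    private final Map<String, Long> startOffsets = new HashMap<>();
    private final long earliest;
    private final long latest;

    InMemoryConsumerSketch(long earliest, long latest) {
        this.earliest = earliest;
        this.latest = latest;
    }

    @Override
    public void register(String partition, String offset) {
        long start;
        if (START_FROM_EARLIEST_OFFSET.equals(offset)) {
            start = earliest;               // explicit request for the log start
        } else if (offset == null) {
            start = latest;                 // models the current "largest" default
        } else {
            start = Long.parseLong(offset); // concrete offset supplied by caller
        }
        startOffsets.put(partition, start);
    }

    // Only valid for partitions that have been registered.
    long startOffsetFor(String partition) {
        return startOffsets.get(partition);
    }
}

class EarliestOffsetDemo {
    public static void main(String[] args) {
        InMemoryConsumerSketch consumer = new InMemoryConsumerSketch(0L, 3358324L);
        // What a storage manager would pass instead of null under this proposal.
        consumer.register("realtime-state-store-96",
                ChangelogConsumerSketch.START_FROM_EARLIEST_OFFSET);
        System.out.println(consumer.startOffsetFor("realtime-state-store-96"));
    }
}
```

The null branch is kept unchanged in this sketch so that existing callers of
register would see no behavior change; only changelog restoration opts in to
the sentinel.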
In the meantime, as a workaround, you can forcibly configure the
auto.offset.reset setting to be smallest for all changelog streams.