Hey Claudio,

The logs show that your input topic is metrics-topic. The container is
reading this topic from the upcoming offset, not the oldest:

2014-04-22 19:46:47 KafkaSystemAdmin$ [INFO] Got metadata:
Map(metrics-topic -> SystemStreamMetadata [streamName=metrics-topic,
partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata
[oldestOffset=0, newestOffset=1704, upcomingOffset=1705], Partition
[partition=1]=SystemStreamPartitionMetadata [oldestOffset=0,
newestOffset=1602, upcomingOffset=1603]}])

...
2014-04-22 19:47:20 GetOffset [INFO] Validating offset 1705 for topic and
partition [metrics-topic,0]
2014-04-22 19:47:20 GetOffset [INFO] Able to successfully read from offset
1705 for topic and partition [metrics-topic,0]. Using it to instantiate
consumer.
2014-04-22 19:47:20 BrokerProxy [INFO] Starting BrokerProxy for
localhost:9092
2014-04-22 19:47:20 GetOffset [INFO] Validating offset 1603 for topic and
partition [metrics-topic,1]
2014-04-22 19:47:20 GetOffset [INFO] Able to successfully read from offset
1603 for topic and partition [metrics-topic,1]. Using it to instantiate
consumer.
...
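As a rough illustration of what starting at the upcoming offset means here, below is a small Python model of the metadata from the "Got metadata" log line. The class and function names are made up for this example and are not Samza classes. It also assumes offsets within a partition are contiguous (no gaps), which may not hold for every topic.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PartitionMetadata:
    """Hypothetical stand-in for the per-partition metadata in the log."""
    oldest_offset: int
    newest_offset: int
    upcoming_offset: int


# Values copied from the "Got metadata" log line for metrics-topic.
METRICS_TOPIC = {
    0: PartitionMetadata(oldest_offset=0, newest_offset=1704, upcoming_offset=1705),
    1: PartitionMetadata(oldest_offset=0, newest_offset=1602, upcoming_offset=1603),
}


def skipped_messages(meta: PartitionMetadata, start_offset: int) -> int:
    """Count existing messages before start_offset (assumes contiguous offsets)."""
    return max(0, min(start_offset, meta.upcoming_offset) - meta.oldest_offset)


for partition, meta in sorted(METRICS_TOPIC.items()):
    # Columns: partition, skipped when starting at upcoming, skipped when starting at oldest.
    print(partition,
          skipped_messages(meta, meta.upcoming_offset),
          skipped_messages(meta, meta.oldest_offset))
# prints:
# 0 1705 0
# 1 1603 0
```

The upcoming offset is one past the newest offset. Starting there therefore skips every message already in the partition, which matches what you saw after the restart.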


The reason lies in this log line:


2014-04-22 19:46:47 OffsetManager$ [INFO] No default offset for
SystemStream [system=kafka, stream=metrics-topic] defined. Using upcoming.


You haven't defined a default offset for the stream (or system). You can
do this with a stream-level config:

  systems.kafka.streams.metrics-topic.samza.offset.default=oldest

Or a system-level config:

  systems.kafka.samza.offset.default=oldest


The latter (system-level config) sets the default for all streams coming
from the Kafka system, while the former (stream-level) defines the config
only for the metrics-topic input stream.
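To make the precedence concrete, here is a minimal Python sketch of how the default might be resolved for one stream. It is a simplified model, not Samza's OffsetManager code, and the function names are hypothetical. It ignores checkpoints entirely and only covers picking the default. It also assumes the more specific stream-level setting wins when both settings are present.

```python
from typing import Mapping


def default_offset_setting(config: Mapping[str, str], system: str, stream: str) -> str:
    """Return the configured default ('oldest' or 'upcoming') for one stream.

    Simplified model: stream-level key first, then system-level key,
    then fall back to 'upcoming'.
    """
    stream_key = f"systems.{system}.streams.{stream}.samza.offset.default"
    system_key = f"systems.{system}.samza.offset.default"
    if stream_key in config:
        return config[stream_key]
    if system_key in config:
        return config[system_key]
    # Mirrors the log message: "No default offset ... defined. Using upcoming."
    return "upcoming"


def starting_offset(config: Mapping[str, str], system: str, stream: str,
                    oldest: int, upcoming: int) -> int:
    """Map the default setting onto a concrete offset for one partition."""
    setting = default_offset_setting(config, system, stream)
    if setting == "oldest":
        return oldest
    if setting == "upcoming":
        return upcoming
    raise ValueError(f"unexpected samza.offset.default value: {setting!r}")


# Partition 0 of metrics-topic from the log: oldest=0, upcoming=1705.
print(starting_offset({}, "kafka", "metrics-topic", 0, 1705))
print(starting_offset({"systems.kafka.samza.offset.default": "oldest"},
                      "kafka", "metrics-topic", 0, 1705))
```

With no setting, the model picks 1705 (upcoming), as in your logs. With either of the configs above set to oldest, it picks 0.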

Cheers,
Chris

On 4/22/14 4:52 PM, "Claudio Martins" <[email protected]> wrote:

>Hi Chris, the logs are on this link: http://pastebin.com/WQR7d52V
>
>This topic alone had around 500k messages to process and yet the Task did
>not get anything to process when it was restarted with "smallest" offset
>config.
>
>Is there anything I am missing?
>
>- Claudio Martins
>Head of Engineering and User Experience
>MobileAware USA Inc. / www.mobileaware.com
>office: +1 617 986 5060 / mobile: +1 617 480 5288
>linkedin: www.linkedin.com/in/martinsclaudio
>
>
>On Thu, Apr 17, 2014 at 3:45 PM, Chris Riccomini
><[email protected]> wrote:
>
>> Hey Claudio,
>>
>> Hmm. Could you post your logs somewhere where we can have a look? The
>> logs should say whether the checkpoint is used or not, and also provide
>> insight into why it might not be.
>>
>> Cheers,
>> Chris
>>
>> On 4/17/14 11:19 AM, "Claudio Martins" <[email protected]> wrote:
>>
>> >Hi Chris, thanks for the answer.
>> >
>> >I already had those configured:
>> >
>> >task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>> >
>> >task.checkpoint.system=kafka
>> >
>> >task.checkpoint.replication.factor=1
>> >
>> >task.window.ms=5000
>> >
>> >task.commit.ms=5000
>> >
>> >task.checkpoint.failure.retry.ms=5000
>> >
>> >- Claudio Martins
>> >Head of Engineering and User Experience
>> >MobileAware USA Inc. / www.mobileaware.com
>> >office: +1 617 986 5060 / mobile: +1 617 480 5288
>> >linkedin: www.linkedin.com/in/martinsclaudio
>> >
>> >
>> >On Thu, Apr 17, 2014 at 2:15 PM, Chris Riccomini
>> ><[email protected]> wrote:
>> >
>> >> Hey Claudio,
>> >>
>> >> It looks like you're using the old configs from Samza 0.6.0.
>> >> Unfortunately, our docs have not yet been updated to reflect the new
>> >> configs for 0.7.0. There is a JIRA open for this, though:
>> >>
>> >>   https://issues.apache.org/jira/browse/SAMZA-165
>> >>
>> >> To enable checkpointing, you'll need to set:
>> >>
>> >> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>> >> # assume we have a system defined in the config called "kafka"
>> >> task.checkpoint.system=kafka
>> >> # can't be higher than the number of brokers in your Kafka grid,
>> >> # but should usually be more than one if you have more than one
>> >> # broker in your Kafka grid.
>> >> task.checkpoint.replication.factor=1
>> >>
>> >>
>> >> Cheers,
>> >> Chris
>> >>
>> >> On 4/16/14 3:03 PM, "Claudio Martins" <[email protected]> wrote:
>> >>
>> >> >Hi guys,
>> >> >
>> >> >I'm having a hard time trying to figure out why my Samza job does not
>> >> >start processing Kafka messages from the last checkpoint.
>> >> >
>> >> >I have the task configured as
>> >> >
>> >> >systems.kafka.consumer.auto.offset.reset=smallest
>> >> >streams.topic-name.consumer.reset.offset=false
>> >> >
>> >> >However, when the job runs it doesn't start processing anything from
>> >> >the current offset, just the upcoming messages.
>> >> >
>> >> >I stop the job, load some messages into the topic and start the job
>> >> >again. Nothing happens; only the new messages are processed.
>> >> >
>> >> >Is there anything I am missing here?
>> >> >
>> >> >Just to make it clear: I do not want to start processing from the
>> >> >beginning of the topic. I want to process from the last checkpoint.
>> >> >
>> >> >
>> >> >Thanks,
>> >> >
>> >> >- Claudio Martins
>> >> >linkedin: www.linkedin.com/in/martinsclaudio
>> >>
>> >>
>>
>>
