Hey all,
I'm currently evaluating Samza and am running into some odd issues.
I'm working off the 'hello-samza' repo and trying to parse a
simple Kafka topic that I've produced through an external Java app (nothing
other than a series of sentences), and it's failing pretty hard for me. The
base 'hello-samza' set of apps works fine, but as soon as I change the
configuration to point at a different Kafka/ZooKeeper I get the following in
the userlogs:
2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets
for streams [myTopic] due to kafka.common.KafkaException: fetching topic
metadata for topics [Set(myTopic)] from broker
[ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
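In case it helps narrow things down, here is a minimal sketch of a reachability check that could be run from the machine where the Samza container runs. It is plain Java and does not use Samza or Kafka; the class BrokerReachability and its methods are hypothetical names of mine, and the descriptor format is simply copied from the log line above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of Samza or Kafka.
final class BrokerReachability {

    // Extracts host and port from a descriptor shaped like the one in the
    // log line, e.g. "id:0,host:redacted,port:9092". The address is left
    // unresolved so that parsing never touches DNS.
    static InetSocketAddress parseBroker(String descriptor) {
        String host = null;
        int port = -1;
        for (String field : descriptor.split(",")) {
            String[] kv = field.trim().split(":", 2);
            if (kv.length != 2) {
                continue;
            }
            if (kv[0].equals("host")) {
                host = kv[1];
            } else if (kv[0].equals("port")) {
                port = Integer.parseInt(kv[1]);
            }
        }
        if (host == null || port < 0) {
            throw new IllegalArgumentException("no host/port in: " + descriptor);
        }
        return InetSocketAddress.createUnresolved(host, port);
    }

    // Returns true if a TCP connection can be opened within the timeout.
    // Any I/O failure (refused, timed out, unknown host) yields false.
    static boolean canConnect(InetSocketAddress broker, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(
                new InetSocketAddress(broker.getHostString(), broker.getPort()),
                timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The default descriptor is a placeholder; pass the real one as an argument.
        String descriptor = args.length > 0 ? args[0] : "id:0,host:localhost,port:9092";
        InetSocketAddress broker = parseBroker(descriptor);
        System.out.println(broker.getHostString() + ":" + broker.getPort()
            + " reachable: " + canConnect(broker, 3000));
    }
}
```

A successful connect would only show that the port is open from that machine. It would not rule out other causes, such as the broker advertising a host name that the Samza side cannot resolve.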
The modifications are pretty straightforward. In
wikipedia-parser.properties, I've changed the following:
task.inputs=kafka.myTopic
systems.kafka.consumer.zookeeper.connect=redacted:2181/
systems.kafka.consumer.auto.offset.reset=smallest
systems.kafka.producer.metadata.broker.list=redacted:9092
And in the actual Java file, WikipediaParserStreamTask.java:
public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
    TaskCoordinator coordinator) {
  Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
  WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
  try {
    System.out.println(event.getRawEvent());
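Since my topic holds plain sentences rather than JSON objects, I'm not sure the cast to Map is safe here in any case. A defensive way to look at what actually arrives might be the sketch below. It is standalone Java so it can be run without Samza; MessageInspector and describe are hypothetical names, and which runtime type shows up depends on the serde configured for the stream, which I have not verified.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

// Hypothetical helper for illustration; not part of Samza.
final class MessageInspector {

    // Describes a message without assuming its runtime type, so a plain
    // sentence does not blow up with a ClassCastException.
    static String describe(Object message) {
        if (message == null) {
            return "null message";
        }
        if (message instanceof Map) {
            return "map with keys " + ((Map<?, ?>) message).keySet();
        }
        if (message instanceof String) {
            return "string: " + message;
        }
        if (message instanceof byte[]) {
            // Assumes the producer wrote UTF-8 text.
            return "bytes as UTF-8: " + new String((byte[]) message, StandardCharsets.UTF_8);
        }
        return "unexpected type " + message.getClass().getName();
    }

    public static void main(String[] args) {
        System.out.println(describe("a plain sentence"));
        System.out.println(describe("raw bytes".getBytes(StandardCharsets.UTF_8)));
        System.out.println(describe(Collections.singletonMap("raw", "value")));
    }
}
```

Inside process(), the call would be something like System.out.println(MessageInspector.describe(envelope.getMessage())) in place of the cast.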
I then follow the compile/extract/run process outlined on the
hello-samza website.
Any thoughts? I've looked online for 'super simple' examples of
ingesting Kafka in Samza with very little success.