Hi, Shadi:

Thanks a lot for your reply.

1. There is no error log in Kafka or Samza.
2. This line " logger.info("key="+key+": message="+message); " writes the log correctly, as below:

[image: Inline image 1]

These are my last two messages, with the right count.

3. I tried both ways below; neither of them creates the topic, but I will try again.

collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
//collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));

4. I wrote a topic called "http-demo" to Kafka as my input, and its content can be shown with the command line below, so Kafka should be OK.

deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic http-demo

Your help is highly appreciated.

Sincerely,
Selina

On Fri, Jul 24, 2015 at 9:29 AM, Shadi Noghabi <snogh...@linkedin.com.invalid> wrote:

> Selina,
>
> You should probably check a few things:
> 1. Your log files, to see if you have any errors. Also, does your job fail
> or continue running?
> 2. Does this line " logger.info("key="+key+": message="+message); " write
> any logs?
> 3. This might not be the only reason, but you are sending messages of
> type Map<String, String>. However, in your config file, you defined
> "systems.kafka.samza.msg.serde=string", which expects the message to be a
> String.
>
> Shadi
>
> On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu <swucaree...@gmail.com>
> wrote:
>
> > Hi, All
> >
> > I am trying to write my first StreamTask class. I have a topic in
> > Kafka called "http-demo". I would like to read the topic and write it
> > to another topic called "demo-duplicate".
> >
> > However, no topic is written to Kafka.
> >
> > My properties file and StreamTask are below. Can anyone tell me what
> > the bug is?
> > BTW, if I set checkpoint or metrics in the properties file, the
> > checkpoint and metrics topics are written to Kafka. And the content of
> > the input topic, http-demo, is shown correctly.
> >
> > Your help is highly appreciated.
> >
> > Sincerely,
> > Selina
> >
> > - - -- - - - - -
> > # Job
> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> > job.name=demo-parser
> >
> > # YARN
> > yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
> >
> > # Task
> > task.class=samza.http.demo.task.HttpDemoParserStreamTask
> > task.inputs=kafka.http-demo
> >
> > # Serializers
> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> >
> > # Kafka System
> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > systems.kafka.samza.msg.serde=string
> > systems.kafka.samza.key.serde=string
> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
> > systems.kafka.consumer.auto.offset.reset=largest
> > systems.kafka.producer.bootstrap.servers=localhost:9092
> > - - -- - - - - -
> >
> > My StreamTask class is simple also
> >
> > ---------
> >
> > /**
> >  * Read data from http-demo topic and write it back to "demo-duplicate"
> >  */
> > public class HttpDemoParserStreamTask implements StreamTask {
> >
> >     private static final SystemStream OUTPUT_STREAM =
> >             new SystemStream("kafka", "demo-duplicate");
> >     Logger logger = LoggerFactory.getLogger(HttpDemoParserStreamTask.class);
> >
> >     @SuppressWarnings("unchecked")
> >     @Override
> >     public void process(IncomingMessageEnvelope envelope,
> >             MessageCollector collector, TaskCoordinator coordinator)
> >             throws Exception {
> >
> >         String key = (String) envelope.getKey();
> >         String message = envelope.getMessage().toString();
> >         logger.info("key="+key+": message="+message);
> >
> >         Map<String, String> outgoingMap =
> >                 (Map<String, String>) (envelope.getMessage());
> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
> >         //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
> >     }
> >
> > }
> >
> > -------
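
A note on point 3 of Shadi's reply: the type mismatch can be reproduced in plain Java, without Samza or Kafka. The sketch below is only an illustration, not the confirmed cause of the missing topic. It assumes that with "systems.kafka.samza.msg.serde=string" the object returned by envelope.getMessage() is a java.lang.String. The names SerdeCastSketch, decodedMessage and castToMapSucceeds are hypothetical stand-ins and are not part of the Samza API.

```java
import java.util.HashMap;
import java.util.Map;

public class SerdeCastSketch {

    // Hypothetical stand-in for envelope.getMessage(): under the assumption
    // stated above, a string serde hands the task a String typed as Object.
    static Object decodedMessage(String raw) {
        return raw;
    }

    // Mirrors the cast used in process(). Returns true only if the payload
    // really is a Map; a String payload raises ClassCastException at runtime.
    @SuppressWarnings("unchecked")
    static boolean castToMapSucceeds(Object payload) {
        try {
            Map<String, String> asMap = (Map<String, String>) payload;
            return asMap != null;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Object payload = decodedMessage("{\"url\":\"/index\"}");

        // The cast compiles (with an unchecked warning) but fails when it runs.
        System.out.println("cast to Map succeeds: " + castToMapSucceeds(payload));

        // The payload is still usable as a String, which is the type the
        // configured string serde expects for outgoing messages.
        System.out.println("payload is a String: " + (payload instanceof String));

        // For contrast, a real Map passes the same cast.
        Map<String, String> realMap = new HashMap<>();
        realMap.put("url", "/index");
        System.out.println("real Map passes cast: " + castToMapSucceeds(realMap));
    }
}
```

Under that assumption, the cast in process() would throw before collector.send(...) is reached, so it may be worth trying the commented-out variant that sends the String "message" with the cast line removed, or keeping the Map and registering a serde for the output stream that can serialize it.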