Hi, Shadi:

      Thans a lot for your reply.
1. There is no error log at Kafka and Samza

2.  this line "  logger.info("key="+key+": message="+message); " write log
correctly as below:

[image: Inline image 1]

This are my last two message with right count

3. I tried both way below, none of them create topic, but I will try it
again.

collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));

//collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));

4. I wrote a topic call "http-demo" to Kafka as my input, and the content
can be show with command line below, so the Kafka should be OK.
deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
--from-beginning --topic http-demo

Your help is highly appreciated.

Sincerely,
Selina




On Fri, Jul 24, 2015 at 9:29 AM, Shadi Noghabi <
snogh...@linkedin.com.invalid> wrote:

> Selina,
>
> You should probably check a few things
> 1. Your log files to see if you have any errors. Also, does you job fail or
> continues running?
> 2. Does this line "  logger.info("key="+key+": message="+message); " write
> any logs?
> 3. This might not be the only reason, but you are sending messages of
> type Map<String,
> String>. However, in your config file, you defined "
> systems.kafka.samza.msg.serde=string" which expects the message to be a
> String.
>
>
> Shadi
>
>
> On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu <swucaree...@gmail.com>
> wrote:
>
> > Hi,  All
> >
> >      I am trying to write my first StreamTask class. I have a topic at
> > Kafka called "http-demo". I like to read the topic and write it to
> another
> > topic called "demo-duplicate"
> >
> >     Howeven there is not topic written to Kafka.
> >
> >     My properties file and StreamTask are below.  Can anyone told me what
> > is the bug?
> >     BTW, if I set checkpoint or Metrics at properties file. the topic of
> > checkpoint and metrics could be written to Kafka.  And the content of
> >  input topic -- http-demo could be show correctly.
> >
> > Your help is highly appreciated.
> >
> > Sincerely,
> > Selina
> >
> >
> > - - -- - - - - -
> > # Job
> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> > job.name=demo-parser
> >
> > # YARN
> >
> >
> yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
> >
> > # Task
> > task.class=samza.http.demo.task.HttpDemoParserStreamTask
> > task.inputs=kafka.http-demo
> >
> > # Serializers
> >
> >
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> >
> > # Kafka System
> >
> >
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > systems.kafka.samza.msg.serde=string
> > systems.kafka.samza.key.serde=string
> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
> > systems.kafka.consumer.auto.offset.reset=largest
> > systems.kafka.producer.bootstrap.servers=localhost:9092
> > - - -- - - - - -
> >
> > My StreamTask class is simple also
> >
> > ---------
> >
> > /**
> >  *
> >  * Read data from http-demo topic and write it back to "demo-duplicate"
> >  */
> > public class HttpDemoParserStreamTask implements StreamTask {
> >
> >     private static final SystemStream OUTPUT_STREAM = new
> > SystemStream("kafka", "demo-duplicate");
> >     Logger logger =
> > LoggerFactory.getLogger(HttpDemoParserStreamTask.class);
> >
> >     @SuppressWarnings("unchecked")
> >     @Override
> >     public void process(IncomingMessageEnvelope envelope,
> MessageCollector
> > collector, TaskCoordinator coordinator) throws Exception {
> >
> >         String key = (String) envelope.getKey();
> >         String message = envelope.getMessage().toString();
> >         logger.info("key="+key+": message="+message);
> >
> >         Map<String, String> outgoingMap = (Map<String, String>)
> > (envelope.getMessage());
> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
> > outgoingMap));
> >         //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
> > message));
> >     }
> >
> > }
> >
> > -------
> >
>

Reply via email to