Hi, Yan:

        Thanks for replying to my email in detail.  All the files in the Yarn
logs are shown below. There is no exception under
samza-Demo/deploy/yarn/logs.  I guess the StreamTask was not called ...
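
One way to double-check this across every container directory, rather than
one file at a time, would be a small search like the sketch below. It is only
an illustration: the class name LogMarkerSearch and the default marker "key="
are hypothetical choices, and the default root is simply the userlogs
directory taken from the paths in this message. It uses only the JDK, not any
Samza or YARN API.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: lists every regular file under a log root that contains a marker string. */
final class LogMarkerSearch {

    /** Walks the whole tree under root and returns the files with at least one matching line. */
    static List<Path> filesContaining(Path root, String marker) throws IOException {
        List<Path> files;
        try (Stream<Path> paths = Files.walk(root)) {
            files = paths.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        List<Path> hits = new ArrayList<>();
        for (Path file : files) {
            if (contains(file, marker)) {
                hits.add(file);
            }
        }
        return hits;
    }

    private static boolean contains(Path file, String marker) throws IOException {
        // ISO-8859-1 maps every byte to a character, so binary content never fails to decode.
        try (Stream<String> lines = Files.lines(file, StandardCharsets.ISO_8859_1)) {
            return lines.anyMatch(line -> line.contains(marker));
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed defaults: run from the project directory, search for the logger.info prefix.
        Path root = Paths.get(args.length > 0 ? args[0] : "deploy/yarn/logs/userlogs");
        String marker = args.length > 1 ? args[1] : "key=";
        for (Path hit : filesContaining(root, marker)) {
            System.out.println(hit);
        }
    }
}
```

Run from the samza-Demo directory, it prints each file that contains the
marker. An empty result would support the guess that process() never logged
anything in any container.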


A partial stdout file
(samza-Demo/deploy/yarn/logs/userlogs/application_1437767867729_0001/container_1437767867729_0001_01_000001/stderr)
is pasted below. In short, the log line from "logger.info("key="+key+":
message="+message);" was not generated.

/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java
-server -Dsamza.container.name=samza-application-master
-Dlog4j.configuration=file:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/log4j.xml
-Dsamza.log.dir=/Users/selina/IdeaProjects/samza-Demo/deploy/yarn/logs/userlogs/application_1437767867729_0001/container_1437767867729_0001_01_000001
-Djava.io.tmpdir=/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/tmp
-Xmx768M -XX:+PrintGCDateStamps
-Xloggc:/Users/selina/IdeaProjects/samza-Demo/deploy/yarn/logs/userlogs/application_1437767867729_0001/container_1437767867729_0001_01_000001/gc.log
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
-XX:GCLogFileSize=10241024 -d64 -cp
/Users/selina/IdeaProjects/samza-Demo/deploy/yarn/etc/hadoop:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/activation-1.1.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/akka-actor_2.10-2.1.2.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/aopalliance-1.0.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/asm-3.1.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/avro-1.7.4.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/commons-beanutils-1.7.0.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/commons-beanutils-core-1.8.0.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/commons-cli-1.2


The file gc.log.0.current shows Allocation Failure and Full GC entries:

CommandLine flags: -XX:GCLogFileSize=10241024 -XX:InitialHeapSize=268435456
-XX:MaxHeapSize=805306368 -XX:NumberOfGCLogFiles=10 -XX:+PrintGC
-XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps
-XX:+UseCompressedClassPointers -XX:+UseCompressedOops
-XX:+UseGCLogFileRotation -XX:+UseParallelGC

2015-07-24T13:28:56.901+0800: 0.694: [GC (Allocation Failure)
65536K->8449K(251392K), 0.0062314 secs]
2015-07-24T13:28:57.188+0800: 0.981: [GC (System.gc())
39240K->6305K(251392K), 0.0047744 secs]
2015-07-24T13:28:57.193+0800: 0.986: [Full GC (System.gc())
6305K->5940K(251392K), 0.0147206 secs]
2015-07-24T13:28:57.625+0800: 1.418: [GC (Allocation Failure)
71476K->12511K(251392K), 0.0030179 secs]
2015-07-24T13:28:59.889+0800: 3.682: [GC (Allocation Failure)
78047K->13859K(251392K), 0.0052610 secs]
2015-07-24T13:29:15.487+0800: 19.280: [GC (Metadata GC Threshold)
35659K->10106K(251392K), 0.0036350 secs]
2015-07-24T13:29:15.490+0800: 19.284: [Full GC (Metadata GC
Threshold)  10106K->7318K(149504K), 0.0200118 secs]



[image: Inline image 1]

   Your help is highly appreciated.

Sincerely,
Selina

On Fri, Jul 24, 2015 at 1:51 PM, Yan Fang <yanfang...@gmail.com> wrote:

> {quote}
>  I did not set auto.create.topics.enable anywhere
> {quote}
>
> Fine. Then it defaults to true. No worries.
>
> {quote}
> My job is listed below. However, I am wondering how I can know whether my
> method "public void process(IncomingMessageEnvelope envelope,
> MessageCollector collector, TaskCoordinator coordinator)" was run or not.
> {quote}
>
> If you have logging enabled (from the code, you do), you can check the
> container's log to see if it has the output. Assuming you are using the local
> yarn like what hello-samza provides, you should be able to check the logs
> in deploy/yarn/userlogs/application_Id.
>
> If you print with System.out.println, you can see the result in the stdout
> file under deploy/yarn/userlogs/application_Id (if the StreamTask works).
>
> If it does not work, you can check the logs in
> deploy/yarn/userlogs/application_Id as well to see the exceptions, if there
> are any.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Fri, Jul 24, 2015 at 1:45 PM, Job-Selina Wu <swucaree...@gmail.com>
> wrote:
>
>> Hi, Yan and Shadi:
>>
>>     I made a mistake.  Actually, no log was created at /tmp/kafka-logs
>> by "logger.info("key="+key+": message="+message);".  The log I provided
>> is actually the log for the input topic "http-demo" at
>> /tmp/kafka-logs/http-demo-0
>>
>>     My job is listed below. However, I am wondering how I can know whether
>> my method "public void process(IncomingMessageEnvelope envelope,
>> MessageCollector collector, TaskCoordinator coordinator)" was run or
>> not.
>>
>>     I manually created the topic "demo-duplicate" from the command line;
>> otherwise it would be created by the Samza code.
>>
>>     I checked, and I did not set auto.create.topics.enable anywhere. Attached
>> is my properties file for Kafka.
>>
>>
>>    Your help is highly appreciated
>>
>> Sincerely,
>> Selina
>>
>> [image: Inline image 1]
>>
>>
>>
>>
>> On Fri, Jul 24, 2015 at 11:56 AM, Yan Fang <yanfang...@gmail.com> wrote:
>>
>>> The code and the properties seem good to me. collector.send(new
>>> OutgoingMessageEnvelope(OUTPUT_STREAM, message)); should work. So I am
>>> curious whether you accidentally disabled auto.create.topics.enable ... Can
>>> you also try to send messages from the command line to "demo-duplicate" to
>>> see if it gets anything?
>>>
>>> Let me know if it works.
>>>
>>> Thanks,
>>>
>>> Fang, Yan
>>> yanfang...@gmail.com
>>>
>>> On Fri, Jul 24, 2015 at 11:48 AM, Job-Selina Wu <swucaree...@gmail.com>
>>> wrote:
>>>
>>> > Hi, Shadi:
>>> >
>>> >       Thanks a lot for your reply.
>>> > 1. There is no error log in Kafka or Samza.
>>> >
>>> > 2. This line "logger.info("key="+key+": message="+message);" writes
>>> > the log correctly, as below:
>>> >
>>> > [image: Inline image 1]
>>> >
>>> > These are my last two messages with the right count.
>>> >
>>> > 3. I tried both ways below; neither of them created the topic, but I will
>>> > try again.
>>> >
>>> > collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
>>> >
>>> > //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
>>> >
>>> > 4. I wrote a topic called "http-demo" to Kafka as my input, and its
>>> > content can be shown with the command line below, so Kafka should be OK.
>>> > deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
>>> > --from-beginning --topic http-demo
>>> >
>>> > Your help is highly appreciated.
>>> >
>>> > Sincerely,
>>> > Selina
>>> >
>>> >
>>> >
>>> >
>>> > On Fri, Jul 24, 2015 at 9:29 AM, Shadi Noghabi <
>>> > snogh...@linkedin.com.invalid> wrote:
>>> >
>>> >> Selina,
>>> >>
>>> >> You should probably check a few things:
>>> >> 1. Your log files, to see if you have any errors. Also, does your job
>>> >> fail or continue running?
>>> >> 2. Does this line "logger.info("key="+key+": message="+message);"
>>> >> write any logs?
>>> >> 3. This might not be the only reason, but you are sending messages of
>>> >> type Map<String, String>. However, in your config file, you defined
>>> >> "systems.kafka.samza.msg.serde=string", which expects the message to be
>>> >> a String.
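
Point 3 can be illustrated without Samza at all. The sketch below assumes
that a string serde hands the task a plain java.lang.String, and shows that
casting such an object to Map<String, String> fails at run time even though
it compiles. The class SerdeCastDemo and its method names are hypothetical,
and no Samza classes are involved.

```java
import java.util.Map;

/** Stand-alone illustration of the cast in process(); no Samza classes are used. */
final class SerdeCastDemo {

    /** Mimics the unchecked cast: it compiles, but fails when the payload is really a String. */
    @SuppressWarnings("unchecked")
    static Map<String, String> castToMap(Object payload) {
        return (Map<String, String>) payload;
    }

    /** Reports whether the cast succeeds for the given payload. */
    static boolean castSucceeds(Object payload) {
        try {
            castToMap(payload);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: this stands in for what a string serde would deliver to the task.
        Object fromStringSerde = "some message text";
        System.out.println("cast to Map succeeds: " + castSucceeds(fromStringSerde));
        // Keeping the payload as a String matches a string serde on the output side too.
        System.out.println("as String: " + String.valueOf(fromStringSerde));
    }
}
```

If that assumption holds for this job, sending the String message (the
commented-out collector.send line) would be the variant consistent with
"systems.kafka.samza.msg.serde=string".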
>>> >>
>>> >>
>>> >> Shadi
>>> >>
>>> >>
>>> >> On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu <swucaree...@gmail.com>
>>> >> wrote:
>>> >>
>>> >> > Hi,  All
>>> >> >
>>> >> >      I am trying to write my first StreamTask class. I have a topic
>>> >> > in Kafka called "http-demo". I would like to read the topic and write
>>> >> > it to another topic called "demo-duplicate".
>>> >> >
>>> >> >     However, no topic is written to Kafka.
>>> >> >
>>> >> >     My properties file and StreamTask are below.  Can anyone tell me
>>> >> > what the bug is?
>>> >> >     BTW, if I set checkpoint or metrics in the properties file, the
>>> >> > checkpoint and metrics topics are written to Kafka, and the content
>>> >> > of the input topic, http-demo, is shown correctly.
>>> >> >
>>> >> > Your help is highly appreciated.
>>> >> >
>>> >> > Sincerely,
>>> >> > Selina
>>> >> >
>>> >> >
>>> >> > - - -- - - - - -
>>> >> > # Job
>>> >> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>>> >> > job.name=demo-parser
>>> >> >
>>> >> > # YARN
>>> >> > yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
>>> >> >
>>> >> > # Task
>>> >> > task.class=samza.http.demo.task.HttpDemoParserStreamTask
>>> >> > task.inputs=kafka.http-demo
>>> >> >
>>> >> > # Serializers
>>> >> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>>> >> >
>>> >> > # Kafka System
>>> >> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>> >> > systems.kafka.samza.msg.serde=string
>>> >> > systems.kafka.samza.key.serde=string
>>> >> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
>>> >> > systems.kafka.consumer.auto.offset.reset=largest
>>> >> > systems.kafka.producer.bootstrap.servers=localhost:9092
>>> >> > - - -- - - - - -
>>> >> >
>>> >> > My StreamTask class is simple also
>>> >> >
>>> >> > ---------
>>> >> >
>>> >> > /**
>>> >> >  *
>>> >> >  * Read data from http-demo topic and write it back to "demo-duplicate"
>>> >> >  */
>>> >> > public class HttpDemoParserStreamTask implements StreamTask {
>>> >> >
>>> >> >     private static final SystemStream OUTPUT_STREAM =
>>> >> >             new SystemStream("kafka", "demo-duplicate");
>>> >> >     Logger logger = LoggerFactory.getLogger(HttpDemoParserStreamTask.class);
>>> >> >
>>> >> >     @SuppressWarnings("unchecked")
>>> >> >     @Override
>>> >> >     public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
>>> >> >             TaskCoordinator coordinator) throws Exception {
>>> >> >
>>> >> >         String key = (String) envelope.getKey();
>>> >> >         String message = envelope.getMessage().toString();
>>> >> >         logger.info("key="+key+": message="+message);
>>> >> >
>>> >> >         Map<String, String> outgoingMap = (Map<String, String>) (envelope.getMessage());
>>> >> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
>>> >> >         //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
>>> >> >     }
>>> >> >
>>> >> > }
>>> >> >
>>> >> > -------
>>> >> >
>>> >>
>>> >
>>> >
>>>
>>
>>
>
