Hi, Yan: Thanks for replying to my email in detail. All the files from the YARN logs are shown below. There is no exception under samza-Demo/deploy/yarn/logs. I guess the StreamTask was never called ...
Part of the stderr file (samza-Demo/deploy/yarn/logs/userlogs/application_1437767867729_0001/container_1437767867729_0001_01_000001/stderr) is pasted below. In short, the log line from logger.info("key="+key+": message="+message); was never generated.

/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java -server -Dsamza.container.name=samza-application-master -Dlog4j.configuration=file:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/log4j.xml -Dsamza.log.dir=/Users/selina/IdeaProjects/samza-Demo/deploy/yarn/logs/userlogs/application_1437767867729_0001/container_1437767867729_0001_01_000001 -Djava.io.tmpdir=/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/tmp -Xmx768M -XX:+PrintGCDateStamps -Xloggc:/Users/selina/IdeaProjects/samza-Demo/deploy/yarn/logs/userlogs/application_1437767867729_0001/container_1437767867729_0001_01_000001/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024 -d64 -cp
/Users/selina/IdeaProjects/samza-Demo/deploy/yarn/etc/hadoop:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/activation-1.1.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/akka-actor_2.10-2.1.2.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/aopalliance-1.0.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/asm-3.1.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/avro-1.7.4.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/commons-beanutils-1.7.0.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/commons-beanutils-core-1.8.0.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/commons-cli-1.2

The file gc.log.0.current shows allocation failures and full GCs:

CommandLine flags: -XX:GCLogFileSize=10241024 -XX:InitialHeapSize=268435456 -XX:MaxHeapSize=805306368 -XX:NumberOfGCLogFiles=10 -XX:+PrintGC -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:+UseGCLogFileRotation -XX:+UseParallelGC
2015-07-24T13:28:56.901+0800: 0.694: [GC (Allocation Failure) 65536K->8449K(251392K), 0.0062314 secs]
2015-07-24T13:28:57.188+0800: 0.981: [GC (System.gc()) 39240K->6305K(251392K), 0.0047744 secs]
2015-07-24T13:28:57.193+0800: 0.986: [Full GC (System.gc()) 6305K->5940K(251392K),
0.0147206 secs]
2015-07-24T13:28:57.625+0800: 1.418: [GC (Allocation Failure) 71476K->12511K(251392K), 0.0030179 secs]
2015-07-24T13:28:59.889+0800: 3.682: [GC (Allocation Failure) 78047K->13859K(251392K), 0.0052610 secs]
2015-07-24T13:29:15.487+0800: 19.280: [GC (Metadata GC Threshold) 35659K->10106K(251392K), 0.0036350 secs]
2015-07-24T13:29:15.490+0800: 19.284: [Full GC (Metadata GC Threshold) 10106K->7318K(149504K), 0.0200118 secs]

[image: Inline image 1]

Your help is highly appreciated.

Sincerely,
Selina

On Fri, Jul 24, 2015 at 1:51 PM, Yan Fang <yanfang...@gmail.com> wrote:

> {quote}
> I did not set auto.create.topics.enable anywhere
> {quote}
>
> Fine. Then it defaults to true. No worries.
>
> {quote}
> My job is listed as below. However, I am wondering how I can know whether
> my method "public void process(IncomingMessageEnvelope envelope,
> MessageCollector collector, TaskCoordinator coordinator)" was run or not.
> {quote}
>
> If you have logging enabled (from the code, you do), you can check the
> container's log to see if it has the output. Assuming you are using the
> local YARN setup that hello-samza provides, you should be able to check
> the logs in deploy/yarn/userlogs/application_Id.
>
> If you print with System.out, you can see the result in the
> deploy/yarn/userlogs/application_Id sysout file (if the StreamTask
> works).
>
> If it does not work, you can check the logs in
> deploy/yarn/userlogs/application_Id as well to see the exceptions, if
> there are any.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Fri, Jul 24, 2015 at 1:45 PM, Job-Selina Wu <swucaree...@gmail.com>
> wrote:
>
>> Hi, Yan and Shadi:
>>
>> I made a mistake. Actually, there is no log at /tmp/kafka-logs
>> created by logger.info("key="+key+": message="+message);. The log
>> I provided is actually the log for the input topic "http-demo" at
>> /tmp/kafka-logs/http-demo-0.
>>
>> My job is listed as below.
>> However, I am wondering how I can know whether
>> my method "public void process(IncomingMessageEnvelope envelope,
>> MessageCollector collector, TaskCoordinator coordinator)" was run or
>> not.
>>
>> I manually created the topic "demo-duplicate" by command line;
>> otherwise it would be created by the Samza code.
>>
>> I checked that I did not set auto.create.topics.enable anywhere.
>> Attached is my properties file for Kafka.
>>
>> Your help is highly appreciated.
>>
>> Sincerely,
>> Selina
>>
>> [image: Inline image 1]
>>
>> On Fri, Jul 24, 2015 at 11:56 AM, Yan Fang <yanfang...@gmail.com> wrote:
>>
>>> The code and the property seem good to me. collector.send(new
>>> OutgoingMessageEnvelope(OUTPUT_STREAM, message)); should work. So I am
>>> curious whether you accidentally disabled auto.create.topics.enable ...
>>> Can you also try to send messages from the command line to
>>> "demo-duplicate" to see if it gets anything?
>>>
>>> Let me know if it works.
>>>
>>> Thanks,
>>>
>>> Fang, Yan
>>> yanfang...@gmail.com
>>>
>>> On Fri, Jul 24, 2015 at 11:48 AM, Job-Selina Wu <swucaree...@gmail.com>
>>> wrote:
>>>
>>> > Hi, Shadi:
>>> >
>>> > Thanks a lot for your reply.
>>> > 1. There is no error log in Kafka or Samza.
>>> >
>>> > 2. This line, logger.info("key="+key+": message="+message);, writes
>>> > the log correctly, as below:
>>> >
>>> > [image: Inline image 1]
>>> >
>>> > These are my last two messages, with the right count.
>>> >
>>> > 3. I tried both ways below; neither of them creates the topic, but I
>>> > will try again.
>>> >
>>> > collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
>>> > //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
>>> >
>>> > 4. I wrote a topic called "http-demo" to Kafka as my input, and its
>>> > content can be shown with the command line below, so Kafka should be OK.
>>> > deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
>>> > --from-beginning --topic http-demo
>>> >
>>> > Your help is highly appreciated.
>>> >
>>> > Sincerely,
>>> > Selina
>>> >
>>> > On Fri, Jul 24, 2015 at 9:29 AM, Shadi Noghabi <
>>> > snogh...@linkedin.com.invalid> wrote:
>>> >
>>> >> Selina,
>>> >>
>>> >> You should probably check a few things:
>>> >> 1. Your log files, to see if you have any errors. Also, does your
>>> >> job fail or continue running?
>>> >> 2. Does this line, logger.info("key="+key+": message="+message);,
>>> >> write any logs?
>>> >> 3. This might not be the only reason, but you are sending messages
>>> >> of type Map<String, String>. However, in your config file, you
>>> >> defined "systems.kafka.samza.msg.serde=string", which expects the
>>> >> message to be a String.
>>> >>
>>> >> Shadi
>>> >>
>>> >> On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu <swucaree...@gmail.com>
>>> >> wrote:
>>> >>
>>> >> > Hi, All:
>>> >> >
>>> >> > I am trying to write my first StreamTask class. I have a topic
>>> >> > in Kafka called "http-demo". I would like to read the topic and
>>> >> > write it to another topic called "demo-duplicate".
>>> >> >
>>> >> > However, no topic is written to Kafka.
>>> >> >
>>> >> > My properties file and StreamTask are below. Can anyone tell me
>>> >> > what the bug is?
>>> >> > BTW, if I set checkpointing or metrics in the properties file,
>>> >> > the checkpoint and metrics topics are written to Kafka, and the
>>> >> > content of the input topic -- http-demo -- is shown correctly.
>>> >> >
>>> >> > Your help is highly appreciated.
>>> >> >
>>> >> > Sincerely,
>>> >> > Selina
>>> >> >
>>> >> > - - -- - - - - -
>>> >> > # Job
>>> >> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>>> >> > job.name=demo-parser
>>> >> >
>>> >> > # YARN
>>> >> > yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
>>> >> >
>>> >> > # Task
>>> >> > task.class=samza.http.demo.task.HttpDemoParserStreamTask
>>> >> > task.inputs=kafka.http-demo
>>> >> >
>>> >> > # Serializers
>>> >> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>>> >> >
>>> >> > # Kafka System
>>> >> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>> >> > systems.kafka.samza.msg.serde=string
>>> >> > systems.kafka.samza.key.serde=string
>>> >> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
>>> >> > systems.kafka.consumer.auto.offset.reset=largest
>>> >> > systems.kafka.producer.bootstrap.servers=localhost:9092
>>> >> > - - -- - - - - -
>>> >> >
>>> >> > My StreamTask class is also simple:
>>> >> >
>>> >> > ---------
>>> >> >
>>> >> > /**
>>> >> >  * Read data from the http-demo topic and write it back to "demo-duplicate".
>>> >> >  */
>>> >> > public class HttpDemoParserStreamTask implements StreamTask {
>>> >> >
>>> >> >     private static final SystemStream OUTPUT_STREAM =
>>> >> >         new SystemStream("kafka", "demo-duplicate");
>>> >> >     Logger logger =
>>> >> >         LoggerFactory.getLogger(HttpDemoParserStreamTask.class);
>>> >> >
>>> >> >     @SuppressWarnings("unchecked")
>>> >> >     @Override
>>> >> >     public void process(IncomingMessageEnvelope envelope,
>>> >> >             MessageCollector collector, TaskCoordinator coordinator)
>>> >> >             throws Exception {
>>> >> >
>>> >> >         String key = (String) envelope.getKey();
>>> >> >         String message = envelope.getMessage().toString();
>>> >> >         logger.info("key="+key+": message="+message);
>>> >> >         Map<String, String> outgoingMap =
>>> >> >             (Map<String, String>) (envelope.getMessage());
>>> >> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
>>> >> >             outgoingMap));
>>> >> >         //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
>>> >> >         //    message));
>>> >> >     }
>>> >> > }
>>> >> >
>>> >> > -------
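[Editor's note on Shadi's point 3 above: with systems.kafka.samza.msg.serde=string, the envelope's message is handed to the task as a String, so the unchecked cast to Map<String, String> in process() cannot succeed at runtime. The following minimal sketch is plain Java with no Samza dependency; the variable names and the sample payload are illustrative, not taken from the job.]

```java
import java.util.Map;

public class SerdeMismatchSketch {
    public static void main(String[] args) {
        // Roughly what a string serde would hand back from getMessage():
        Object message = "{\"key\":\"value\"}";

        try {
            // The same unchecked cast as in the StreamTask above:
            @SuppressWarnings("unchecked")
            Map<String, String> outgoingMap = (Map<String, String>) message;
            System.out.println("cast succeeded: " + outgoingMap);
        } catch (ClassCastException e) {
            // A String is not a Map, so control always lands here.
            System.out.println("ClassCastException: a string serde yields a String, not a Map");
        }
    }
}
// → ClassCastException: a string serde yields a String, not a Map
```

Consistent with Shadi's diagnosis, one way out is to keep the string serde and send the String form of the message (the commented-out collector.send(OUTPUT_STREAM, message) line); the other is to switch the message serde to one that handles maps.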
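[Editor's note: if the job really needs to forward the message as a Map<String, String>, a possible fix -- a sketch only, not something confirmed in this thread -- is to register Samza's JSON serde in the properties file and use it for messages instead of the string serde:]

```
# Serializers (register a JSON serde alongside the string one)
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# Kafka System (messages as JSON, keys still as strings)
systems.kafka.samza.msg.serde=json
systems.kafka.samza.key.serde=string
```

With a JSON serde, getMessage() would deserialize to a Map, so the existing cast and the collector.send(OUTPUT_STREAM, outgoingMap) line would line up; the commented-out send of the plain String would then be the mismatched one.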