Hi, Yan:
Thanks for fixing the bug for me.
Sincerely,
Selina
On Tue, Jul 28, 2015 at 12:03 PM, Yan Fang <[email protected]> wrote:
> task.class=samza.http.demo.task.HttpDemoParserStreamTask ...
>
> you are not using the StateStream class...
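>
> A minimal sketch of what the Task section might look like instead, assuming
> HttpDemoStatsStreamTask sits in the same package as the parser task (an
> assumption, since the thread does not show its package):
>
>     # Task
>     # the package name below is assumed, not confirmed
>     task.class=samza.http.demo.task.HttpDemoStatsStreamTask
>     task.inputs=kafka.http-demo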
>
> Fang, Yan
> [email protected]
>
> On Tue, Jul 28, 2015 at 11:48 AM, Job-Selina Wu <[email protected]>
> wrote:
>
> > Hi, Yan
> >
> > I would like to correct my previous comment. When I comment out
> > systems.kafka.streams.http-demo.samza.offset.default=oldest
> > systems.kafka.streams.http-demo.samza.reset.offset=true
> >
> > the Logger output no longer shows up in samza-container-0.log, which makes sense.
> >
> >
> > Sincerely,
> > Selina
> >
> > On Tue, Jul 28, 2015 at 11:30 AM, Job-Selina Wu <[email protected]>
> > wrote:
> >
> > > Hi, Yan:
> > >
> > > Thanks a lot for your reply.
> > > I tried commenting out
> > > systems.kafka.http-demo.samza.offset.default=oldest
> > > and then I tried commenting out
> > > systems.kafka.streams.http-demo.samza.offset.default=oldest
> > > systems.kafka.streams.http-demo.samza.reset.offset=true
> > >
> > > The result is the same as before: 1. the checkpoint topic was created,
> > > 2. the log written by the Logger can be found at /samza-container-0.log,
> > > and 3. there is no exception in samza-container-0.log.
> > >
> > > Could there be a conflict between HttpDemoParserStreamTask and
> > > HttpDemoStatsStreamTask? Is some resource registered by
> > > HttpDemoParserStreamTask, so that HttpDemoStatsStreamTask cannot
> > > recreate a topic?
> > >
> > > Sincerely,
> > > Selina
> > >
> > > On Tue, Jul 28, 2015 at 9:37 AM, Yan Fang <[email protected]> wrote:
> > >
> > >> Can you comment out "systems.kafka.http-demo.samza.offset.default=oldest"
> > >> to see how it works? This does not seem to be a correct property.
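> > >>
> > >> As far as I know, Samza reads the default offset either for the whole
> > >> system or for a single stream, so a sketch of the two forms, using the
> > >> system and stream names from your file, would be:
> > >>
> > >>     # system-wide default
> > >>     systems.kafka.samza.offset.default=oldest
> > >>     # only for the http-demo stream
> > >>     systems.kafka.streams.http-demo.samza.offset.default=oldest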
> > >>
> > >> Thanks,
> > >>
> > >> Fang, Yan
> > >> [email protected]
> > >>
> > >> On Mon, Jul 27, 2015 at 5:54 PM, Job-Selina Wu <[email protected]>
> > >> wrote:
> > >>
> > >> > Hi, Dear All:
> > >> >
> > >> > I have two tasks in Samza, HttpDemoParserStreamTask and
> > >> > HttpDemoStatsStreamTask. They are almost the same, except that the
> > >> > output topic name and the task name in the properties file are
> > >> > different. I am wondering how I should debug this?
> > >> >
> > >> > More details are listed below.
> > >> >
> > >> > All your help is highly appreciated.
> > >> >
> > >> > Sincerely,
> > >> > Selina
> > >> >
> > >> > Currently HttpDemoParserStreamTask runs well.
> > >> > HttpDemoStatsStreamTask also generates its log correctly, without any
> > >> > exception, at
> > >> >
> > >> > deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/samza-container-0.log
> > >> >
> > >> > The last record, shown below, is correct; however, no topic
> > >> > "demo-stats-temp" was created.
> > >> > --------------------------------------
> > >> >
> > >> > 2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO] key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung Galaxy S6","operationSystem":"Android 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}
> > >> >
> > >> >
> > >> > -------------------The demo-stats.properties file-----------------------------
> > >> >
> > >> > # Job
> > >> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> > >> > job.name=demo-stats-tmp
> > >> >
> > >> > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> > >> > task.checkpoint.system=kafka
> > >> > # Normally, this would be 3, but we have only one broker.
> > >> > task.checkpoint.replication.factor=1
> > >> >
> > >> > # YARN
> > >> > yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
> > >> >
> > >> > # Task
> > >> > task.class=samza.http.demo.task.HttpDemoParserStreamTask
> > >> > task.inputs=kafka.http-demo
> > >> >
> > >> > # Serializers
> > >> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> > >> >
> > >> > # Kafka System
> > >> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > >> > systems.kafka.samza.msg.serde=string
> > >> > systems.kafka.samza.key.serde=string
> > >> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
> > >> > systems.kafka.producer.bootstrap.servers=localhost:9092
> > >> >
> > >> > #stream from beginning
> > >> > #systems.kafka.consumer.auto.offset.reset=smallest
> > >> > #http-demo from the oldest
> > >> > systems.kafka.http-demo.samza.offset.default=oldest
> > >> > # all stream from the oldest
> > >> > systems.kafka.streams.http-demo.samza.offset.default=oldest
> > >> > systems.kafka.streams.http-demo.samza.reset.offset=true
> > >> >
> > >> >
> > >> >
> > >> > --------------------HttpDemoStatsStreamTask class----------------------------
> > >> >
> > >> > public class HttpDemoStatsStreamTask implements StreamTask {
> > >> >
> > >> >     // output topic
> > >> >     private static final SystemStream OUTPUT_STREAM =
> > >> >             new SystemStream("kafka", "demo-stats-temp");
> > >> >     Logger logger = LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);
> > >> >
> > >> >     @SuppressWarnings("unchecked")
> > >> >     @Override
> > >> >     public void process(IncomingMessageEnvelope envelope,
> > >> >             MessageCollector collector, TaskCoordinator coordinator)
> > >> >             throws Exception {
> > >> >         String key = (String) envelope.getKey();
> > >> >         String message = envelope.getMessage().toString();
> > >> >         logger.info("key=" + key + ": message=" + message);
> > >> >
> > >> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
> > >> >     }
> > >> > }
> > >> >
> > >> > -----Tail of __samza_checkpoint_ver_1_for_demo-stats-tmp_1 topic--------------
> > >> >
> > >> > {"Partition 0":0}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}