How do y'all feel about mixing languages? My sentence producer (as you've seen) is written in Python. It wouldn't take long to convert it, but it seems like a lot of extra effort for little gain.
If it's not a big deal, where in the hello-samza project should it live?

On Tue, Mar 24, 2015 at 10:45 AM, Chinmay Soman <chinmay.cere...@gmail.com> wrote:

Hey Ash,

Yeah, I think more examples would be great! Feel free to open an RB.

About the config: it can be a bit of an overhead in general and takes some getting used to. I realized that after I started writing Samza jobs myself. I'm kind of new to Samza too, but I kept making silly mistakes that are difficult to catch early on. It's one of those things that is very difficult to get right (because every approach has its pros and cons). Hopefully I can finish my patch for exactly that problem: config validation.

On Mon, Mar 23, 2015 at 10:04 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:

Just to clarify, adding

    serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
    systems.kafka.streams.myTopic.samza.msg.serde=string

to the properties file and updating the Java source to

    System.out.println((String) envelope.getMessage());

did the trick. I've updated the pastebin with the appropriate change.

Now, through this whole process I've assumed that StreamTask is the appropriate interface to implement. Essentially, the end goal is to decode a compressed bytestream Kafka event (from a different topic, of course) and then feed it to a DB/TSDB/whatever. I didn't think I'd need to generate a Job for this; I assumed this stream should be able to feed directly to the output entity.

Anyway, over the next couple of days I'll migrate this into something that can live in the hello-samza ecosystem as a separate task. I've got the start of a Java-based DataPusher built as well.

On Mon, Mar 23, 2015 at 9:40 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:

Huzzah! I ... have ... text showing!
This has been enough of a trial that I think I'll convert it into a very simple sample project for the repo, if you guys are interested.

Diff coming once I have it cleaned up into something less ugly.

-Ash

On Mon, Mar 23, 2015 at 9:27 PM, Chinmay Soman <chinmay.cere...@gmail.com> wrote:

> I changed the systems.kafka.samza.msg.serde=json to 'string' a while back,
> but that caused a separate exception. However that was many, MANY attempts
> ago.

That may not work, because the system-level setting applies the same serialization format (json or string) to every stream on the system, input and output alike. In your case you're inputting strings and outputting JSON, so you might have to set the input stream's serde explicitly.

On Mon, Mar 23, 2015 at 9:24 PM, Chinmay Soman <chinmay.cere...@gmail.com> wrote:

Since you're producing String data to 'myTopic', can you try setting the string serialization in your config?

    serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
    systems.kafka.streams.myTopic.samza.msg.serde=string

On Mon, Mar 23, 2015 at 9:17 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:

More info. New exception message:

    Exception in thread "main" org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.

Updated the diff in pastebin with the changes.

On Mon, Mar 23, 2015 at 8:41 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:

Gah! Yeah, those were gone several revisions ago but didn't get nuked in the last iteration.

OK, let me do a quick test to see if that was my problem all along.
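The string-serde fix works because a Samza serde only converts between the raw bytes stored in Kafka and the object the task receives from envelope.getMessage(). The sketch below is a minimal illustration of that idea. It declares a local Serde interface with the same fromBytes/toBytes shape as Samza's serializer interface, so it runs without Samza on the classpath.

The names SerdeSketch, StringSerdeSketch and GzipStringSerdeSketch are hypothetical. The gzip variant reflects an assumption about Ash's "compressed bytestream" events, because the thread never says which compression format they use. To wire a real serde into a job, you would also register a factory under serializers.registry.<name>.class, the same way the string serde is registered in the config quoted in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class SerdeSketch {

  // Local stand-in with the same shape as Samza's Serde<T> (fromBytes/toBytes);
  // declared here so the sketch compiles without Samza.
  public interface Serde<T> {
    T fromBytes(byte[] bytes);
    byte[] toBytes(T object);
  }

  // Plain UTF-8 text: the kind of conversion a string serde performs for a sentence topic.
  public static class StringSerdeSketch implements Serde<String> {
    public String fromBytes(byte[] bytes) {
      return new String(bytes, StandardCharsets.UTF_8);
    }

    public byte[] toBytes(String s) {
      return s.getBytes(StandardCharsets.UTF_8);
    }
  }

  // Hypothetical: gzip-compressed UTF-8 text (the compression format is an assumption).
  public static class GzipStringSerdeSketch implements Serde<String> {
    public String fromBytes(byte[] bytes) {
      try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
        return new String(in.readAllBytes(), StandardCharsets.UTF_8);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }

    public byte[] toBytes(String s) {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      // Closing the gzip stream (via try-with-resources) writes the gzip trailer.
      try (GZIPOutputStream out = new GZIPOutputStream(buffer)) {
        out.write(s.getBytes(StandardCharsets.UTF_8));
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      return buffer.toByteArray();
    }
  }

  public static void main(String[] args) {
    String sentence = "My sample sentence.";

    Serde<String> plain = new StringSerdeSketch();
    System.out.println(plain.fromBytes(plain.toBytes(sentence)));

    Serde<String> gzip = new GzipStringSerdeSketch();
    System.out.println(gzip.fromBytes(gzip.toBytes(sentence)));
  }
}
```

Both round trips print the original sentence. Handing the task the decoded String keeps the decompression step out of process(), which fits the decode-then-store pipeline Ash describes.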
On Mon, Mar 23, 2015 at 8:38 PM, Navina Ramesh <nram...@linkedin.com.invalid> wrote:

Hey Ash,
I was referring to the lines before the try block:

    Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
    WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);

    try {
      System.out.println("[DWH] should see this");
      System.out.println(event.getRawEvent());
      …

Did you remove those lines as well?

Navina

On 3/23/15, 8:31 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:

Just looking at the diff I posted, and it's:

     try {
    -  Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
    +  System.out.println("[DWH] should see this");
    +  System.out.println(event.getRawEvent());
    +  // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());

I've removed the Map and added two System.out.println calls. So no, there shouldn't be any reference to

    Map<String, Object> parsedJsonObject = parse(event.getRawEvent());

in the Java source file.

On Mon, Mar 23, 2015 at 7:42 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:

I'm in transit right now, but if memory serves, everything in that method should be commented out except for the System.out.println call. I'll be home shortly and can confirm.
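Navina's point is that the (Map<String, Object>) cast on the first line of process() runs before the try block. So once the input serde hands the task a String, that cast throws the ClassCastException from the log in her 7:28 PM reply. A defensive alternative is to check the runtime type before casting.

The sketch below is a minimal illustration of that check, not code from hello-samza. A plain Object stands in for envelope.getMessage(), and describeMessage is a hypothetical helper.

```java
import java.util.HashMap;
import java.util.Map;

public class MessageTypeCheck {

  // Hypothetical helper: 'message' stands in for envelope.getMessage().
  // Checking the runtime type avoids an unconditional (Map<String, Object>) cast,
  // which throws ClassCastException when the configured serde yields a String.
  public static String describeMessage(Object message) {
    if (message instanceof String) {
      return "text: " + message;
    } else if (message instanceof Map) {
      Map<?, ?> map = (Map<?, ?>) message;
      return "map with " + map.size() + " entries";
    } else {
      return "unexpected type: " + (message == null ? "null" : message.getClass().getName());
    }
  }

  public static void main(String[] args) {
    // Roughly what a string serde delivers for a sentence-only topic.
    System.out.println(describeMessage("A plain sentence"));

    // Roughly what a JSON serde delivers for a JSON object.
    Map<String, Object> json = new HashMap<>();
    json.put("raw", "some event");
    System.out.println(describeMessage(json));

    // The blind cast from the original process() method fails on a String.
    Object message = "A plain sentence";
    try {
      @SuppressWarnings("unchecked")
      Map<String, Object> jsonObject = (Map<String, Object>) message;
      System.out.println(jsonObject.size());
    } catch (ClassCastException e) {
      System.out.println("ClassCastException, as in the log Navina posted");
    }
  }
}
```

In a real task you would usually fix the serde config instead, so that the message type is known. The instanceof check mainly makes a serde mismatch easier to diagnose than a crash on the first message.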
On Mar 23, 2015 7:28 PM, "Navina Ramesh" <nram...@linkedin.com.invalid> wrote:

Hi Ash,
I just ran wikipedia-parser with your patch. It looks like you have set the message serde correctly in the configs. However, the original code still converts the message into a Map for consumption in WikipediaFeedEvent. I am seeing the following (expected):

    2015-03-23 19:17:49 SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
    java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map
        at samza.examples.wikipedia.task.WikipediaParserStreamTask.process(WikipediaParserStreamTask.java:38)
        at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)

Did you make the changes to fix this error? Your patch doesn't seem to have that. Line 38 is:

    Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();

Lmk so I can investigate further.

Cheers!
Navina

On 3/23/15, 6:43 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:

If anyone's interested, I've posted a diff of the project here:
http://pastebin.com/6ZW6Y1Vu
and the Python publisher here: http://pastebin.com/2NvTFDFx
if you want to take a stab at it.

On Mon, Mar 23, 2015 at 6:04 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:

OK, so a very simple test: everything runs on a local machine, not across networks, and all in the hello-samza repo this time around.

I've got the datapusher.py file set up to push data into localhost, one event per second. I've also got a modified hello-samza in which I changed the WikipediaParserStreamTask.java class to simply read what's there.

I'm running them both now, and I'm seeing the following in the stderr files (deploy/yarn/logs/userlogs/application_XXXXX/container_YYYY/stderr):

    Exception in thread "main" org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.
        at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:293)
        at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
        at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
        at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
        at scala.collection.SetLike$class.map(SetLike.scala:93)
        at scala.collection.AbstractSet.map(Set.scala:47)
        at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276)
        at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:213)
        at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
        at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
        at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
        at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
        at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:80)
        at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
        at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
        at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
        at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
    Caused by: org.codehaus.jackson.JsonParseException: Unexpected character ('M' (code 77)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
     at [Source: [B@5454d285; line: 1, column: 2]
        at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
        at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
        at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
        at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
        at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
        at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
        at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
        at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
        at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
        at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
        at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
        at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)

I changed the systems.kafka.samza.msg.serde=json to 'string' a while back, but that caused a separate exception. However, that was many, MANY attempts ago.

On Mon, Mar 23, 2015 at 5:23 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:

Ahh, I was going to add it to the run-class.sh script.
Yeah, it's already there by default:

    # Metrics
    metrics.reporters=snapshot,jmx
    metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
    metrics.reporter.snapshot.stream=kafka.metrics
    metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory

So, where would I see those metrics?

On Mon, Mar 23, 2015 at 5:15 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:

Read: I'm a C++ programmer looking at Java for the first time in 10 years.

On Mon, Mar 23, 2015 at 5:13 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:

I'm assuming I have JMX defined ... where would that get set?

On Mon, Mar 23, 2015 at 5:08 PM, Chinmay Soman <chinmay.cere...@gmail.com> wrote:

Hey Ash,

Can you check your job metrics (if you have the JMX metrics defined) to see if your job is actually doing anything?
My only guess at this point is that the process method is not being called because somehow there's no incoming data. I could be totally wrong, of course.

On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:

Just to be clear, here's what's changed from the default hello-samza repo:

wikipedia-parser.properties
==========================
    task.inputs=kafka.myTopic
    systems.kafka.consumer.zookeeper.connect=ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
    systems.kafka.consumer.auto.offset.reset=smallest

WikipediaParserStreamTask.java
=====================
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
      Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
      WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);

      try {
        System.out.println(event.getRawEvent());
        // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());

        // parsedJsonObject.put("channel", event.getChannel());
        // parsedJsonObject.put("source", event.getSource());
        // parsedJsonObject.put("time", event.getTime());

        // collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));

I've also made the changes to the log4j.xml file that I mentioned earlier.

The data pushed into the 'myTopic' topic is nothing more than a sentence.

On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:

Yep, I modified log4j.xml to look like this:

    <root>
      <priority value="debug" />
      <appender-ref ref="RollingAppender"/>
      <appender-ref ref="jmx" />
    </root>

Not sure what you mean by #2.

However, it's running now and I'm not seeing any exceptions, but I'm still not seeing any output from System.out.println(...).

On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram <nsomasunda...@linkedin.com.invalid> wrote:

Hey Ash,
1. Did you happen to modify your log4j.xml?
2. Can you print the class path that was printed when the job started? I am wondering if log4j was not loaded or is not present in the path where it's looking. If you have been using hello-samza, it should have pulled it from Maven.

Thanks,
Naveen

On Mar 22, 2015, at 10:35 AM, Ash W Matheson <ash.mathe...@gmail.com> wrote:

Hey all,

I'm currently evaluating Samza and am running into some odd issues.

I'm working off the 'hello-samza' repo and trying to parse a simple Kafka topic that I've produced through an external Java app (nothing other than a series of sentences), and it's failing pretty hard for me.
The base 'hello-samza' set of apps works fine, but as soon as I change the configuration to look at a different Kafka/ZooKeeper, I get the following in the userlogs:

    2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets for streams [myTopic] due to kafka.common.KafkaException: fetching topic metadata for topics [Set(myTopic)] from broker [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.

The modifications are pretty straightforward.
In wikipedia-parser.properties, I've changed the following:

    task.inputs=kafka.myTopic
    systems.kafka.consumer.zookeeper.connect=redacted:2181/
    systems.kafka.consumer.auto.offset.reset=smallest
    systems.kafka.producer.metadata.broker.list=redacted:9092

and in the actual Java file, WikipediaParserStreamTask.java:

    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
      Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
      WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);

      try {
        System.out.println(event.getRawEvent());

Then I followed the compile/extract/run process outlined on the hello-samza website.

Any thoughts? I've looked online for any 'super simple' examples of ingesting Kafka in Samza, with very little success.
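Neither the properties in this first message nor the 4:28 PM version set a serde for myTopic. The stream therefore inherits the system-wide systems.kafka.samza.msg.serde=json, and the JsonSerde frames in the later stack trace show that it was indeed active. Chinmay's fix adds a stream-level key that takes precedence for myTopic only, while the output stream keeps the JSON default.

The sketch below is a minimal model of that lookup order using java.util.Properties. It assumes stream-level-over-system-level precedence as described in the thread. resolveSerde and looksLikeJsonValue are hypothetical helpers that approximate the behavior; they do not use Samza's real config classes, and they do not model the serializers.registry.<name>.class entries that register each serde name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class SerdeLookupSketch {

  // Hypothetical helper: a stream-level serde overrides the system-level default;
  // returns null when neither key is set.
  public static String resolveSerde(Properties config, String system, String stream) {
    String streamKey = "systems." + system + ".streams." + stream + ".samza.msg.serde";
    String systemKey = "systems." + system + ".samza.msg.serde";
    return config.getProperty(streamKey, config.getProperty(systemKey));
  }

  // Rough heuristic, not a JSON parser: checks whether the payload starts the way a
  // JSON value can. A plain sentence starting with 'M' fails, much like the
  // JsonParseException in the stack trace.
  public static boolean looksLikeJsonValue(byte[] payload) {
    String text = new String(payload, StandardCharsets.UTF_8).trim();
    if (text.isEmpty()) {
      return false;
    }
    char c = text.charAt(0);
    return c == '{' || c == '[' || c == '"' || c == '-' || Character.isDigit(c)
        || text.startsWith("true") || text.startsWith("false") || text.startsWith("null");
  }

  public static void main(String[] args) {
    Properties config = new Properties();
    // System-wide default, as in hello-samza's wikipedia-parser.properties.
    config.setProperty("systems.kafka.samza.msg.serde", "json");
    System.out.println(resolveSerde(config, "kafka", "myTopic"));          // json

    // Chinmay's suggested override for the sentence-only input topic.
    config.setProperty("systems.kafka.streams.myTopic.samza.msg.serde", "string");
    System.out.println(resolveSerde(config, "kafka", "myTopic"));          // string
    System.out.println(resolveSerde(config, "kafka", "wikipedia-edits"));  // json

    byte[] sentence = "My sample sentence".getBytes(StandardCharsets.UTF_8);
    System.out.println(looksLikeJsonValue(sentence));                      // false
  }
}
```

Setting the serde per stream, rather than flipping the system-wide value, is what lets the job read plain strings while still writing JSON to wikipedia-edits.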