You can get the internal AvroFlumeEvent inside the SparkFlumeEvent using
SparkFlumeEvent.event. Its body (getBody(), which returns a ByteBuffer)
should give you the original text data for each line.
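
As a rough sketch of the decoding step (not tested against your setup): the
helper below turns a body ByteBuffer into a String. The class and method
names (FlumeBodyDecoder, bodyToString) are made up for illustration, and
UTF-8 is an assumption about how your log files are encoded. The main()
method uses a hand-built buffer as a stand-in for a real event body.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Spark or Flume.
class FlumeBodyDecoder {

    // Decodes the remaining bytes of an event body as UTF-8 (assumed encoding).
    // duplicate() is used so the caller's buffer position is left untouched.
    static String bodyToString(ByteBuffer body) {
        return StandardCharsets.UTF_8.decode(body.duplicate()).toString();
    }

    public static void main(String[] args) {
        // Stand-in for the ByteBuffer returned by flumeEvent.event().getBody()
        ByteBuffer body =
                ByteBuffer.wrap("a sample log line".getBytes(StandardCharsets.UTF_8));
        System.out.println(bodyToString(body));
    }
}
```

Inside your while loop that would be something like
String line = FlumeBodyDecoder.bodyToString(flumeEvent.event().getBody());
(from Java, the Scala field "event" is read through the event() accessor).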



On Mon, Apr 28, 2014 at 5:46 AM, Kulkarni, Vikram <vikram.kulka...@hp.com> wrote:

>  Hi Spark-users,
>
>   Within my Spark Streaming program, I am able to ingest data sent by my
> Flume Avro Client. I configured a ‘spooling directory source’ to write data
> to a Flume Avro Sink (the Spark Streaming Driver program in this case). The
> default deserializer, i.e. LINE, is used to parse the file into events, so
> I am expecting one event (SparkFlumeEvent) for every line in the log file.
>
>
>
> My Spark Streaming Code snippet here:
>
>
>
>        System.out.println("Setting up Flume Stream using Avro Sink at: "
>                      + avroServer + ":" + avroPort);
>
>        //JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("XXX.YYY.XXX.YYY", port);
>        JavaDStream<SparkFlumeEvent> flumeStream =
>                      FlumeUtils.createStream(ssc, avroServer, avroPort);
>
>        flumeStream.count();
>
>        flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>,Void>() {
>               @Override
>               public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
>                      List<SparkFlumeEvent> events = eventsData.collect();
>                      Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
>
>                      System.out.println(">>>>>> Received Spark Flume Events: " + events.size());
>
>                      while (batchedEvents.hasNext()) {
>                             SparkFlumeEvent flumeEvent = batchedEvents.next();
>                             //System.out.println("SparkFlumeEvent = " + flumeEvent);
>                             //System.out.println(">>>>>>>>" + flumeEvent.toString());
>
>                             //TODO: How to build each line in the file using this SparkFlumeEvent object?
>                      }
>                      return null;
>               }
>        });
>
>
>
> Within this while loop, how do I extract each line that was streamed using
> the SparkFlumeEvent object? I intend to then parse this line, extract
> various fields and then persist it to memory.
>
>
>
> Regards,
>
> Vikram
>
>
>
