Stepping back a bit: if you just want to write Flume data to HDFS, you can
use Flume's HDFS sink for that.
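
For reference, here is a rough sketch of what such a Flume agent configuration
might look like. The agent and component names (a1, r1, c1, k1), the port, and
the HDFS path are placeholders I made up for illustration, so adjust them to
your setup.

```properties
# Hypothetical agent "a1": netcat source -> memory channel -> HDFS sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Netcat source, as in the original setup (bind address and port are placeholders)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

a1.channels.c1.type = memory

# HDFS sink replaces the Avro sink that fed Spark Streaming
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
# Placeholder path; point this at your own NameNode and directory
a1.sinks.k1.hdfs.path = hdfs://namenode:8020/flume/events
# DataStream writes the raw event bodies instead of the default SequenceFile
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
```

With this kind of setup no Spark code is needed at all; Flume handles file
rolling and naming on its own.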

Doing this with Spark Streaming and SparkFlumeEvent is unnecessarily
complex, and I suspect it is tricky to write the raw bytes from the
SparkFlumeEvent into a file. If you do want to do it this way, I suggest
trying the following (not tested, pure guesswork).

RDD[SparkFlumeEvent] ---> map to get the RDD of payload bytes ---> do
RDD.mapPartitions() (or foreachPartition(), since writing is a side effect)
to write each partition's bytes into an HDFS file (using HDFS's file output
stream interface)

You will have to take care of making the file name of each partition
unique, dealing with failures in writing, etc.
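
To make the per-partition part more concrete, here is a minimal sketch in
plain Java. It is not Spark or HDFS code: it uses a local temp file as a
stand-in so that it runs on its own. In a real job, the body of
writePartition would run inside the partition function, and the OutputStream
would come from Hadoop's FileSystem.create(Path) rather than java.nio. The
class name, method names, and file naming scheme are all made up for
illustration.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

class PartitionWriterSketch {

    // Copies only the readable bytes (position..limit) of an event body.
    // Working on a duplicate leaves the caller's buffer position untouched,
    // and avoids array(), which would return the whole backing array.
    static byte[] payloadBytes(ByteBuffer body) {
        ByteBuffer view = body.duplicate();
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    // Hypothetical naming scheme: batch time + partition index + random suffix,
    // so that retried or concurrent tasks never collide on the same file.
    static String partitionFileName(long batchTimeMs, int partitionIndex) {
        return "events-" + batchTimeMs + "-part-" + partitionIndex
                + "-" + UUID.randomUUID() + ".log";
    }

    // Writes every payload of one partition to the stream, one record per line.
    static void writePartition(Iterator<ByteBuffer> payloads, OutputStream out)
            throws IOException {
        while (payloads.hasNext()) {
            out.write(payloadBytes(payloads.next()));
            out.write('\n');
        }
    }

    public static void main(String[] args) throws IOException {
        // Local temp directory standing in for an HDFS output directory.
        Path dir = Files.createTempDirectory("flume-sketch");
        List<ByteBuffer> partition = Arrays.asList(
                ByteBuffer.wrap("first event".getBytes(StandardCharsets.UTF_8)),
                ByteBuffer.wrap("second event".getBytes(StandardCharsets.UTF_8)));

        Path target = dir.resolve(partitionFileName(System.currentTimeMillis(), 0));
        // try-with-resources closes the stream even if a write fails.
        try (OutputStream out = Files.newOutputStream(target)) {
            writePartition(partition.iterator(), out);
        }
        System.out.println(Files.readAllLines(target, StandardCharsets.UTF_8));
    }
}
```

The random suffix is just one way to keep names unique; it does not by itself
clean up partial files left behind by failed tasks, so that still needs
separate handling.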

TD



On Mon, Jul 14, 2014 at 9:29 AM, Sundaram, Muthu X. <
muthu.x.sundaram....@sabre.com> wrote:

> I am not sure how to write it. I tried writing to the local file system
> using FileWriter and PrintWriter inside the while loop. I am able to get
> the text and print it, but it fails when I use regular Java classes.
> Shouldn't I use regular Java classes here? Can I write only to HDFS? Do I
> have to create the file in HDFS using HDFS classes? I thought of using
> Spark's saveAsTextFile(), but I have a JavaRDD<SparkFlumeEvent>, not a
> JavaRDD<AvroEvent>, so I am not sure whether saveAsTextFile() will work.
> I appreciate any guidance here. Where can I find more code examples?
> Books, URLs?
>
>   flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>,Void> () {
>               @Override
>               public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
>                      String logRecord = null;
>                      List<SparkFlumeEvent> events = eventsData.collect();
>                      Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
>                      long t1 = System.currentTimeMillis();
>                      AvroFlumeEvent avroEvent = null;
>                      ByteBuffer bytePayload = null;
>                      // All the user level data is carried as payload in Flume Event
>                      while(batchedEvents.hasNext()) {
>                             SparkFlumeEvent flumeEvent = batchedEvents.next();
>                             avroEvent = flumeEvent.event();
>                             bytePayload = avroEvent.getBody();
>                             logRecord = new String(bytePayload.array());
>
>                             System.out.println(">>>>>>>>LOG RECORD = " + logRecord);
>
>                            ??I was trying to write the data to hdfs..but it fails…
>
> *From:* Tathagata Das [mailto:tathagata.das1...@gmail.com]
> *Sent:* Friday, July 11, 2014 1:43 PM
> *To:* user@spark.apache.org
> *Cc:* u...@spark.incubator.apache.org
> *Subject:* Re: writing FLume data to HDFS
>
>
>
> What is the error you are getting when you say "??I was trying to write
> the data to hdfs..but it fails…"
>
>
>
> TD
>
>
>
> On Thu, Jul 10, 2014 at 1:36 PM, Sundaram, Muthu X. <
> muthu.x.sundaram....@sabre.com> wrote:
>
> I am new to spark. I am trying to do the following.
>
> Netcat -> Flume -> Spark Streaming (process Flume data) -> HDFS.
>
>
>
> My flume config file has following set up.
>
>
>
> Source = netcat
>
> Sink=avrosink.
>
>
>
> Spark Streaming code:
>
> I am able to print data from flume to the monitor. But I am struggling to
> create a file. In order to get the real data I need to convert SparkEvent
> to avroEvent.
>
> JavaRDD.saveAsTextFile() might not work, because the JavaRDD is a
> collection of SparkEvent. Do I need to convert it into a
> JavaRDD<AvroEvent>?
>
> Please share any code examples… Thanks.
>
>
>
> Code:
>
>     Duration batchInterval = new Duration(2000);
>     SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
>     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
>     JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
>
>     flumeStream.count();
>
>     flumeStream.foreachRDD(new Function2<JavaRDD<SparkFlumeEvent>,JavaRDD<SparkFlumeEvent>,Void>(){
>         @Override
>         public Void call(JavaRDD<SparkFlumeEvent> events1,JavaRDD<SparkFlumeEvent> events2) throws Exception{
>             events1.saveasTextFile("output.txt");
>             return null;
>         }
>     });
>
>     /*flumeStream.count().map(new Function<Long, String>() {
>       @Override
>       public String call(Long in) {
>         return "Received " + in + " flume events.";
>       }
>     }).print();*/
>
>     flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>,Void> () {
>               @Override
>               public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
>                      String logRecord = null;
>                      List<SparkFlumeEvent> events = eventsData.collect();
>                      Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
>
>                      long t1 = System.currentTimeMillis();
>                      AvroFlumeEvent avroEvent = null;
>                      ByteBuffer bytePayload = null;
>
>                      // All the user level data is carried as payload in Flume Event
>
>                      while(batchedEvents.hasNext()) {
>                             SparkFlumeEvent flumeEvent = batchedEvents.next();
>                             avroEvent = flumeEvent.event();
>                             bytePayload = avroEvent.getBody();
>                             logRecord = new String(bytePayload.array());
>
>                             System.out.println(">>>>>>>>LOG RECORD = " + logRecord);
>
>                            ??I was trying to write the data to hdfs..but it fails…
>
>                      }
>                      System.out.println("Processed this batch in: " + (System.currentTimeMillis() - t1)/1000 + " seconds");
>                      return null;
>               }
>          });
>
