I tried to map the SparkFlumeEvents to an RDD of Strings as shown below, but the
map function's call() is never executed. I might be doing this the wrong way;
any help would be appreciated.
flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        System.out.println("<<<<<<Inside foreach...call>>>>");
        JavaRDD<String> records = eventsData.map(
            new Function<SparkFlumeEvent, String>() {
                @Override
                public String call(SparkFlumeEvent flume) throws Exception {
                    System.out.println("<<<<<<Inside Map..call>>>>");
                    // The user-level data is carried as the payload of the Flume event
                    AvroFlumeEvent avroEvent = flume.event();
                    ByteBuffer bytePayload = avroEvent.getBody();
                    String logRecord = new String(bytePayload.array());
                    System.out.println("<<<<Record is" + logRecord);
                    return logRecord;
                }
            });
        return null;
    }
});
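My guess is that map() is a lazy transformation, so the inner call() never runs
because I never invoke an action on the resulting RDD. Is something like the
sketch below the right way to force it? (count() and collect() are just the
actions I happened to pick.)

JavaRDD<String> records = eventsData.map(
    new Function<SparkFlumeEvent, String>() {
        @Override
        public String call(SparkFlumeEvent flume) throws Exception {
            return new String(flume.event().getBody().array());
        }
    });

// map() only builds a lineage; an action makes Spark actually run it
long recordCount = records.count();
System.out.println("Records in this batch: " + recordCount);
for (String record : records.collect()) {   // collect() pulls results to the driver
    System.out.println("Record: " + record);
}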
-----Original Message-----
From: Sundaram, Muthu X. [mailto:[email protected]]
Sent: Tuesday, July 22, 2014 10:24 AM
To: [email protected]; [email protected]
Subject: Transforming flume events using Spark transformation functions
Hi All,
I am getting events from Flume using the following line:

JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
Each event is a delimited record. I would like to use some of the transformation
functions, such as map and reduce, on this. Do I need to convert the
JavaDStream<SparkFlumeEvent> to a JavaDStream<String>, or can I apply these
functions directly to it?
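For example, would a sketch like this be the idiomatic route, mapping the DStream
itself so every batch RDD gets converted? (The decoding just mirrors what I
already do per event.)

JavaDStream<String> lines = flumeStream.map(
    new Function<SparkFlumeEvent, String>() {
        @Override
        public String call(SparkFlumeEvent flume) throws Exception {
            // Decode the Avro event body into the delimited record
            return new String(flume.event().getBody().array());
        }
    });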
I need to do the following kind of operations. Given records like:

XXXX AA
YYYYY Delta
TTTTT AA
CCCC Southwest
XXXX AA

the distinct tickets are XXXX, YYYYY, TTTTT, and CCCC, and the counts are
XXXX 2, YYYYY 1, TTTTT 1, and so on. Per carrier: AA - 2 tickets (the duplicate
XXXX record should be counted only once), Delta - 1 ticket, Southwest - 1
ticket.
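To make it concrete, here is roughly what I picture for one batch. This is only
a sketch; I am assuming the records are whitespace-delimited and that
mapToPair/reduceByKey from the Java API are the right tools (it would need
org.apache.spark.api.java.JavaPairRDD, the PairFunction and Function2 function
classes, and scala.Tuple2):

// records is a JavaRDD<String> of lines like "XXXX AA"
JavaPairRDD<String, String> ticketToCarrier = records.mapToPair(
    new PairFunction<String, String, String>() {
        @Override
        public Tuple2<String, String> call(String line) {
            String[] fields = line.split("\\s+");           // ticket, carrier
            return new Tuple2<String, String>(fields[0], fields[1]);
        }
    }).distinct();                                          // drops the duplicate "XXXX AA"

JavaPairRDD<String, Integer> ticketsPerCarrier = ticketToCarrier.mapToPair(
    new PairFunction<Tuple2<String, String>, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(Tuple2<String, String> pair) {
            return new Tuple2<String, Integer>(pair._2(), 1);   // key by carrier
        }
    }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) {
                return a + b;                                   // unique tickets per carrier
            }
        });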
I have to do transformations like this. Right now I am able to receive records,
but I am struggling to transform them using Spark transformation functions
since they are not of type JavaRDD<String>.
Can I create a new JavaRDD<String>? How do I create a new JavaRDD?
I loop through the events like below:

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        String logRecord = null;
        List<SparkFlumeEvent> events = eventsData.collect();
        Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
        long t1 = System.currentTimeMillis();
        AvroFlumeEvent avroEvent = null;
        ByteBuffer bytePayload = null;
        // All the user-level data is carried as the payload of the Flume event
        while (batchedEvents.hasNext()) {
            SparkFlumeEvent flumeEvent = batchedEvents.next();
            avroEvent = flumeEvent.event();
            bytePayload = avroEvent.getBody();
            logRecord = new String(bytePayload.array());
            System.out.println(">>>>>>>>LOG RECORD = " + logRecord);
        }
        return null;
    }
});
Where do I create the new JavaRDD<String>? Do I do it before this loop? How do I
create this JavaRDD<String>?
In the loop I am able to get every record and print it.
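Is the idea to build the JavaRDD<String> with map() instead of collecting, or
can I also create one from a local list? Both sketches below are guesses (sc
would be the JavaSparkContext used to build ssc):

// Option 1: stay distributed and let map() produce the new RDD
JavaRDD<String> records = eventsData.map(
    new Function<SparkFlumeEvent, String>() {
        @Override
        public String call(SparkFlumeEvent flume) throws Exception {
            return new String(flume.event().getBody().array());
        }
    });

// Option 2: create an RDD from a list assembled on the driver
List<String> local = new ArrayList<String>();
// ... fill 'local' inside the while loop above ...
JavaRDD<String> recordsFromList = sc.parallelize(local);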
I appreciate any help here.
Thanks,
Muthu