If you cannot push data as fast as you are generating it, then async isnt
going to help either. The "work" is just going to keep piling up as many
many async jobs even though your batch processing times will be low as that
processing time is not going to reflect how much of overall work is pending
in the system.

On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj <gautam1...@gmail.com> wrote:

> Hi,
>
> From my understanding of Spark Streaming, I created a spark entry point,
> for continuous UDP data, using:
>
> SparkConf conf = new 
> SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");JavaStreamingContext
>  jssc = new JavaStreamingContext(conf, new 
> Duration(10000));JavaReceiverInputDStream<String> lines = 
> jssc.receiverStream(new CustomReceiver(8060));
>
> Now, when I process this input stream using:
>
> JavaDStream hash=lines.flatMap(<my-code>)JavaPairDStream tuple= 
> hash.mapToPair(<my-code>)JavaPairDStream output= tuple.reduceByKey(<my-code>)
> output.foreachRDD(
>                 new 
> Function2<JavaPairRDD<String,ArrayList<String>>,Time,Void>(){
>                     @Override
>                     public Void call(
>                             JavaPairRDD<String, ArrayList<String>> arg0,
>                             Time arg1) throws Exception {
>                         // TODO Auto-generated method stub
>                         new AsyncRDDActions(arg0.rdd(), null);
>                         arg0.foreachPartition(
>                                 new 
> VoidFunction<Iterator<Tuple2<String,ArrayList<String>>>>(){
>
>                                     @Override
>                                     public void call(
>                                             Iterator<Tuple2<String, 
> ArrayList<String>>> arg0)
>                                             throws Exception {
>
>                                         // TODO Auto-generated method stub
>                                         GraphDatabaseService graphDb = new 
> GraphDatabaseFactory().newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
>                                                 
> .setConfig("remote_shell_enabled", "true")
>                                                 .newGraphDatabase();
>
>                                         try (Transaction tx = 
> graphDb.beginTx()) {
>                                             while (arg0.hasNext()) {
>                                                 Tuple2 < String, ArrayList < 
> String >> tuple = arg0.next();
>                                                 Node 
> HMac=Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
>                                                 boolean oldHMac=false;
>                                                 if (HMac!= null){
>                                                     
> System.out.println("Alread in Database:" + tuple._1);
>                                                     oldHMac=true;
>                                                 }
>                                                 else
>                                                     
> HMac=Neo4jOperations.createHMac(graphDb, tuple._1);
>
>                                                 ArrayList<String> 
> zipcodes=tuple._2;
>                                                 for(String zipcode : 
> zipcodes){
>                                                     Node 
> Zipcode=Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
>                                                     if(Zipcode!=null){
>                                                         
> System.out.println("Already in Database:" + zipcode);
>                                                         if(oldHMac==true && 
> Neo4jOperations.getRelationshipBetween(HMac, Zipcode)!=null)
>                                                             
> Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
>                                                         else
>                                                             
> Neo4jOperations.travelTo(HMac, Zipcode);
>                                                     }
>                                                     else{
>                                                         
> Zipcode=Neo4jOperations.createZipcode(graphDb, zipcode);
>                                                         
> Neo4jOperations.travelTo(HMac, Zipcode);
>                                                     }
>                                                 }
>                                             }
>                                             tx.success();
>                                         }
>                                         graphDb.shutdown();
>                                     }
>                         });
>                         return null;
>                     }
>                 });
>
> The part of code in output.foreachRDD pushes the output of spark into
> Neo4j Database. Checking for duplicates values.
>
> This part of code is very time consuming because of which my processing
> time exceeds batch time. Because of that, it *result in dataloss*. So, I
> was thinking of pushing the output into the database asynchronously.
> I found AsyncRDDActions(
> https://spark.apache.org/docs/1.1.1/api/java/org/apache/spark/rdd/AsyncRDDActions.html)
> for this purpose, but cannot find a working example for that in Java.
> Especially, the function foreachPatitionAsync inside which we have to use
> "Function1"
>
> Any help is appreciated.
>
> Thanks,
> Gautam
>

Reply via email to