Something does not make sense. Receivers (currently) do not get blocked
by processing load (unless a rate limit has been set). The receiver will
continue to receive data and store it in memory until it is processed.
So I am still not sure how the data loss is happening, unless you are
sending data at a faster rate than the receiver can handle (that is, more
than the max rate at which the receiver can save data in memory and
replicate it to other nodes).
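
To illustrate the point about receiving being decoupled from processing, here
is a minimal sketch in plain Java (no Spark involved). The class name
BufferedReceiverSketch, the run method, and the Result type are made up for
this illustration, and the unbounded in-memory queue only stands in for the
receiver's block storage; it is an assumption, not how Spark implements it.
A fast "receiver" thread fills the buffer while a slow "processor" drains it:
nothing is dropped, the backlog just grows.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a receiver that buffers records in memory
// while a slower consumer processes them.
public class BufferedReceiverSketch {

    /** Counters collected from one simulated run. */
    public static final class Result {
        public final int received;
        public final int processed;
        public final int maxBacklog;

        Result(int received, int processed, int maxBacklog) {
            this.received = received;
            this.processed = processed;
            this.maxBacklog = maxBacklog;
        }
    }

    public static Result run(int records, long processingMillis) {
        // Unbounded buffer: the receiver never blocks on slow processing.
        BlockingQueue<Integer> buffer = new LinkedBlockingQueue<>();
        AtomicInteger received = new AtomicInteger();
        AtomicInteger maxBacklog = new AtomicInteger();

        Thread receiver = new Thread(() -> {
            for (int i = 0; i < records; i++) {
                buffer.add(i);
                received.incrementAndGet();
                // Track the largest backlog observed right after an insert.
                maxBacklog.accumulateAndGet(buffer.size(), Math::max);
            }
        });
        receiver.start();

        int processed = 0;
        try {
            while (processed < records) {
                buffer.take();                  // wait for the next record
                Thread.sleep(processingMillis); // simulate slow processing
                processed++;
            }
            receiver.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return new Result(received.get(), processed, maxBacklog.get());
    }

    public static void main(String[] args) {
        Result r = run(200, 1);
        System.out.println("received=" + r.received
                + " processed=" + r.processed
                + " maxBacklog=" + r.maxBacklog);
    }
}
```

In this sketch the only cost of slow processing is memory and latency. Data
would be lost only if the receiving side itself could not keep up with the
sender, which this simulation does not model.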

In general, if you are particular about data loss, then UDP is not really a
good choice in the first place. If you can try using TCP, try it. It would
at least eliminate the possibility that I mentioned above. Ultimately, if
you try sending data faster than the receiver can handle (independent of
whether processing can keep up), then you will lose data if you are using
UDP. You have to use TCP to naturally control the sending rate to match the
receiving rate in the receiver, without dropping data.
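
As a rough sketch of the TCP behaviour described above, the following plain
Java program (standard library only) sends newline-delimited records over a
loopback TCP connection to a deliberately slow reader. The class name
TcpFlowControlSketch, the method sendAndCount, and the record format are
assumptions made for this illustration; this is not the CustomReceiver from
the original question. Every record arrives, because once the socket buffers
fill up the sender's writes block until the reader catches up.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical demo: a fast TCP sender and a slow TCP reader on loopback.
public class TcpFlowControlSketch {

    /** Sends `lines` records over TCP and returns how many the reader saw. */
    public static int sendAndCount(int lines, long receiverDelayMillis) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the OS for any free port.
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            int port = server.getLocalPort();

            Thread sender = new Thread(() -> {
                try (Socket s = new Socket(loopback, port);
                     BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
                             s.getOutputStream(), StandardCharsets.UTF_8))) {
                    for (int i = 0; i < lines; i++) {
                        out.write("record-" + i);
                        out.newLine(); // blocks if the socket buffers are full
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            sender.start();

            int count = 0;
            try (Socket client = server.accept();
                 BufferedReader in = new BufferedReader(new InputStreamReader(
                         client.getInputStream(), StandardCharsets.UTF_8))) {
                // readLine() returns null once the sender closes the connection.
                while (in.readLine() != null) {
                    count++;
                    if (receiverDelayMillis > 0) {
                        Thread.sleep(receiverDelayMillis); // simulate a slow receiver
                    }
                }
            }
            sender.join();
            return count;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("lines received: " + sendAndCount(1000, 1));
    }
}
```

With UDP there is no equivalent guarantee: datagrams that arrive while the
receive buffer is full are silently discarded, so the sender never learns
that it should slow down.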


On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj <gautam1...@gmail.com> wrote:

> This is just a friendly ping, just to remind you of my query.
>
> Also, is there a possible explanation/example on the usage of
> AsyncRDDActions in Java ?
>
> On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj <gautam1...@gmail.com>
> wrote:
>
>> I am receiving data at UDP port 8060, processing it using
>> Spark, and storing the output in Neo4j.
>>
>> But the data I'm receiving and the data that is getting stored don't
>> match, probably because the Neo4j API takes too long to push the data into
>> the database. Meanwhile, Spark is unable to receive data, probably because
>> the process is blocked.
>>
>> On Thu, May 21, 2015 at 5:28 PM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> Can you elaborate on how the data loss is occurring?
>>>
>>>
>>> On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj <gautam1...@gmail.com>
>>> wrote:
>>>
>>>> That is completely all right, as the system will make sure the work gets
>>>> done.
>>>>
>>>> My major concern is the data drop. Will using async stop data loss?
>>>>
>>>> On Thu, May 21, 2015 at 4:55 PM, Tathagata Das <t...@databricks.com>
>>>> wrote:
>>>>
>>>>> If you cannot push data as fast as you are generating it, then async
>>>>> isn't going to help either. The "work" is just going to keep piling up
>>>>> as many, many async jobs, even though your batch processing times will
>>>>> be low, because that processing time is not going to reflect how much
>>>>> overall work is pending in the system.
>>>>>
>>>>> On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj <gautam1...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> From my understanding of Spark Streaming, I created a spark entry
>>>>>> point, for continuous UDP data, using:
>>>>>>
>>>>>> SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
>>>>>> JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000));
>>>>>> JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver(8060));
>>>>>>
>>>>>> Now, when I process this input stream using:
>>>>>>
>>>>>> JavaDStream hash = lines.flatMap(<my-code>);
>>>>>> JavaPairDStream tuple = hash.mapToPair(<my-code>);
>>>>>> JavaPairDStream output = tuple.reduceByKey(<my-code>);
>>>>>> output.foreachRDD(
>>>>>>     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
>>>>>>         @Override
>>>>>>         public Void call(
>>>>>>                 JavaPairRDD<String, ArrayList<String>> arg0,
>>>>>>                 Time arg1) throws Exception {
>>>>>>             // TODO Auto-generated method stub
>>>>>>             new AsyncRDDActions(arg0.rdd(), null);
>>>>>>             arg0.foreachPartition(
>>>>>>                 new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {
>>>>>>
>>>>>>                     @Override
>>>>>>                     public void call(
>>>>>>                             Iterator<Tuple2<String, ArrayList<String>>> arg0)
>>>>>>                             throws Exception {
>>>>>>
>>>>>>                         // TODO Auto-generated method stub
>>>>>>                         GraphDatabaseService graphDb = new GraphDatabaseFactory()
>>>>>>                                 .newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
>>>>>>                                 .setConfig("remote_shell_enabled", "true")
>>>>>>                                 .newGraphDatabase();
>>>>>>
>>>>>>                         try (Transaction tx = graphDb.beginTx()) {
>>>>>>                             while (arg0.hasNext()) {
>>>>>>                                 Tuple2<String, ArrayList<String>> tuple = arg0.next();
>>>>>>                                 Node HMac = Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
>>>>>>                                 boolean oldHMac = false;
>>>>>>                                 if (HMac != null) {
>>>>>>                                     System.out.println("Already in Database:" + tuple._1);
>>>>>>                                     oldHMac = true;
>>>>>>                                 }
>>>>>>                                 else
>>>>>>                                     HMac = Neo4jOperations.createHMac(graphDb, tuple._1);
>>>>>>
>>>>>>                                 ArrayList<String> zipcodes = tuple._2;
>>>>>>                                 for (String zipcode : zipcodes) {
>>>>>>                                     Node Zipcode = Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
>>>>>>                                     if (Zipcode != null) {
>>>>>>                                         System.out.println("Already in Database:" + zipcode);
>>>>>>                                         if (oldHMac == true
>>>>>>                                                 && Neo4jOperations.getRelationshipBetween(HMac, Zipcode) != null)
>>>>>>                                             Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
>>>>>>                                         else
>>>>>>                                             Neo4jOperations.travelTo(HMac, Zipcode);
>>>>>>                                     }
>>>>>>                                     else {
>>>>>>                                         Zipcode = Neo4jOperations.createZipcode(graphDb, zipcode);
>>>>>>                                         Neo4jOperations.travelTo(HMac, Zipcode);
>>>>>>                                     }
>>>>>>                                 }
>>>>>>                             }
>>>>>>                             tx.success();
>>>>>>                         }
>>>>>>                         graphDb.shutdown();
>>>>>>                     }
>>>>>>                 });
>>>>>>             return null;
>>>>>>         }
>>>>>>     });
>>>>>>
>>>>>> The code in output.foreachRDD pushes the output of Spark into the
>>>>>> Neo4j database, checking for duplicate values.
>>>>>>
>>>>>> This part of the code is very time-consuming, because of which my
>>>>>> processing time exceeds the batch time. Because of that, it *results in
>>>>>> data loss*. So, I was thinking of pushing the output into the
>>>>>> database asynchronously.
>>>>>> I found AsyncRDDActions(
>>>>>> https://spark.apache.org/docs/1.1.1/api/java/org/apache/spark/rdd/AsyncRDDActions.html)
>>>>>> for this purpose, but cannot find a working example for it in Java,
>>>>>> especially for the function foreachPartitionAsync, inside which we have
>>>>>> to use "Function1".
>>>>>>
>>>>>> Any help is appreciated.
>>>>>>
>>>>>> Thanks,
>>>>>> Gautam
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Gautam
>>>>
>>>
>>>
>>
>>
>> --
>> Gautam
>>
>
>
>
> --
> Gautam
>
