[ 
https://issues.apache.org/jira/browse/SPARK-27895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16855547#comment-16855547
 ] 

Ilias Karalis commented on SPARK-27895:
---------------------------------------

The results of the RDD filter keep changing until the next batch comes in.

I am not sure whether it is related to the filter function itself, but the 
same function and the same filter call work fine if, instead of filtering the 
RDD, we filter a collection.

inputRdd.filter... : the results are not stable and keep changing until the 
next batch comes in.

inputRdd.collect().filter... : the results are computed just once and remain 
stable until the next batch comes in.
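A likely explanation (my reading, not confirmed in this thread): RDD transformations are lazy and are re-evaluated on each action, so a filter whose predicate draws fresh random numbers can select different elements every time the RDD is traversed, whereas collect() materializes the data so a subsequent filter runs exactly once. The same pitfall can be reproduced with a plain Scala lazy view, no Spark required:

```scala
import scala.util.Random

object LazyFilterDemo {
  def main(args: Array[String]): Unit = {
    var evaluations = 0
    val rng = new Random(42)

    // A view is lazy, like an un-cached RDD: the predicate runs again
    // on every traversal, drawing fresh random numbers, so each pass
    // can select different elements.
    val lazyFiltered = (1 to 100).view.filter { _ =>
      evaluations += 1
      rng.nextFloat() < 0.5f
    }

    val first  = lazyFiltered.toList // one full pass over the data
    val second = lazyFiltered.toList // another pass: new random draws
    assert(evaluations == 200)       // predicate ran once per element per pass

    // Materializing once (analogous to collect(), or calling
    // cache()/persist() on the RDD before reusing it) pins the selection.
    println(s"first pass: ${first.size} elements, second pass: ${second.size}")
  }
}
```

If this is indeed the cause, calling cache() or persist() on the RDD before reusing it, or filtering the collected data as above, should make the results stable within a batch.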

 

> Spark streaming - RDD filter is always refreshing providing updated filtered 
> items
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-27895
>                 URL: https://issues.apache.org/jira/browse/SPARK-27895
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.4.0, 2.4.2, 2.4.3
>         Environment: Intellij, running local in windows10 laptop.
>  
>            Reporter: Ilias Karalis
>            Priority: Major
>
> Spark streaming: 2.4.x
> Scala: 2.11.11
>  
>  
> Inside foreachRDD of a DStream, if a filter is applied to the RDD, the 
> filter keeps re-running, providing new results continuously until the next 
> batch is processed; the same then occurs for the new batch.
> With the same code, if we do rdd.collect() and then run the filter on the 
> collection, we get the results just once, and they remain stable until a 
> new batch comes in.
> The filter function is based on random probability (reservoir sampling).
>  
> val toSampleRDD: RDD[(Long, Long)] = inputRdd.filter(x => chooseX(x))
>
> def chooseX(x: (Long, Long)): Boolean = {
>   val r = scala.util.Random
>   val p = r.nextFloat()
>   edgeTotalCounter += 1
>   if (p < (sampleLength.toFloat / edgeTotalCounter.toFloat)) {
>     edgeLocalRDDCounter += 1
>     println("Edge " + x + " has been selected and is number : " + edgeLocalRDDCounter + ".")
>     true
>   } else false
> }
>  
> edgeLocalRDDCounter counts the edges selected from inputRdd.
> Strangely, the counter first increases from 1 to y; then the filter 
> unexpectedly runs again and the counter increases further, from y+1 to z. 
> After that, each time the filter unexpectedly runs again, it produces 
> results whose counter starts from y+1. Each run of the filter yields 
> different results and selects a different number of edges.
> toSampleRDD always changes to match the newly produced results.
> When a new batch comes in, the same behavior starts over for that 
> batch.
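
A note on the sampling code quoted above (the sketch below is my own, not the reporter's): chooseX mutates shared counters and draws a new random number on every call, so any re-evaluation of the lazy RDD yields a different sample, and on a real cluster edgeTotalCounter/edgeLocalRDDCounter would not even be shared across executors. A side-effect-free one-pass reservoir sampler (Algorithm R) over the collected data avoids both problems; RDD.takeSample is another way to get a uniform sample.

```scala
import scala.util.Random
import scala.collection.mutable.ArrayBuffer

object ReservoirR {
  // Algorithm R: one pass over the data, keeps a uniform sample of
  // size k without putting randomness or shared counters inside a
  // filter predicate that may be re-evaluated.
  def sample[T](items: Iterator[T], k: Int, rng: Random): Vector[T] = {
    val reservoir = ArrayBuffer.empty[T]
    var seen = 0L
    items.foreach { item =>
      seen += 1
      if (reservoir.size < k) reservoir += item
      else {
        // j is uniform in [0, seen); keep the item with probability k/seen
        val j = math.floor(rng.nextDouble() * seen).toLong
        if (j < k) reservoir(j.toInt) = item
      }
    }
    reservoir.toVector
  }

  def main(args: Array[String]): Unit = {
    val s = sample((1 to 1000).iterator, 10, new Random(7))
    assert(s.size == 10)
    println(s)
  }
}
```

Run once over inputRdd.collect() (or over each partition via mapPartitions, merging the per-partition reservoirs), this gives one stable sample per batch.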



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
