[ https://issues.apache.org/jira/browse/SPARK-27895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16855547#comment-16855547 ]

Ilias Karalis commented on SPARK-27895:
---------------------------------------

The results of the RDD filter keep changing until the next batch comes in.
I am not sure whether this has to do with the filter function itself, but the same function and the filter method work fine if, instead of filtering the RDD, we filter a collection.

inputRdd.filter... : the results are not stable and keep changing until the next batch comes in.
inputRdd.collect().filter... : the results are computed just once and remain stable until the next batch comes in.

> Spark streaming - RDD filter is always refreshing providing updated filtered items
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-27895
>                 URL: https://issues.apache.org/jira/browse/SPARK-27895
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.4.0, 2.4.2, 2.4.3
>         Environment: IntelliJ, running locally on a Windows 10 laptop.
>            Reporter: Ilias Karalis
>            Priority: Major
>
> Spark Streaming: 2.4.x
> Scala: 2.11.11
>
> Inside foreachRDD on a DStream, when filter is applied to the RDD, the filter keeps being re-evaluated and produces new results continuously until the next batch is processed. The same happens for the next batch.
> With the same code, if we call rdd.collect() and then run the filter on the resulting collection, we get results only once, and they remain stable until the next batch comes in.
> The filter function is based on random probability (reservoir sampling).
>
> val toSampleRDD: RDD[(Long, Long)] = inputRdd.filter(x => chooseX(x))
>
> def chooseX(x: (Long, Long)): Boolean = {
>   val r = scala.util.Random
>   val p = r.nextFloat()
>   edgeTotalCounter += 1
>   if (p < (sampleLength.toFloat / edgeTotalCounter.toFloat)) {
>     edgeLocalRDDCounter += 1
>     println("Edge " + x + " has been selected and is number : " + edgeLocalRDDCounter + ".")
>     true
>   }
>   else false
> }
>
> edgeLocalRDDCounter counts the edges selected from inputRdd.
> Strangely, the counter first increases from 1 to y; then the filter unexpectedly runs again and the counter increases from y+1 to z. After that, each time the filter unexpectedly runs again, it produces results for which the counter starts from y+1. Each run of the filter produces different results and selects a different number of edges.
> toSampleRDD always changes according to the newly produced results.
> When a new batch comes in, the same behavior starts over for the new batch.
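
The difference reported above between inputRdd.filter(...) and inputRdd.collect().filter(...) is consistent with how lazy evaluation works: RDD transformations such as filter are lazy, and an RDD that has not been persisted is recomputed by every action that uses it, so a random predicate can give a different answer each time. Whether that is the actual cause here is not confirmed by the report. The sketch below is plain Scala and does not use Spark: a `def` stands in for an unpersisted filtered RDD, a `val` stands in for filtering the output of collect(), and `RecomputeSketch`, `randomKeep`, `recomputed`, `materialised` and the fixed probability 0.1 are hypothetical names and values chosen for illustration (the reporter's chooseX uses reservoir sampling instead).

```scala
import scala.util.Random

object RecomputeSketch {
  // Hypothetical input: 1000 edges, standing in for the contents of inputRdd.
  val edges: List[(Long, Long)] = (1L to 1000L).map(i => (i, i + 1)).toList

  // Counts predicate invocations so that re-evaluation is visible.
  var calls = 0

  private val rng = new Random()

  // Hypothetical stand-in for chooseX: keeps each edge with probability p,
  // drawing from an unseeded RNG, so the outcome differs from run to run.
  def randomKeep(p: Double)(e: (Long, Long)): Boolean = {
    calls += 1
    rng.nextDouble() < p
  }

  // `def`: the filter is re-run on every reference, in the way an
  // unpersisted RDD's lineage is re-run by every action.
  def recomputed: List[(Long, Long)] = edges.filter(e => randomKeep(0.1)(e))

  // `val`: the filter runs once and the result is fixed, analogous to
  // filtering the local collection returned by collect().
  val materialised: List[(Long, Long)] = edges.filter(e => randomKeep(0.1)(e))

  def main(args: Array[String]): Unit = {
    val a = recomputed
    val b = recomputed
    println(s"recomputed sizes:   ${a.size}, ${b.size}")
    println(s"materialised sizes: ${materialised.size}, ${materialised.size}")
    println(s"predicate calls:    $calls")
  }
}
```

In Spark itself, persisting the filtered RDD (for example with persist() or cache()) before running several actions on it is the usual way to avoid recomputation, although partitions that are evicted would still be recomputed.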