[ 
https://issues.apache.org/jira/browse/SPARK-27895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilias Karalis updated SPARK-27895:
----------------------------------
    Description: 
Spark streaming: 2.4.x

Scala: 2.11.11

 

 

Inside foreachRDD of a DStream: when a filter is applied to the RDD, the filter keeps re-evaluating, producing new results continuously until the next batch is processed. The same then occurs for the new batch.

With the same code, if we first call rdd.collect() and then run the filter on the resulting local collection, we get results only once, and they remain stable until the next batch comes in.

The filter function is based on random probability (reservoir sampling).
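For comparison, here is a minimal plain-Scala sketch (no Spark; the view-vs-List analogy to RDD laziness is an assumption of this sketch, not part of the report) showing how a side-effecting filter predicate is re-run on every traversal of a lazy collection, but runs only once over a strict one:

```scala
object LazyFilterDemo {
  // Filters a lazy view twice and returns how many times the predicate ran.
  // Like an RDD transformation, the view re-applies the predicate per traversal.
  def lazyPredicateCalls(n: Int): Int = {
    var calls = 0
    val filtered = (1 to n).view.filter { x => calls += 1; x % 2 == 0 }
    filtered.foreach(_ => ()) // first "action": predicate runs n times
    filtered.foreach(_ => ()) // second "action": predicate runs n more times
    calls
  }

  // Same, but on a strict List -- like calling rdd.collect() and then
  // filtering the local collection: the predicate runs exactly once per item.
  def strictPredicateCalls(n: Int): Int = {
    var calls = 0
    val filtered = (1 to n).toList.filter { x => calls += 1; x % 2 == 0 }
    filtered.foreach(_ => ())
    filtered.foreach(_ => ())
    calls
  }

  def main(args: Array[String]): Unit = {
    println(s"lazy:   ${lazyPredicateCalls(5)} predicate calls")   // 10
    println(s"strict: ${strictPredicateCalls(5)} predicate calls") // 5
  }
}
```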

 

{code:scala}
val toSampleRDD: RDD[(Long, Long)] = inputRdd.filter(x => chooseX(x))

def chooseX(x: (Long, Long)): Boolean = {
  val r = scala.util.Random
  val p = r.nextFloat()
  edgeTotalCounter += 1
  if (p < (sampleLength.toFloat / edgeTotalCounter.toFloat)) {
    edgeLocalRDDCounter += 1
    println("Edge " + x + " has been selected and is number: " + edgeLocalRDDCounter + ".")
    true
  } else false
}
{code}

 

edgeLocalRDDCounter counts the edges selected from inputRdd.

The strange part is that the counter first increases from 1 to y; then the filter unexpectedly runs again and the counter continues from y+1 to z. After that, each time the filter unexpectedly runs again, it produces results whose counter starts from y+1. Each run of the filter produces different results and selects a different number of edges.

toSampleRDD always changes according to the newly produced results.

When a new batch comes in, the same behavior starts again for that batch.
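For reference, the textbook Algorithm R form of reservoir sampling keeps a fixed-size uniform sample in a single pass. This plain-Scala sketch is illustrative only (the object name, parameters, and seed are not from the report):

```scala
import scala.util.Random

object ReservoirSample {
  // Algorithm R: a uniform sample of k items from one pass over the input.
  def sample[T](items: Iterator[T], k: Int, rng: Random): Vector[T] = {
    val reservoir = scala.collection.mutable.ArrayBuffer.empty[T]
    var seen = 0
    for (item <- items) {
      seen += 1
      if (reservoir.size < k) {
        reservoir += item               // fill the reservoir with the first k items
      } else {
        val j = rng.nextInt(seen)       // uniform index in [0, seen)
        if (j < k) reservoir(j) = item  // replace a slot with probability k/seen
      }
    }
    reservoir.toVector
  }
}
```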


> Spark streaming - RDD filter is always refreshing providing updated filtered 
> items
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-27895
>                 URL: https://issues.apache.org/jira/browse/SPARK-27895
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.4.0, 2.4.2, 2.4.3
>         Environment: Intellij, running local in windows10 laptop.
>  
>            Reporter: Ilias Karalis
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
