[ 
https://issues.apache.org/jira/browse/SPARK-34013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiyang updated SPARK-34013:
---------------------------
    Description: 
 
{code:java}
rdd.mapPartitions(par => {
 val jedis: Jedis = AdJedisPoolUtils.getJedis
 jedis.select(4)
 val res: Iterator[(((String, String), (String, String, String, String, String, 
String, String, String, String)), String)] = par.map(data =>{ 
println(data._2._1) 
val tagId: String = jedis.hget("game_tag_union", data._2._1) 
println(tagId) 
(data, tagId) 
})
jedis.close()
 res
 })
{code}
 

*The result returned by mapPartitions was completely different from the one 
produced by foreachPartition; foreachPartition gave the correct result.*

*eg:*
{code:java}
rdd.foreachPartition(par=>{
val jedis: Jedis = AdJedisPoolUtils.getJedis
 jedis.select(4)
par.foreach(data=>{ 
val tagId: String = jedis.hget("game_tag_union", data._2._1) 
println(tagId) 
})
jedis.close()
})
{code}
 

 

  was:
rdd.mapPartitions(par => {
 val jedis: Jedis = AdJedisPoolUtils.getJedis
 jedis.select(4)
 val res: Iterator[(((String, String), (String, String, String, String, String, 
String, String, String, String)), String)] = par.map(data => {
 println(data._2._1)
 val tagId: String = jedis.hget("game_tag_union", data._2._1)
 println(tagId)
 (data, tagId)
 })
 jedis.close()
 res
})

*The result returned by mapPartitions was completely different from the one 
produced by foreachPartition; foreachPartition gave the correct result.*

*eg:* rdd.foreachPartition(par=>{

val jedis: Jedis = AdJedisPoolUtils.getJedis
jedis.select(4)

par.foreach(data=>{

val tagId: String = jedis.hget("game_tag_union", data._2._1)

println(tagId)

})

jedis.close()

})

 


> Data in redis was obtained using mapPartitions Sparkstreaming, which was not 
> as expected
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-34013
>                 URL: https://issues.apache.org/jira/browse/SPARK-34013
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, Structured Streaming
>    Affects Versions: 2.4.2
>            Reporter: xiyang
>            Priority: Major
>
>  
> {code:java}
> rdd.mapPartitions(par => {
>  val jedis: Jedis = AdJedisPoolUtils.getJedis
>  jedis.select(4)
>  val res: Iterator[(((String, String), (String, String, String, String, 
> String, String, String, String, String)), String)] = par.map(data =>{ 
> println(data._2._1) 
> val tagId: String = jedis.hget("game_tag_union", data._2._1) 
> println(tagId) 
> (data, tagId) 
> })
> jedis.close()
>  res
>  })
> {code}
>  
> *The result returned by mapPartitions was completely different from the one 
> produced by foreachPartition; foreachPartition gave the correct result.*
> *eg:*
> {code:java}
> rdd.foreachPartition(par=>{
> val jedis: Jedis = AdJedisPoolUtils.getJedis
>  jedis.select(4)
> par.foreach(data=>{ 
> val tagId: String = jedis.hget("game_tag_union", data._2._1) 
> println(tagId) 
> })
> jedis.close()
> })
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to