[ 
https://issues.apache.org/jira/browse/SPARK-34013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17258909#comment-17258909
 ] 

xiyang commented on SPARK-34013:
--------------------------------

*I perform hmget "game_tag_union" "100004899" "100004450" "100003249" 
"100005973" "100005973"*
*get the result ["62","62","2","158","158"],However, the result you get using 
mapPartitions is(100004899,OK)
(100004450,OK)
(100003249,null)
(100005973,null)
(100005973,null)*
 
{code:java}
entergameStream.foreachRDD(rdd => {
 rdd.map(_.value())
 .map(_.split("\t", -1))
 .filter(_.size >= 18)
 .map(EnterGame(_))
 .mapPartitions(par=>{
 val jedis: Jedis = AdJedisPoolUtils.getJedis
 jedis.select(4)
 val tuples: Iterator[(String, String)] = par.map(data => {
 val tagId: String = jedis.hget("game_tag_union", data.appid)
 (data.appid, tagId)
 })
 jedis.close()
 tuples
 }).foreach(println)



}){code}
 

> Data in redis was obtained using mapPartitions Sparkstreaming, which was not 
> as expected
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-34013
>                 URL: https://issues.apache.org/jira/browse/SPARK-34013
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, Structured Streaming
>    Affects Versions: 2.4.2
>            Reporter: xiyang
>            Priority: Major
>
>  
> {code:java}
> rdd.mapPartitions(par => {
>  val jedis: Jedis = AdJedisPoolUtils.getJedis
>  jedis.select(4)
>  val res: Iterator[(((String, String), (String, String, String, String, 
> String, String, String, String, String)), String)] = par.map(data =>{ 
> println(data._2._1) 
> val tagId: String = jedis.hget("game_tag_union", data._2._1) 
> println(tagId) 
> (data, tagId) 
> })
> jedis.close()
>  res
>  })
> {code}
>  
> *The final result was completely different from the one foreachPartition got, 
> but foreachPartition got the correct solution*
> *eg:*
> {code:java}
> rdd.foreachPartition(par=>{
> val jedis: Jedis = AdJedisPoolUtils.getJedis
>  jedis.select(4)
> par.foreach(data=>{ 
> val tagId: String = jedis.hget("game_tag_union", data._2._1) println(tagId) 
> })
> jedis.close()
> })
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to