[
https://issues.apache.org/jira/browse/SPARK-34013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17258909#comment-17258909
]
xiyang commented on SPARK-34013:
--------------------------------
*When I run hmget "game_tag_union" "100004899" "100004450" "100003249"
"100005973" "100005973"*
*I get the result ["62","62","2","158","158"]. However, the result obtained
using mapPartitions is:*
(100004899,OK)
(100004450,OK)
(100003249,null)
(100005973,null)
(100005973,null)
{code:java}
entergameStream.foreachRDD(rdd => {
  rdd.map(_.value())
    .map(_.split("\t", -1))
    .filter(_.size >= 18)
    .map(EnterGame(_))
    .mapPartitions(par => {
      val jedis: Jedis = AdJedisPoolUtils.getJedis
      jedis.select(4)
      val tuples: Iterator[(String, String)] = par.map(data => {
        val tagId: String = jedis.hget("game_tag_union", data.appid)
        (data.appid, tagId)
      })
      jedis.close()
      tuples
    }).foreach(println)
}){code}
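A possible explanation, offered as a hypothesis rather than a confirmed diagnosis: Scala's Iterator.map is lazy, so in the mapPartitions version the hget calls only run when Spark consumes the returned iterator, which happens after jedis.close() has already been called. If AdJedisPoolUtils hands out pooled connections (an assumption), the connection may by then be in use by another task, which would be consistent with the stray OK values (the reply to SELECT) and the nulls. foreachPartition consumes the iterator eagerly, before close(), so it is not affected. The sketch below reproduces only the ordering problem, using a hypothetical FakeConnection class in place of Jedis so that it runs without Redis or Spark:

```scala
// Hypothetical stand-in for a pooled connection; this is NOT the Jedis API.
final class FakeConnection(data: Map[String, String]) {
  private var open = true
  // After close() the lookup no longer returns the stored value.
  def hget(field: String): Option[String] = if (open) data.get(field) else None
  def close(): Unit = { open = false }
}

object LazyIteratorDemo {
  // Sample values taken from the hmget output quoted above.
  private val table = Map("100004899" -> "62", "100003249" -> "2")

  // Mirrors the reported pattern: close() runs before the lazy map is consumed.
  def closeBeforeConsume(ids: Iterator[String]): List[(String, Option[String])] = {
    val conn = new FakeConnection(table)
    val out = ids.map(id => (id, conn.hget(id))) // nothing is evaluated yet
    conn.close()
    out.toList // the lookups actually happen here, after close()
  }

  // Forces the lookups while the connection is still open.
  def consumeBeforeClose(ids: Iterator[String]): List[(String, Option[String])] = {
    val conn = new FakeConnection(table)
    val out = ids.map(id => (id, conn.hget(id))).toList // evaluated immediately
    conn.close()
    out
  }
}
```

If this is the cause, materialising the results inside mapPartitions (for example with .toList.iterator) before calling jedis.close() should make the output match foreachPartition, at the cost of holding one partition's results in memory.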
> Data in redis was obtained using mapPartitions Sparkstreaming, which was not
> as expected
> ----------------------------------------------------------------------------------------
>
> Key: SPARK-34013
> URL: https://issues.apache.org/jira/browse/SPARK-34013
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, Structured Streaming
> Affects Versions: 2.4.2
> Reporter: xiyang
> Priority: Major
>
>
> {code:java}
> rdd.mapPartitions(par => {
>   val jedis: Jedis = AdJedisPoolUtils.getJedis
>   jedis.select(4)
>   val res: Iterator[(((String, String), (String, String, String, String,
>     String, String, String, String, String)), String)] = par.map(data => {
>     println(data._2._1)
>     val tagId: String = jedis.hget("game_tag_union", data._2._1)
>     println(tagId)
>     (data, tagId)
>   })
>   jedis.close()
>   res
> })
> {code}
>
> *The final result from mapPartitions was completely different from the one
> foreachPartition produced; only foreachPartition returned the correct values.*
> *eg:*
> {code:java}
> rdd.foreachPartition(par => {
>   val jedis: Jedis = AdJedisPoolUtils.getJedis
>   jedis.select(4)
>   par.foreach(data => {
>     val tagId: String = jedis.hget("game_tag_union", data._2._1)
>     println(tagId)
>   })
>   jedis.close()
> })
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)