[
https://issues.apache.org/jira/browse/SPARK-34013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
xiyang updated SPARK-34013:
---------------------------
Description:
{code:java}
rdd.mapPartitions(par => {
val jedis: Jedis = AdJedisPoolUtils.getJedis
jedis.select(4)
val res: Iterator[(((String, String), (String, String, String, String, String,
String, String, String, String)), String)] = par.map(data =>{
println(data._2._1)
val tagId: String = jedis.hget("game_tag_union", data._2._1)
println(tagId)
(data, tagId)
})
jedis.close()
res
})
{code}
*The final result was completely different from the one foreachPartition got,
but foreachPartition got the correct solution*
*eg:*
{code:java}
rdd.foreachPartition(par=>{
val jedis: Jedis = AdJedisPoolUtils.getJedis
jedis.select(4)
par.foreach(data=>{
val tagId: String = jedis.hget("game_tag_union", data._2._1) println(tagId)
})
jedis.close()
})
{code}
was:
rdd.mapPartitions(par => {
val jedis: Jedis = AdJedisPoolUtils.getJedis
jedis.select(4)
val res: Iterator[(((String, String), (String, String, String, String, String,
String, String, String, String)), String)] = par.map(data => {
println(data._2._1)
val tagId: String = jedis.hget("game_tag_union", data._2._1)
println(tagId)
(data, tagId)
})
jedis.close()
res
})
*The final result was completely different from the one foreachPartition got,
but foreachPartition got the correct solution*
*eg:* rdd.foreachPartition(par=>{
val jedis: Jedis = AdJedisPoolUtils.getJedis
jedis.select(4)
par.foreach(data=>{
val tagId: String = jedis.hget("game_tag_union", data._2._1)
println(tagId)
})
jedis.close()
})
> Data in redis was obtained using mapPartitions Sparkstreaming, which was not
> as expected
> ----------------------------------------------------------------------------------------
>
> Key: SPARK-34013
> URL: https://issues.apache.org/jira/browse/SPARK-34013
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, Structured Streaming
> Affects Versions: 2.4.2
> Reporter: xiyang
> Priority: Major
>
>
> {code:java}
> rdd.mapPartitions(par => {
> val jedis: Jedis = AdJedisPoolUtils.getJedis
> jedis.select(4)
> val res: Iterator[(((String, String), (String, String, String, String,
> String, String, String, String, String)), String)] = par.map(data =>{
> println(data._2._1)
> val tagId: String = jedis.hget("game_tag_union", data._2._1)
> println(tagId)
> (data, tagId)
> })
> jedis.close()
> res
> })
> {code}
>
> *The final result was completely different from the one foreachPartition got,
> but foreachPartition got the correct solution*
> *eg:*
> {code:java}
> rdd.foreachPartition(par=>{
> val jedis: Jedis = AdJedisPoolUtils.getJedis
> jedis.select(4)
> par.foreach(data=>{
> val tagId: String = jedis.hget("game_tag_union", data._2._1) println(tagId)
> })
> jedis.close()
> })
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]