Yep, I had submitted a PR that included it way back in the original direct stream for Kafka, but it got nixed in favor of TaskContext.partitionId ;) The concern then was about having too many xWithBlah APIs on RDD.
If we do want to deprecate TaskContext.partitionId and add foreachPartitionWithIndex, I think that makes sense; I can start a ticket.

On Thu, Oct 20, 2016 at 1:16 PM, Reynold Xin <r...@databricks.com> wrote:
> Seems like a good new API to add?
>
> On Thu, Oct 20, 2016 at 11:14 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Access to the partition ID is necessary for basically every single one
>> of my jobs, and there isn't a foreachPartitionWithIndex equivalent.
>> You can kind of work around it with an empty foreach after the map, but
>> it's really awkward to explain to people.
>>
>> On Thu, Oct 20, 2016 at 12:52 PM, Reynold Xin <r...@databricks.com> wrote:
>> > FYI - Xiangrui submitted an amazing pull request to fix a long-standing
>> > issue with a lot of the nondeterministic expressions (rand, randn,
>> > monotonically_increasing_id): https://github.com/apache/spark/pull/15567
>> >
>> > Prior to this PR, we were using TaskContext.partitionId as the partition
>> > index when initializing expressions. However, that is actually not a good
>> > index to use in most cases, because it is the physical task's partition id
>> > and does not always reflect the partition index at the time the RDD is
>> > created (or in the Spark SQL physical plan). This makes a big difference
>> > once there is a union or coalesce operation.
>> >
>> > The "index" given by mapPartitionsWithIndex, on the other hand, does not
>> > have this problem, because it reflects the logical partition index
>> > at the time the RDD is created.
>> >
>> > When is it safe to use TaskContext.partitionId? It is safe at the very end
>> > of a query plan (the root node), because there partitionId is guaranteed,
>> > based on the current implementation, to be the same as the physical task
>> > partition id.
>> >
>> > For example, prior to Xiangrui's PR, the following query would return 2
>> > rows, whereas the correct behavior is to return 1 row:
>> >
>> > spark.range(1).selectExpr("rand(1)").union(spark.range(1).selectExpr("rand(1)")).distinct.show()
>> >
>> > The reason it'd return 2 rows is that rand was using
>> > TaskContext.partitionId as the per-partition seed, and as a result the
>> > two sides of the union were using different seeds.
>> >
>> > I'm starting to think we should deprecate the API and ban its use
>> > within the project, to be safe ...
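The "empty foreach after the map" workaround mentioned above can be sketched without a Spark cluster. This is a toy pure-Python stand-in (the `ToyRDD` class and its local partition handling are hypothetical; only the method names mirror the real RDD API): since there is no foreachPartitionWithIndex, you do the side effect inside mapPartitionsWithIndex and then force evaluation with a no-op foreach.

```python
# Toy stand-in for an RDD: a list of partitions, with lazy
# mapPartitionsWithIndex like Spark's. Hypothetical local code,
# only the method names mirror the Spark API.
class ToyRDD:
    def __init__(self, partitions):
        self.partitions = partitions  # list of lists

    def mapPartitionsWithIndex(self, f):
        # f(index, iterator) -> iterator; generators keep this lazy,
        # as in Spark, so nothing runs until an action consumes it.
        return ToyRDD([f(i, iter(p)) for i, p in enumerate(self.partitions)])

    def foreach(self, f):
        # The action: pull every element of every partition.
        for p in self.partitions:
            for x in p:
                f(x)


seen = []

def record(index, it):
    # The side effect that needs the partition index.
    for x in it:
        seen.append((index, x))
        yield x  # passed through, then discarded by the empty foreach

rdd = ToyRDD([[1, 2], [3]])
# The workaround: map with the partition index, then an empty foreach
# whose only purpose is to force evaluation.
rdd.mapPartitionsWithIndex(record).foreach(lambda _: None)
print(seen)  # [(0, 1), (0, 2), (1, 3)]
```

A native foreachPartitionWithIndex would collapse this into one call and avoid having to explain the throwaway foreach.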
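The union/distinct bug above can also be illustrated locally. This pure-Python sketch models the seeding semantics the thread describes; the seed mixing (`base_seed + partition_index`) is an assumed stand-in, not Spark's actual formula. With the physical task partition id, the two single-partition sides of a union see ids 0 and 1 and draw different values; with the logical partition index, both sides see index 0 and draw the same value, so distinct keeps one row.

```python
import random

def rand_for_partition(base_seed, partition_index):
    # One value per single-row partition. The mixing of base seed and
    # partition index is an assumption for illustration only.
    return random.Random(base_seed + partition_index).random()

base_seed = 1

# Buggy behavior: TaskContext.partitionId numbers tasks across the whole
# union, so the two sides get partition ids 0 and 1 -> different seeds,
# and distinct() keeps both rows.
buggy = {rand_for_partition(base_seed, pid) for pid in (0, 1)}

# Fixed behavior: each side of the union uses its own logical partition
# index, which is 0 for both single-partition children -> same seed,
# identical values, and distinct() keeps one row.
fixed = {rand_for_partition(base_seed, 0), rand_for_partition(base_seed, 0)}

print(len(buggy), len(fixed))  # 2 1
```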