This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 3328615 [SPARK-38654][SQL][PYTHON] Show default index type in SQL
plans for pandas API on Spark
3328615 is described below
commit 332861569d09d404da48b63846c0fa5920da0a6e
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Fri Mar 25 22:00:48 2022 +0900
[SPARK-38654][SQL][PYTHON] Show default index type in SQL plans for pandas
API on Spark
### What changes were proposed in this pull request?
This PR proposes to show the default index type in SQL plans for pandas API
on Spark.
Note that this PR does not handle `sequence` case because that's
discouraged in production, and tricky to insert an alias.
### Why are the changes needed?
When users set `compute.default_index_type`, it's difficult to know which
DataFrame uses which index. We should at least note that in Spark SQL so users
can tell which plans are for the default index.
### Does this PR introduce _any_ user-facing change?
Yes, when users call `pyspark.pandas.DataFrame.spark.explain(True)`:
**distributed**
```python
import pyspark.pandas as ps
ps.set_option("compute.default_index_type", "distributed")
ps.range(1).spark.explain()
```
```
== Physical Plan ==
*(1) Project [distributed_index() AS __index_level_0__#15L, id#13L]
+- *(1) Range (0, 1, step=1, splits=16)
```
**distributed-sequence**
```python
import pyspark.pandas as ps
ps.set_option("compute.default_index_type", "distributed-sequence")
ps.range(1).spark.explain()
```
```
== Physical Plan ==
AttachDistributedSequence[__index_level_0__#16L, id#13L] Index:
__index_level_0__#16L
+- *(1) Range (0, 1, step=1, splits=16)
```
### How was this patch tested?
Manually tested.
Closes #35968 from HyukjinKwon/SPARK-38654.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 8ef0159550c143e07fa79b120b2d1fdf9d535fdc)
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/pandas/internal.py | 6 +++++-
.../spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala | 2 +-
.../spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala | 7 +++++++
.../spark/sql/execution/python/AttachDistributedSequenceExec.scala | 7 +++++++
4 files changed, 20 insertions(+), 2 deletions(-)
diff --git a/python/pyspark/pandas/internal.py
b/python/pyspark/pandas/internal.py
index f79f0ad..b2e6749 100644
--- a/python/pyspark/pandas/internal.py
+++ b/python/pyspark/pandas/internal.py
@@ -887,7 +887,11 @@ class InternalFrame:
@staticmethod
def attach_distributed_column(sdf: SparkDataFrame, column_name: str) ->
SparkDataFrame:
scols = [scol_for(sdf, column) for column in sdf.columns]
- return sdf.select(F.monotonically_increasing_id().alias(column_name),
*scols)
+ jvm = sdf.sparkSession._jvm
+ tag =
jvm.org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FUNC_ALIAS()
+ jexpr = F.monotonically_increasing_id()._jc.expr()
+ jexpr.setTagValue(tag, "distributed_index")
+ return sdf.select(Column(jvm.Column(jexpr)).alias(column_name), *scols)
@staticmethod
def attach_distributed_sequence_column(sdf: SparkDataFrame, column_name:
str) -> SparkDataFrame:
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
index f228b36..ecf254f 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
@@ -85,7 +85,7 @@ case class MonotonicallyIncreasingID() extends LeafExpression
with Stateful {
$countTerm++;""", isNull = FalseLiteral)
}
- override def prettyName: String = "monotonically_increasing_id"
+ override def nodeName: String = "monotonically_increasing_id"
override def sql: String = s"$prettyName()"
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
index 13a40db..c2f74b3 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, PythonUDF}
+import org.apache.spark.sql.catalyst.util.truncatedString
/**
* FlatMap groups using a udf: pandas.Dataframe -> pandas.DataFrame.
@@ -146,4 +147,10 @@ case class AttachDistributedSequence(
override protected def withNewChildInternal(newChild: LogicalPlan):
AttachDistributedSequence =
copy(child = newChild)
+
+ override def simpleString(maxFields: Int): String = {
+ val truncatedOutputString = truncatedString(output, "[", ", ", "]",
maxFields)
+ val indexColumn = s"Index: $sequenceAttr"
+ s"$nodeName$truncatedOutputString $indexColumn"
+ }
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
index 27bfb7f..203fb6d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
@@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
/**
@@ -59,4 +60,10 @@ case class AttachDistributedSequenceExec(
override protected def withNewChildInternal(newChild: SparkPlan):
AttachDistributedSequenceExec =
copy(child = newChild)
+
+ override def simpleString(maxFields: Int): String = {
+ val truncatedOutputString = truncatedString(output, "[", ", ", "]",
maxFields)
+ val indexColumn = s"Index: $sequenceAttr"
+ s"$nodeName$truncatedOutputString $indexColumn"
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]