This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 3328615  [SPARK-38654][SQL][PYTHON] Show default index type in SQL 
plans for pandas API on Spark
3328615 is described below

commit 332861569d09d404da48b63846c0fa5920da0a6e
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Fri Mar 25 22:00:48 2022 +0900

    [SPARK-38654][SQL][PYTHON] Show default index type in SQL plans for pandas 
API on Spark
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to show the default index type in SQL plans for pandas API 
on Spark.
    
    Note that this PR does not handle the `sequence` case because that index type is 
discouraged in production, and it is tricky to insert an alias for it.
    
    ### Why are the changes needed?
    
    When users set `compute.default_index_type`, it's difficult to know which 
DataFrame uses which index. We should at least note that in Spark SQL so users 
can tell which plans are for the default index.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, when users call `pyspark.pandas.DataFrame.spark.explain(True)`:
    
    **distributed**
    
    ```python
    import pyspark.pandas as ps
    ps.set_option("compute.default_index_type", "distributed")
    ps.range(1).spark.explain()
    ```
    
    ```
    == Physical Plan ==
    *(1) Project [distributed_index() AS __index_level_0__#15L, id#13L]
    +- *(1) Range (0, 1, step=1, splits=16)
    ```
    
    **distributed-sequence**
    
    ```python
    import pyspark.pandas as ps
    ps.set_option("compute.default_index_type", "distributed-sequence")
    ps.range(1).spark.explain()
    ```
    
    ```
    == Physical Plan ==
    AttachDistributedSequence[__index_level_0__#16L, id#13L] Index: 
__index_level_0__#16L
    +- *(1) Range (0, 1, step=1, splits=16)
    ```
    
    ### How was this patch tested?
    
    Manually tested.
    
    Closes #35968 from HyukjinKwon/SPARK-38654.
    
    Authored-by: Hyukjin Kwon <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
    (cherry picked from commit 8ef0159550c143e07fa79b120b2d1fdf9d535fdc)
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/pandas/internal.py                                  | 6 +++++-
 .../spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala | 2 +-
 .../spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala  | 7 +++++++
 .../spark/sql/execution/python/AttachDistributedSequenceExec.scala | 7 +++++++
 4 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/pandas/internal.py 
b/python/pyspark/pandas/internal.py
index f79f0ad..b2e6749 100644
--- a/python/pyspark/pandas/internal.py
+++ b/python/pyspark/pandas/internal.py
@@ -887,7 +887,11 @@ class InternalFrame:
     @staticmethod
     def attach_distributed_column(sdf: SparkDataFrame, column_name: str) -> 
SparkDataFrame:
         scols = [scol_for(sdf, column) for column in sdf.columns]
-        return sdf.select(F.monotonically_increasing_id().alias(column_name), 
*scols)
+        jvm = sdf.sparkSession._jvm
+        tag = 
jvm.org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FUNC_ALIAS()
+        jexpr = F.monotonically_increasing_id()._jc.expr()
+        jexpr.setTagValue(tag, "distributed_index")
+        return sdf.select(Column(jvm.Column(jexpr)).alias(column_name), *scols)
 
     @staticmethod
     def attach_distributed_sequence_column(sdf: SparkDataFrame, column_name: 
str) -> SparkDataFrame:
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
index f228b36..ecf254f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
@@ -85,7 +85,7 @@ case class MonotonicallyIncreasingID() extends LeafExpression 
with Stateful {
       $countTerm++;""", isNull = FalseLiteral)
   }
 
-  override def prettyName: String = "monotonically_increasing_id"
+  override def nodeName: String = "monotonically_increasing_id"
 
   override def sql: String = s"$prettyName()"
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
index 13a40db..c2f74b3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, PythonUDF}
+import org.apache.spark.sql.catalyst.util.truncatedString
 
 /**
  * FlatMap groups using a udf: pandas.Dataframe -> pandas.DataFrame.
@@ -146,4 +147,10 @@ case class AttachDistributedSequence(
 
   override protected def withNewChildInternal(newChild: LogicalPlan): 
AttachDistributedSequence =
     copy(child = newChild)
+
+  override def simpleString(maxFields: Int): String = {
+    val truncatedOutputString = truncatedString(output, "[", ", ", "]", 
maxFields)
+    val indexColumn = s"Index: $sequenceAttr"
+    s"$nodeName$truncatedOutputString $indexColumn"
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
index 27bfb7f..203fb6d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
@@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 
 /**
@@ -59,4 +60,10 @@ case class AttachDistributedSequenceExec(
 
   override protected def withNewChildInternal(newChild: SparkPlan): 
AttachDistributedSequenceExec =
     copy(child = newChild)
+
+  override def simpleString(maxFields: Int): String = {
+    val truncatedOutputString = truncatedString(output, "[", ", ", "]", 
maxFields)
+    val indexColumn = s"Index: $sequenceAttr"
+    s"$nodeName$truncatedOutputString $indexColumn"
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to