This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new baa5f408a098 [SPARK-49945][PS][CONNECT] Add alias for `distributed_id`
baa5f408a098 is described below

commit baa5f408a0985d703b4a1e4c5490c77b239180c4
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Thu Oct 17 11:29:00 2024 +0900

    [SPARK-49945][PS][CONNECT] Add alias for `distributed_id`
    
    ### What changes were proposed in this pull request?
    1. Make `registerInternalExpression` support setting a function alias.
    2. Add the alias `distributed_id` for `MonotonicallyIncreasingID` (renamed
       from `distributed_index` to be more consistent with the existing
       `distributed_sequence_id`).
    3. Remove `distributedIndex` from `PythonSQLUtils`.
    
    ### Why are the changes needed?
    To make pandas API on Spark (PS) on Spark Connect more consistent with Spark Classic:
    
    ```py
    In [9]: ps.set_option("compute.default_index_type", "distributed")
    In [10]: spark_frame = ps.range(10).to_spark()
    In [11]: InternalFrame.attach_default_index(spark_frame).explain(True)
    ```
    
    before:
    
![image](https://github.com/user-attachments/assets/6ce1fb5f-a3c6-42d5-a21e-3925207cb4d0)
    
    ```
    == Parsed Logical Plan ==
    'Project ['monotonically_increasing_id() AS __index_level_0__#27, 'id]
    +- 'Project ['id]
       +- Project [__index_level_0__#19L, id#16L, monotonically_increasing_id() 
AS __natural_order__#22L]
          +- Project [monotonically_increasing_id() AS __index_level_0__#19L, 
id#16L]
             +- Range (0, 10, step=1, splits=Some(12))
    ...
    ```
    
    after:
    
![image](https://github.com/user-attachments/assets/00d3a8a1-251c-4cee-851e-c10f294d5248)
    ```
    == Parsed Logical Plan ==
    'Project ['distributed_id() AS __index_level_0__#65, *]
    +- 'Project ['id]
       +- Project [__index_level_0__#45L, id#42L, monotonically_increasing_id() 
AS __natural_order__#48L]
          +- Project [distributed_id() AS __index_level_0__#45L, id#42L]
             +- Range (0, 10, step=1, splits=Some(12))
    ...
    ```
    
    ### Does this PR introduce _any_ user-facing change?
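    Yes, but only cosmetically: in the Spark UI and in `explain` output, the default distributed index is now shown as `distributed_id()` (see the "after" plan above).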
    
    ### How was this patch tested?
    Existing tests, plus manual checks of the plan output.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #48439 from zhengruifeng/distributed_index.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/pandas/internal.py                      |  9 +--------
 python/pyspark/pandas/spark/functions.py               |  4 ++++
 .../spark/sql/catalyst/analysis/FunctionRegistry.scala | 18 +++++++++++++++---
 .../apache/spark/sql/api/python/PythonSQLUtils.scala   |  6 ------
 4 files changed, 20 insertions(+), 17 deletions(-)

diff --git a/python/pyspark/pandas/internal.py 
b/python/pyspark/pandas/internal.py
index 6063641e22e3..90c361547b81 100644
--- a/python/pyspark/pandas/internal.py
+++ b/python/pyspark/pandas/internal.py
@@ -909,14 +909,7 @@ class InternalFrame:
 
     @staticmethod
     def attach_distributed_column(sdf: PySparkDataFrame, column_name: str) -> 
PySparkDataFrame:
-        scols = [scol_for(sdf, column) for column in sdf.columns]
-        # Does not add an alias to avoid having some changes in protobuf 
definition for now.
-        # The alias is more for query strings in DataFrame.explain, and they 
are cosmetic changes.
-        if is_remote():
-            return 
sdf.select(F.monotonically_increasing_id().alias(column_name), *scols)
-        jvm = sdf.sparkSession._jvm
-        jcol = jvm.PythonSQLUtils.distributedIndex()
-        return sdf.select(PySparkColumn(jcol).alias(column_name), *scols)
+        return sdf.select(SF.distributed_id().alias(column_name), "*")
 
     @staticmethod
     def attach_distributed_sequence_column(
diff --git a/python/pyspark/pandas/spark/functions.py 
b/python/pyspark/pandas/spark/functions.py
index bdd11559df3b..53146a163b1e 100644
--- a/python/pyspark/pandas/spark/functions.py
+++ b/python/pyspark/pandas/spark/functions.py
@@ -79,6 +79,10 @@ def null_index(col: Column) -> Column:
     return _invoke_internal_function_over_columns("null_index", col)
 
 
+def distributed_id() -> Column:
+    return _invoke_internal_function_over_columns("distributed_id")
+
+
 def distributed_sequence_id() -> Column:
     return _invoke_internal_function_over_columns("distributed_sequence_id")
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index d03d8114e997..abe61619a233 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -895,9 +895,20 @@ object FunctionRegistry {
   /** Registry for internal functions used by Connect and the Column API. */
   private[sql] val internal: SimpleFunctionRegistry = new 
SimpleFunctionRegistry
 
-  private def registerInternalExpression[T <: Expression : ClassTag](name: 
String): Unit = {
-    val (info, builder) = FunctionRegistryBase.build(name, None)
-    internal.internalRegisterFunction(FunctionIdentifier(name), info, builder)
+  private def registerInternalExpression[T <: Expression : ClassTag](
+      name: String,
+      setAlias: Boolean = false): Unit = {
+    val (info, builder) = FunctionRegistryBase.build[T](name, None)
+    val newBuilder = if (setAlias) {
+      (expressions: Seq[Expression]) => {
+        val expr = builder(expressions)
+        expr.setTagValue(FUNC_ALIAS, name)
+        expr
+      }
+    } else {
+      builder
+    }
+    internal.internalRegisterFunction(FunctionIdentifier(name), info, 
newBuilder)
   }
 
   registerInternalExpression[Product]("product")
@@ -911,6 +922,7 @@ object FunctionRegistry {
   registerInternalExpression[Days]("days")
   registerInternalExpression[Hours]("hours")
   registerInternalExpression[UnwrapUDT]("unwrap_udt")
+  registerInternalExpression[MonotonicallyIncreasingID]("distributed_id", 
setAlias = true)
   registerInternalExpression[DistributedSequenceID]("distributed_sequence_id")
   registerInternalExpression[PandasProduct]("pandas_product")
   registerInternalExpression[PandasStddev]("pandas_stddev")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index 08395ef4c347..a66a6e54a7c8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -154,12 +154,6 @@ private[sql] object PythonSQLUtils extends Logging {
 
   def namedArgumentExpression(name: String, e: Column): Column = 
NamedArgumentExpression(name, e)
 
-  def distributedIndex(): Column = {
-    val expr = MonotonicallyIncreasingID()
-    expr.setTagValue(FunctionRegistry.FUNC_ALIAS, "distributed_index")
-    expr
-  }
-
   @scala.annotation.varargs
   def fn(name: String, arguments: Column*): Column = Column.fn(name, 
arguments: _*)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to