This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c3ceae4523cb [SPARK-55366][SQL][PYTHON] Remove `errorOnDuplicatedFieldNames` from Python UDFs
c3ceae4523cb is described below

commit c3ceae4523cb5b996b594f7ee811042c3f72037e
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Feb 6 07:34:32 2026 +0900

    [SPARK-55366][SQL][PYTHON] Remove `errorOnDuplicatedFieldNames` from Python UDFs
    
    ### What changes were proposed in this pull request?
    Remove `errorOnDuplicatedFieldNames` from Python UDFs
    
    ### Why are the changes needed?
    To make the logic clearer: `errorOnDuplicatedFieldNames` was introduced in https://github.com/apache/spark/commit/305aa4a89efe02f517f82039225a99b31b20146f for `DataFrame.toPandas`, but it is always `true` in Python UDFs, so duplicated field names are never allowed there.
    
    Removing this always-true variable reduces confusion when refactoring the UDF code.
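    
    For illustration only (a minimal sketch, not part of this change): the new `ArrowUtils.failDuplicatedFieldNames` makes the existing duplicate-name check explicit. The struct below is hypothetical, and `ArrowUtils` is `private[sql]`, so this is internal rather than user-facing code:
    
    ```scala
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.util.ArrowUtils
    
    // Hypothetical struct with two fields named "a"; names and types are chosen
    // only for the example. ArrowUtils is private[sql], so this compiles only
    // from within Spark's own sql packages.
    val dup = new StructType()
      .add("a", IntegerType)
      .add("a", StringType)
    
    // Recursively inspects struct/array/map/UDT types and throws
    // ExecutionErrors.duplicatedFieldNameInArrowStructError when names collide.
    ArrowUtils.failDuplicatedFieldNames(dup)
    ```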
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #54153 from zhengruifeng/fail_duplicate_names.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../org/apache/spark/sql/util/ArrowUtils.scala     | 37 ++++++++++++++++++++++
 .../sql/execution/arrow/ArrowWriterWrapper.scala   |  4 +--
 .../sql/execution/python/ArrowPythonRunner.scala   |  4 +--
 .../execution/python/ArrowPythonUDTFRunner.scala   |  4 +--
 .../python/CoGroupedArrowPythonRunner.scala        |  7 ++--
 .../sql/execution/python/PythonArrowInput.scala    |  8 ++---
 .../ApplyInPandasWithStatePythonRunner.scala       |  3 +-
 .../TransformWithStateInPySparkPythonRunner.scala  |  3 +-
 8 files changed, 54 insertions(+), 16 deletions(-)

diff --git a/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala
index dd3aad8ef598..e6fa93af64de 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala
@@ -324,6 +324,43 @@ private[sql] object ArrowUtils {
     }.asJava)
   }
 
+  /**
+   * Maps schema from Spark to Arrow. NOTE: timeZoneId required for TimestampType in StructType
+   */
+  def toArrowSchema(schema: StructType, timeZoneId: String, largeVarTypes: Boolean): Schema = {
+    new Schema(schema.map { field =>
+      toArrowField(
+        field.name,
+        field.dataType,
+        field.nullable,
+        timeZoneId,
+        largeVarTypes,
+        field.metadata)
+    }.asJava)
+  }
+
+  /**
+   * Check the schema and fail once a struct type contains duplicated field names.
+   */
+  def failDuplicatedFieldNames(dt: DataType): Unit = {
+    dt match {
+      case st: StructType =>
+        if (st.names.toSet.size != st.names.length) {
+          throw ExecutionErrors.duplicatedFieldNameInArrowStructError(
+            st.names.toImmutableArraySeq)
+        }
+        st.fields.foreach { field => failDuplicatedFieldNames(field.dataType) }
+      case arr: ArrayType =>
+        failDuplicatedFieldNames(arr.elementType)
+      case map: MapType =>
+        failDuplicatedFieldNames(map.keyType)
+        failDuplicatedFieldNames(map.valueType)
+      case udt: UserDefinedType[_] =>
+        failDuplicatedFieldNames(udt.sqlType)
+      case _ =>
+    }
+  }
+
   def fromArrowSchema(schema: Schema): StructType = {
     StructType(schema.getFields.asScala.map { field =>
       StructField(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriterWrapper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriterWrapper.scala
index c04bae07f67d..0008c1319ec9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriterWrapper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriterWrapper.scala
@@ -69,12 +69,10 @@ object ArrowWriterWrapper {
       schema: StructType,
       timeZoneId: String,
       allocatorOwner: String,
-      errorOnDuplicatedFieldNames: Boolean,
       largeVarTypes: Boolean,
       dataOut: DataOutputStream,
       context: TaskContext): ArrowWriterWrapper = {
-    val arrowSchema =
-      ArrowUtils.toArrowSchema(schema, timeZoneId, errorOnDuplicatedFieldNames, largeVarTypes)
+    val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId, largeVarTypes)
     val allocator = ArrowUtils.rootAllocator.newChildAllocator(
       s"stdout writer for $allocatorOwner", 0, Long.MaxValue)
     val root = VectorSchemaRoot.create(arrowSchema, allocator)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index a5536621c531..94354e815ad3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.python.EvalPythonExec.ArgumentMetadata
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 abstract class BaseArrowPythonRunner[IN, OUT <: AnyRef](
@@ -42,6 +43,7 @@ abstract class BaseArrowPythonRunner[IN, OUT <: AnyRef](
     funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics)
   with PythonArrowInput[IN]
   with PythonArrowOutput[OUT] {
+  ArrowUtils.failDuplicatedFieldNames(schema)
 
   override val envVars: util.Map[String, String] = {
     val envVars = new util.HashMap(funcs.head._1.funcs.head.envVars)
@@ -62,8 +64,6 @@ abstract class BaseArrowPythonRunner[IN, OUT <: AnyRef](
   override val killWorkerOnFlushFailure: Boolean =
     SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
 
-  override val errorOnDuplicatedFieldNames: Boolean = true
-
   override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
   override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
index e5c7be2f4070..818a05cbdd66 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.python.EvalPythonExec.ArgumentMetadata
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 /**
@@ -48,6 +49,7 @@ class ArrowPythonUDTFRunner(
       jobArtifactUUID, pythonMetrics)
   with BatchedPythonArrowInput
   with BasicPythonArrowOutput {
+  ArrowUtils.failDuplicatedFieldNames(schema)
 
   override protected def runnerConf: Map[String, String] = super.runnerConf ++ pythonRunnerConf
 
@@ -88,8 +90,6 @@ class ArrowPythonUDTFRunner(
   override val killWorkerOnFlushFailure: Boolean =
     SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
 
-  override val errorOnDuplicatedFieldNames: Boolean = true
-
   override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
   override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
index d1fbbdba7131..df108187c9f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.arrow.ArrowWriterWrapper
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 /**
@@ -53,6 +54,8 @@ class CoGroupedArrowPythonRunner(
     (Iterator[InternalRow], Iterator[InternalRow]), ColumnarBatch](
     funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics)
   with BasicPythonArrowOutput {
+  ArrowUtils.failDuplicatedFieldNames(leftSchema)
+  ArrowUtils.failDuplicatedFieldNames(rightSchema)
 
   override protected def runnerConf: Map[String, String] = super.runnerConf ++ pythonRunnerConf
 
@@ -154,7 +157,7 @@ class CoGroupedArrowPythonRunner(
         if (nextBatchInLeftGroup != null) {
           if (leftGroupArrowWriter == null) {
             leftGroupArrowWriter = ArrowWriterWrapper.createAndStartArrowWriter(leftSchema,
-              timeZoneId, pythonExec + " (left)", errorOnDuplicatedFieldNames = true,
+              timeZoneId, pythonExec + " (left)",
               largeVarTypes, dataOut, context)
             // Set the unloader with compression after creating the writer
             leftGroupArrowWriter.unloader = createUnloader(leftGroupArrowWriter.root)
@@ -177,7 +180,7 @@ class CoGroupedArrowPythonRunner(
         } else if (nextBatchInRightGroup != null) {
           if (rightGroupArrowWriter == null) {
             rightGroupArrowWriter = ArrowWriterWrapper.createAndStartArrowWriter(rightSchema,
-              timeZoneId, pythonExec + " (right)", errorOnDuplicatedFieldNames = true,
+              timeZoneId, pythonExec + " (right)",
               largeVarTypes, dataOut, context)
             // Set the unloader with compression after creating the writer
             rightGroupArrowWriter.unloader = createUnloader(rightGroupArrowWriter.root)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala
index 58a48b1815e1..2b200294803d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala
@@ -47,8 +47,6 @@ private[python] trait PythonArrowInput[IN] { self: BasePythonRunner[IN, _] =>
 
   protected def timeZoneId: String
 
-  protected def errorOnDuplicatedFieldNames: Boolean
-
   protected def largeVarTypes: Boolean
 
   protected def pythonMetrics: Map[String, SQLMetric]
@@ -65,8 +63,8 @@ private[python] trait PythonArrowInput[IN] { self: BasePythonRunner[IN, _] =>
     ArrowUtils.rootAllocator.newChildAllocator(s"stdout writer for $pythonExec", 0, Long.MaxValue)
 
   protected lazy val root: VectorSchemaRoot = {
-    val arrowSchema = ArrowUtils.toArrowSchema(
-      schema, timeZoneId, errorOnDuplicatedFieldNames, largeVarTypes)
+    ArrowUtils.failDuplicatedFieldNames(schema)
+    val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId, 
largeVarTypes)
     VectorSchemaRoot.create(arrowSchema, allocator)
   }
 
@@ -288,7 +286,7 @@ private[python] trait GroupedPythonArrowInput { self: RowInputArrowPythonRunner
             assert(writer == null || writer.isClosed)
             writer = ArrowWriterWrapper.createAndStartArrowWriter(
               schema, timeZoneId, pythonExec,
-              errorOnDuplicatedFieldNames, largeVarTypes, dataOut, context)
+              largeVarTypes, dataOut, context)
             // Set the unloader with compression after creating the writer
             writer.unloader = new VectorUnloader(writer.root, true, self.codec, true)
             nextBatchStart = inputIterator.next()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
index 89d8e425fd2b..786c5fc408dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.python.streaming.ApplyInPandasWithStateWri
 import org.apache.spark.sql.execution.streaming.operators.stateful.flatmapgroupswithstate.GroupStateImpl
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
 
 
@@ -69,6 +70,7 @@ class ApplyInPandasWithStatePythonRunner(
     funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics)
   with PythonArrowInput[InType]
   with PythonArrowOutput[OutType] {
+  ArrowUtils.failDuplicatedFieldNames(inputSchema)
 
   override val pythonExec: String =
     SQLConf.get.pysparkWorkerPythonExecutable.getOrElse(
@@ -85,7 +87,6 @@ class ApplyInPandasWithStatePythonRunner(
   private val sqlConf = SQLConf.get
 
   override val schema: StructType = inputSchema.add("__state", STATE_METADATA_SCHEMA)
-  override val errorOnDuplicatedFieldNames: Boolean = true
 
   override val hideTraceback: Boolean = sqlConf.pysparkHideTraceback
   override val simplifiedTraceback: Boolean = sqlConf.pysparkSimplifiedTraceback
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala
index 05771d38cd84..33fe0cdfee3f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.python.streaming.TransformWithStateInPySpa
 import org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.statefulprocessor.{DriverStatefulProcessorHandleImpl,
 StatefulProcessorHandleImpl}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.ThreadUtils
 
@@ -233,6 +234,7 @@ abstract class TransformWithStateInPySparkPythonBaseRunner[I](
   with BasicPythonArrowOutput
   with TransformWithStateInPySparkPythonRunnerUtils
   with Logging {
+  ArrowUtils.failDuplicatedFieldNames(schema)
 
   protected val sqlConf = SQLConf.get
   protected val arrowMaxRecordsPerBatch = sqlConf.arrowMaxRecordsPerBatch
@@ -251,7 +253,6 @@ abstract class TransformWithStateInPySparkPythonBaseRunner[I](
         (if (isUnixDomainSock) stateServerSocketPath else stateServerSocketPort.toString)
     )
 
-  override protected val errorOnDuplicatedFieldNames: Boolean = true
   override protected val largeVarTypes: Boolean = sqlConf.arrowUseLargeVarTypes
 
   override def compute(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
