This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 34d7a3c5ccf6 [SPARK-52975][SQL] Simplify field names in pushdown join 
sql
34d7a3c5ccf6 is described below

commit 34d7a3c5ccf6a4d4ca714297fb6cf7d36ed2463f
Author: dengziming <dengzim...@bytedance.com>
AuthorDate: Mon Aug 4 13:28:57 2025 +0800

    [SPARK-52975][SQL] Simplify field names in pushdown join sql
    
    ### What changes were proposed in this pull request?
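    When pushing down join SQL, we generate aliases for duplicated column names, 
but the current aliases embed a random UUID, which makes them too long to read 
and nondeterministic. This PR replaces them with short, deterministic numeric 
suffixes (for example `ID_1`).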
    
    Before this change:
    ```
    SELECT 
"ID_bf822dc6_e06d_492c_a489_1e92a6fe84a0","AMOUNT_c9f3fc67_62f8_4ec6_9c3f_b7ee7bafcb5a","ADDRESS_d937a313_3e09_4b97_b91f_b2a47ef5e31d","ID","AMOUNT","ADDRESS"
 FROM xxxx
    
    RelationV2[ID_bf822dc6_e06d_492c_a489_1e92a6fe84a0#18, 
AMOUNT_c9f3fc67_62f8_4ec6_9c3f_b7ee7bafcb5a#19, 
ADDRESS_d937a313_3e09_4b97_b91f_b2a47ef5e31d#20, ID#21, AMOUNT#22, ADDRESS#23] 
join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_1
    ```
    
    After this change:
    ```
    SELECT "ID","AMOUNT","ADDRESS","ID_1","AMOUNT_1","ADDRESS_1" FROM xxx
    
    RelationV2[ID#18, AMOUNT#19, ADDRESS#20, ID_1#21, AMOUNT_1#22, 
ADDRESS_1#23] join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_1
    ```
    
    ### Why are the changes needed?
    Make code-generated JDBC SQL clearer and deterministic.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
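    Existing tests ensure no side effects are introduced. A new unit suite 
(DSV2JoinPushDownAliasGenerationSuite) and a new JDBC integration test cover 
the alias generation.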
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Generated-by: Trae.
    
    Closes #51686 from dengziming/SPARK-52975.
    
    Authored-by: dengziming <dengzim...@bytedance.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../join/MySQLJoinPushdownIntegrationSuite.scala   |   2 +
 .../datasources/v2/V2ScanRelationPushDown.scala    | 118 ++++++++++------
 .../connector/DataSourcePushdownTestUtils.scala    |   4 +-
 .../v2/DSV2JoinPushDownAliasGenerationSuite.scala  | 152 +++++++++++++++++++++
 .../JDBCV2JoinPushdownIntegrationSuiteBase.scala   |  96 ++++++++++---
 5 files changed, 314 insertions(+), 58 deletions(-)

diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/join/MySQLJoinPushdownIntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/join/MySQLJoinPushdownIntegrationSuite.scala
index f6d70dc8ae16..1a1536b6a292 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/join/MySQLJoinPushdownIntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/join/MySQLJoinPushdownIntegrationSuite.scala
@@ -43,6 +43,8 @@ class MySQLJoinPushdownIntegrationSuite
 
   override def caseConvert(identifier: String): String = 
identifier.toUpperCase(Locale.ROOT)
 
+  override def remainColumnCase(identifier: String): String = "`" + identifier 
+ "`"
+
   // This method comes from DockerJDBCIntegrationSuite
   override def dataPreparation(connection: Connection): Unit = {
     super.dataPreparation()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index 0e3141aacbdf..078acb72ce07 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.Locale
+
 import scala.collection.mutable
 
 import org.apache.spark.internal.LogKeys.{AGGREGATE_FUNCTIONS, COLUMN_NAMES, 
GROUP_BY_EXPRS, JOIN_CONDITION, JOIN_TYPE, POST_SCAN_FILTERS, PUSHED_FILTERS, 
RELATION_NAME, RELATION_OUTPUT}
@@ -137,42 +139,11 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
         // Cross joins are not supported because they increase the amount of 
data.
         condition.isDefined &&
         lBuilder.isOtherSideCompatibleForJoin(rBuilder) =>
-      val leftSideRequiredColumnNames = 
getRequiredColumnNames(leftProjections, leftHolder)
-      val rightSideRequiredColumnNames = 
getRequiredColumnNames(rightProjections, rightHolder)
-
-      // Alias the duplicated columns from left side of the join. We are 
creating the
-      // Map[String, Int] to tell how many times each column name has occured 
within one side.
-      val leftSideNameCounts: Map[String, Int] =
-        
leftSideRequiredColumnNames.groupBy(identity).view.mapValues(_.size).toMap
-      val rightSideNameCounts: Map[String, Int] =
-        
rightSideRequiredColumnNames.groupBy(identity).view.mapValues(_.size).toMap
-      // It's more performant to call contains on Set than on Seq
-      val rightSideColumnNamesSet = rightSideRequiredColumnNames.toSet
-
-      val leftSideRequiredColumnsWithAliases = leftSideRequiredColumnNames.map 
{ name =>
-        val aliasName =
-          if (leftSideNameCounts(name) > 1 || 
rightSideColumnNamesSet.contains(name)) {
-            generateJoinOutputAlias(name)
-          } else {
-            null
-          }
-
-        new SupportsPushDownJoin.ColumnWithAlias(name, aliasName)
-      }
-
-      // Aliasing of duplicated columns in right side is done only if there 
are duplicates in
-      // right side only. There won't be a conflict with left side columns 
because they are
-      // already aliased.
-      val rightSideRequiredColumnsWithAliases = 
rightSideRequiredColumnNames.map { name =>
-        val aliasName =
-          if (rightSideNameCounts(name) > 1) {
-            generateJoinOutputAlias(name)
-          } else {
-            null
-          }
-
-        new SupportsPushDownJoin.ColumnWithAlias(name, aliasName)
-      }
+      // Process left and right columns in original order
+      val (leftSideRequiredColumnsWithAliases, 
rightSideRequiredColumnsWithAliases) =
+        generateColumnAliasesForDuplicatedName(
+          getRequiredColumnNames(leftProjections, leftHolder),
+          getRequiredColumnNames(rightProjections, rightHolder))
 
       // Create the AttributeMap that holds (Attribute -> Attribute with up to 
date name) mapping.
       val pushedJoinOutputMap = AttributeMap[Expression](
@@ -249,11 +220,80 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
         node
       }
   }
+  /**
+   * Generates unique column aliases for join operations to avoid naming 
conflicts.
+   * Handles case sensitivity issues across different databases (SQL Server, 
MySQL, etc.).
+   *
+   * @param leftSideRequiredColumnNames  Columns from the left side of the join
+   * @param rightSideRequiredColumnNames Columns from the right side of the 
join
+   * @return Tuple of (leftColumnsWithAliases, rightColumnsWithAliases)
+   */
+  private[v2] def generateColumnAliasesForDuplicatedName(
+    leftSideRequiredColumnNames: Array[String],
+    rightSideRequiredColumnNames: Array[String]
+  ): (Array[SupportsPushDownJoin.ColumnWithAlias],
+    Array[SupportsPushDownJoin.ColumnWithAlias]) = {
+    // Normalize all column names to lowercase for case-insensitive comparison
+    val normalizeCase: String => String = _.toLowerCase(Locale.ROOT)
+
+    // Count occurrences of each column name (case-insensitive)
+    val allRequiredColumnNames = leftSideRequiredColumnNames ++ 
rightSideRequiredColumnNames
+    val allNameCounts: Map[String, Int] =
+      allRequiredColumnNames.map(normalizeCase)
+        .groupBy(identity)
+        .view
+        .mapValues(_.length)
+        .toMap
+
+    // Track claimed aliases using normalized names.
+    // Use Set for O(1) lookups when checking existing column names, claim all 
names
+    // that appears only once to ensure they have highest priority.
+    val allClaimedAliases = mutable.Set.from(
+      allNameCounts.filter(_._2 == 1).keys
+    )
+
+    // Track suffix index for each base column name (starts at 0) to avoid 
extreme worst
+    // case of O(n^2) alias generation.
+    val aliasSuffixIndex = mutable.HashMap[String, Int]().withDefaultValue(0)
+
+    def processColumn(originalName: String): 
SupportsPushDownJoin.ColumnWithAlias = {
+      val normalizedName = normalizeCase(originalName)
+
+      // No alias needed for unique column names
+      if (allNameCounts(normalizedName) == 1) {
+        new SupportsPushDownJoin.ColumnWithAlias(originalName, null)
+      } else {
+        var attempt = aliasSuffixIndex(normalizedName)
+        var candidate = if (attempt == 0) originalName else 
s"${originalName}_$attempt"
+        var normalizedCandidate = normalizeCase(candidate)
+
+        // Find first available unique alias, use original name for the first 
attempt, then append
+        // suffix for more attempts.
+        while (allClaimedAliases.contains(normalizedCandidate)) {
+          attempt += 1
+          candidate = s"${originalName}_$attempt"
+          normalizedCandidate = normalizeCase(candidate)
+        }
+
+        // Update tracking state
+        aliasSuffixIndex(normalizedName) = attempt + 1
+        allClaimedAliases.add(normalizedCandidate)
 
-  def generateJoinOutputAlias(name: String): String =
-    s"${name}_${java.util.UUID.randomUUID().toString.replace("-", "_")}"
+        if (originalName == candidate) {
+          new SupportsPushDownJoin.ColumnWithAlias(originalName, null)
+        } else {
+          new SupportsPushDownJoin.ColumnWithAlias(originalName, candidate)
+        }
+      }
+    }
+
+    (
+      leftSideRequiredColumnNames.map(processColumn),
+      rightSideRequiredColumnNames.map(processColumn)
+    )
+  }
 
-  // projections' names are maybe not up to date if the joins have been 
previously pushed down.
+  // Projections' names are maybe not up to date if the joins have been 
previously pushed down.
   // For this reason, we need to use pushedJoinOutputMap to get up to date 
names.
   def getRequiredColumnNames(
       projections: Seq[NamedExpression],
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala
index 7b8774980d2c..ee2bcd34bf3f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala
@@ -182,9 +182,7 @@ trait DataSourcePushdownTestUtils extends 
ExplainSuiteHelper {
 
           assert(dfSchema.length == schema.length)
           dfSchema.fields.zip(schema.fields).foreach { case (f1, f2) =>
-            if (f2.name.nonEmpty) {
-              assert(f1.name == f2.name)
-            }
+            assert(f1.name == f2.name)
             assert(f1.dataType == f2.dataType)
             assert(f1.nullable == f2.nullable)
           }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DSV2JoinPushDownAliasGenerationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DSV2JoinPushDownAliasGenerationSuite.scala
new file mode 100644
index 000000000000..a30f65d981a3
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DSV2JoinPushDownAliasGenerationSuite.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.util.Locale
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.connector.read.SupportsPushDownJoin.ColumnWithAlias
+
+class DSV2JoinPushDownAliasGenerationSuite extends SparkFunSuite {
+
+  private def assertAliases(
+    leftInput: Array[String],
+    rightInput: Array[String],
+    expectedLeft: Array[ColumnWithAlias],
+    expectedRight: Array[ColumnWithAlias]
+  ): Unit = {
+    val (actualLeft, actualRight) = V2ScanRelationPushDown
+      .generateColumnAliasesForDuplicatedName(leftInput, rightInput)
+
+    val uniqName: ColumnWithAlias => String = col => {
+      if (col.alias() == null) col.colName() else 
col.alias().toLowerCase(Locale.ROOT)
+    }
+    // Ensure no duplicate column names after ignoring capitalization
+    assert((actualLeft ++ actualRight).map(uniqName).distinct.length
+      == actualLeft.length + actualRight.length)
+
+    assert(
+      actualLeft === expectedLeft,
+      s"""Left side aliases mismatch.
+         |Expected: ${expectedLeft.map(_.alias()).mkString(", ")}
+         |Actual: ${actualLeft.map(_.alias()).mkString(", ")}""".stripMargin
+    )
+
+    assert(
+      actualRight === expectedRight,
+      s"""Right side aliases mismatch.
+         |Expected: ${expectedRight.map(_.alias()).mkString(", ")}
+         |Actual: ${actualRight.map(_.alias()).mkString(", ")}""".stripMargin
+    )
+  }
+
+  test("Basic case with no duplicate column names") {
+    assertAliases(
+      leftInput = Array("id", "name"),
+      rightInput = Array("email", "phone"),
+      expectedLeft = Array(
+        new ColumnWithAlias("id", null),
+        new ColumnWithAlias("name", null)
+      ),
+      expectedRight = Array(
+        new ColumnWithAlias("email", null),
+        new ColumnWithAlias("phone", null)
+      )
+    )
+  }
+
+  test("Extreme duplication scenarios") {
+    assertAliases(
+      leftInput = Array("id", "id", "id"),
+      rightInput = Array("id", "id"),
+      expectedLeft = Array(
+        new ColumnWithAlias("id", null),
+        new ColumnWithAlias("id", "id_1"),
+        new ColumnWithAlias("id", "id_2")
+      ),
+      expectedRight = Array(
+        new ColumnWithAlias("id", "id_3"),
+        new ColumnWithAlias("id", "id_4")
+      )
+    )
+  }
+
+  test("Exact duplicate column names") {
+    assertAliases(
+      leftInput = Array("id", "name"),
+      rightInput = Array("id", "name"),
+      expectedLeft = Array(
+        new ColumnWithAlias("id", null),
+        new ColumnWithAlias("name", null)
+      ),
+      expectedRight = Array(
+        new ColumnWithAlias("id", "id_1"),
+        new ColumnWithAlias("name", "name_1")
+      )
+    )
+  }
+
+  test("Columns with numeric suffixes (id vs id_1)") {
+    assertAliases(
+      leftInput = Array("id", "id_1", "name"),
+      rightInput = Array("id", "name", "value"),
+      expectedLeft = Array(
+        new ColumnWithAlias("id", null),
+        new ColumnWithAlias("id_1", null),
+        new ColumnWithAlias("name", null)
+      ),
+      expectedRight = Array(
+        new ColumnWithAlias("id", "id_2"),
+        new ColumnWithAlias("name", "name_1"),
+        new ColumnWithAlias("value", null)
+      )
+    )
+  }
+
+  test("Case-sensitive conflicts (ID vs id)") {
+    assertAliases(
+      leftInput = Array("ID", "Name"),
+      rightInput = Array("id", "name"),
+      expectedLeft = Array(
+        new ColumnWithAlias("ID", null),
+        new ColumnWithAlias("Name", null)
+      ),
+      expectedRight = Array(
+        new ColumnWithAlias("id", "id_1"),
+        new ColumnWithAlias("name", "name_1")
+      )
+    )
+  }
+
+  test("Mixed case and numeric suffixes") {
+    assertAliases(
+      leftInput = Array("UserID", "user_id", "user_id_1"),
+      rightInput = Array("userId", "USER_ID", "user_id_2"),
+      expectedLeft = Array(
+        new ColumnWithAlias("UserID", null),
+        new ColumnWithAlias("user_id", null),
+        new ColumnWithAlias("user_id_1", null)
+      ),
+      expectedRight = Array(
+        new ColumnWithAlias("userId", "userId_1"),
+        new ColumnWithAlias("USER_ID", "USER_ID_3"),
+        new ColumnWithAlias("user_id_2", null)
+      )
+    )
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
index 446dd268e654..7f267c738634 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
@@ -21,9 +21,10 @@ import java.sql.{Connection, DriverManager}
 import java.util.Properties
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.connector.DataSourcePushdownTestUtils
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.jdbc.JdbcDialect
@@ -40,6 +41,8 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
 
   val joinTableName1: String = "join_table_1"
   val joinTableName2: String = "join_table_2"
+  val joinTableName3: String = "join_table_3"
+  val joinTableName4: String = "join_table_4"
 
   val jdbcDialect: JdbcDialect
 
@@ -56,6 +59,8 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
   private def catalogAndNamespace = s"$catalogName.${caseConvert(namespace)}"
   private def casedJoinTableName1 = caseConvert(joinTableName1)
   private def casedJoinTableName2 = caseConvert(joinTableName2)
+  private def casedJoinTableName3 = caseConvert(joinTableName3)
+  private def casedJoinTableName4 = caseConvert(joinTableName4)
 
   def qualifyTableName(tableName: String): String = {
     val fullyQualifiedCasedNamespace = 
jdbcDialect.quoteIdentifier(caseConvert(namespace))
@@ -67,8 +72,9 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
     jdbcDialect.quoteIdentifier(caseConvert(namespace))
 
   private lazy val fullyQualifiedTableName1: String = 
qualifyTableName(joinTableName1)
-
   private lazy val fullyQualifiedTableName2: String = 
qualifyTableName(joinTableName2)
+  private lazy val fullyQualifiedTableName3: String = 
qualifyTableName(joinTableName3)
+  private lazy val fullyQualifiedTableName4: String = 
qualifyTableName(joinTableName4)
 
   protected def getJDBCTypeString(dt: DataType): String = {
     JdbcUtils.getJdbcType(dt, jdbcDialect).databaseTypeDefinition.toUpperCase()
@@ -76,6 +82,10 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
 
   protected def caseConvert(identifier: String): String = identifier
 
+  // Quote the identifier to remain original case, for example, MySql convert 
[`ID`, ID]
+  // to [ID, id]
+  protected def remainColumnCase(identifier: String): String = "\"" + 
identifier + "\""
+
   protected def withConnection[T](f: Connection => T): T = {
     val conn = DriverManager.getConnection(url, new Properties())
     try {
@@ -132,6 +142,26 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
            |  SURNAME ${getJDBCTypeString(stringType)}
            |)""".stripMargin
       ).executeUpdate()
+
+      // Complex situations with different capitalization and quotation marks.
+      conn.prepareStatement(
+        s"""CREATE TABLE $fullyQualifiedTableName3(
+           |${remainColumnCase("id")} ${getJDBCTypeString(integerType)},
+           |${remainColumnCase("id_1")} ${getJDBCTypeString(integerType)},
+           |${remainColumnCase("id_2")} ${getJDBCTypeString(integerType)},
+           |${remainColumnCase("id_1_1")} ${getJDBCTypeString(integerType)},
+           |${remainColumnCase("sid")} ${getJDBCTypeString(integerType)}
+           |)""".stripMargin
+      ).executeUpdate()
+      conn.prepareStatement(
+        s"""CREATE TABLE $fullyQualifiedTableName4 (
+           |${remainColumnCase("id")} ${getJDBCTypeString(integerType)},
+           |${remainColumnCase("id_1")} ${getJDBCTypeString(integerType)},
+           |${remainColumnCase("id_2")} ${getJDBCTypeString(integerType)},
+           |${remainColumnCase("id_2_1")} ${getJDBCTypeString(integerType)},
+           |${remainColumnCase("Sid")} ${getJDBCTypeString(integerType)}
+           |)""".stripMargin
+      ).executeUpdate()
     }
   }
 
@@ -181,6 +211,10 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
       insertStmt2.executeBatch()
       insertStmt2.close()
 
+      conn.createStatement().execute(
+        s"""INSERT INTO $fullyQualifiedTableName3 VALUES (0, 1, 2, 3, 4)""")
+      conn.createStatement().execute(
+        s"""INSERT INTO $fullyQualifiedTableName4 VALUES (0, -1, -2, -3, 
-4)""")
     }
   }
 
@@ -304,14 +338,14 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
     withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
       val df = sql(sqlQuery)
 
-      val expectedSchemaWithoutNames = StructType(
+      val expectedSchema = StructType(
         Seq(
-          StructField("", integerType), // ID
-          StructField("", integerType), // NEXT_ID
+          StructField(caseConvert("id"), integerType), // ID
+          StructField(caseConvert("id_1"), integerType), // ID
           StructField(caseConvert("amount"), decimalType) // AMOUNT
         )
       )
-      checkPrunedColumnsDataTypeAndNullability(df, expectedSchemaWithoutNames)
+      checkPrunedColumnsDataTypeAndNullability(df, expectedSchema)
       checkJoinPushed(
         df,
         expectedTables = 
s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " +
@@ -336,13 +370,13 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
     withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
       val df = sql(sqlQuery)
 
-      val expectedSchemaWithoutNames = StructType(
+      val expectedSchema = StructType(
         Seq(
           StructField(caseConvert("id"), integerType), // ID
           StructField(caseConvert("next_id"), integerType) // NEXT_ID
         )
       )
-      checkPrunedColumnsDataTypeAndNullability(df, expectedSchemaWithoutNames)
+      checkPrunedColumnsDataTypeAndNullability(df, expectedSchema)
       checkJoinPushed(
         df,
         expectedTables = 
s"$catalogAndNamespace.${caseConvert(joinTableName1)}",
@@ -371,18 +405,18 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
     withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
       val df = sql(sqlQuery)
 
-      val expectedSchemaWithoutNames = StructType(
+      val expectedSchema = StructType(
         Seq(
-          StructField("", integerType), // ID_UUID
-          StructField("", decimalType), // AMOUNT_UUID
-          StructField("", integerType), // ID_UUID
-          StructField("", decimalType), // AMOUNT_UUID
-          StructField(caseConvert("address"), stringType), // ADDRESS
           StructField(caseConvert("id"), integerType), // ID
-          StructField(caseConvert("amount"), decimalType) // AMOUNT
+          StructField(caseConvert("amount"), decimalType), // AMOUNT
+          StructField(caseConvert("id_1"), integerType), // ID
+          StructField(caseConvert("amount_1"), decimalType), // AMOUNT
+          StructField(caseConvert("address"), stringType), // ADDRESS
+          StructField(caseConvert("id_2"), integerType), // ID
+          StructField(caseConvert("amount_2"), decimalType) // AMOUNT
         )
       )
-      checkPrunedColumnsDataTypeAndNullability(df, expectedSchemaWithoutNames)
+      checkPrunedColumnsDataTypeAndNullability(df, expectedSchema)
       checkJoinPushed(
         df,
         expectedTables = 
s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " +
@@ -640,4 +674,34 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
       checkAnswer(df, rows)
     }
   }
+
+  test("Test complex duplicate column name alias") {
+    val sqlQuery = s"""
+                      |SELECT
+                      |    *
+                      |FROM $catalogAndNamespace.$casedJoinTableName3 a
+                      |JOIN $catalogAndNamespace.$casedJoinTableName4 b
+                      |ON a.id = b.id""".stripMargin
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+      val row = df.collect()(0)
+      assert(row.toString == Row(0, 1, 2, 3, 4, 0, -1, -2, -3, -4).toString)
+
+      assert(df.schema.fields.map(_.name) sameElements
+        Array("id", "id_1", "id_2", "id_1_1", "sid",
+          "id", "id_1", "id_2", "id_2_1", "Sid"),
+        "Unexpected schema names: " + 
df.schema.fields.map(_.name).mkString(","))
+
+      val schemaNames = df.queryExecution.optimizedPlan.collectFirst {
+        case j: DataSourceV2ScanRelation => j
+      }.get.schema.fields.map(_.name)
+      assert(schemaNames sameElements
+        Array("id", "id_1", "id_2", "id_1_1", "sid",
+          "id_3", "id_1_2", "id_2_2", "id_2_1", "Sid_1"),
+        "Unexpected schema names: " + schemaNames.mkString(","))
+
+      checkJoinPushed(df)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to