This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 34d7a3c5ccf6 [SPARK-52975][SQL] Simplify field names in pushdown join sql 34d7a3c5ccf6 is described below commit 34d7a3c5ccf6a4d4ca714297fb6cf7d36ed2463f Author: dengziming <dengzim...@bytedance.com> AuthorDate: Mon Aug 4 13:28:57 2025 +0800 [SPARK-52975][SQL] Simplify field names in pushdown join sql ### What changes were proposed in this pull request? When pushing down join SQL, we generate aliases for duplicated names, but the aliases are too long to read and nondeterministic. Before this change: ``` SELECT "ID_bf822dc6_e06d_492c_a489_1e92a6fe84a0","AMOUNT_c9f3fc67_62f8_4ec6_9c3f_b7ee7bafcb5a","ADDRESS_d937a313_3e09_4b97_b91f_b2a47ef5e31d","ID","AMOUNT","ADDRESS" FROM xxxx RelationV2[ID_bf822dc6_e06d_492c_a489_1e92a6fe84a0#18, AMOUNT_c9f3fc67_62f8_4ec6_9c3f_b7ee7bafcb5a#19, ADDRESS_d937a313_3e09_4b97_b91f_b2a47ef5e31d#20, ID#21, AMOUNT#22, ADDRESS#23] join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_1 ``` After this change: ``` SELECT "ID","AMOUNT","ADDRESS","ID_1","AMOUNT_1","ADDRESS_1" FROM xxx RelationV2[ID#18, AMOUNT#19, ADDRESS#20, ID_1#21, AMOUNT_1#22, ADDRESS_1#23] join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_1 ``` ### Why are the changes needed? Make code-generated JDBC SQL clearer and deterministic. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests can ensure no side effects are introduced. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Trae. Closes #51686 from dengziming/SPARK-52975. 
Authored-by: dengziming <dengzim...@bytedance.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../join/MySQLJoinPushdownIntegrationSuite.scala | 2 + .../datasources/v2/V2ScanRelationPushDown.scala | 118 ++++++++++------ .../connector/DataSourcePushdownTestUtils.scala | 4 +- .../v2/DSV2JoinPushDownAliasGenerationSuite.scala | 152 +++++++++++++++++++++ .../JDBCV2JoinPushdownIntegrationSuiteBase.scala | 96 ++++++++++--- 5 files changed, 314 insertions(+), 58 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/join/MySQLJoinPushdownIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/join/MySQLJoinPushdownIntegrationSuite.scala index f6d70dc8ae16..1a1536b6a292 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/join/MySQLJoinPushdownIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/join/MySQLJoinPushdownIntegrationSuite.scala @@ -43,6 +43,8 @@ class MySQLJoinPushdownIntegrationSuite override def caseConvert(identifier: String): String = identifier.toUpperCase(Locale.ROOT) + override def remainColumnCase(identifier: String): String = "`" + identifier + "`" + // This method comes from DockerJDBCIntegrationSuite override def dataPreparation(connection: Connection): Unit = { super.dataPreparation() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 0e3141aacbdf..078acb72ce07 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.Locale + import 
scala.collection.mutable import org.apache.spark.internal.LogKeys.{AGGREGATE_FUNCTIONS, COLUMN_NAMES, GROUP_BY_EXPRS, JOIN_CONDITION, JOIN_TYPE, POST_SCAN_FILTERS, PUSHED_FILTERS, RELATION_NAME, RELATION_OUTPUT} @@ -137,42 +139,11 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { // Cross joins are not supported because they increase the amount of data. condition.isDefined && lBuilder.isOtherSideCompatibleForJoin(rBuilder) => - val leftSideRequiredColumnNames = getRequiredColumnNames(leftProjections, leftHolder) - val rightSideRequiredColumnNames = getRequiredColumnNames(rightProjections, rightHolder) - - // Alias the duplicated columns from left side of the join. We are creating the - // Map[String, Int] to tell how many times each column name has occured within one side. - val leftSideNameCounts: Map[String, Int] = - leftSideRequiredColumnNames.groupBy(identity).view.mapValues(_.size).toMap - val rightSideNameCounts: Map[String, Int] = - rightSideRequiredColumnNames.groupBy(identity).view.mapValues(_.size).toMap - // It's more performant to call contains on Set than on Seq - val rightSideColumnNamesSet = rightSideRequiredColumnNames.toSet - - val leftSideRequiredColumnsWithAliases = leftSideRequiredColumnNames.map { name => - val aliasName = - if (leftSideNameCounts(name) > 1 || rightSideColumnNamesSet.contains(name)) { - generateJoinOutputAlias(name) - } else { - null - } - - new SupportsPushDownJoin.ColumnWithAlias(name, aliasName) - } - - // Aliasing of duplicated columns in right side is done only if there are duplicates in - // right side only. There won't be a conflict with left side columns because they are - // already aliased. 
- val rightSideRequiredColumnsWithAliases = rightSideRequiredColumnNames.map { name => - val aliasName = - if (rightSideNameCounts(name) > 1) { - generateJoinOutputAlias(name) - } else { - null - } - - new SupportsPushDownJoin.ColumnWithAlias(name, aliasName) - } + // Process left and right columns in original order + val (leftSideRequiredColumnsWithAliases, rightSideRequiredColumnsWithAliases) = + generateColumnAliasesForDuplicatedName( + getRequiredColumnNames(leftProjections, leftHolder), + getRequiredColumnNames(rightProjections, rightHolder)) // Create the AttributeMap that holds (Attribute -> Attribute with up to date name) mapping. val pushedJoinOutputMap = AttributeMap[Expression]( @@ -249,11 +220,80 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { node } } + /** + * Generates unique column aliases for join operations to avoid naming conflicts. + * Handles case sensitivity issues across different databases (SQL Server, MySQL, etc.). + * + * @param leftSideRequiredColumnNames Columns from the left side of the join + * @param rightSideRequiredColumnNames Columns from the right side of the join + * @return Tuple of (leftColumnsWithAliases, rightColumnsWithAliases) + */ + private[v2] def generateColumnAliasesForDuplicatedName( + leftSideRequiredColumnNames: Array[String], + rightSideRequiredColumnNames: Array[String] + ): (Array[SupportsPushDownJoin.ColumnWithAlias], + Array[SupportsPushDownJoin.ColumnWithAlias]) = { + // Normalize all column names to lowercase for case-insensitive comparison + val normalizeCase: String => String = _.toLowerCase(Locale.ROOT) + + // Count occurrences of each column name (case-insensitive) + val allRequiredColumnNames = leftSideRequiredColumnNames ++ rightSideRequiredColumnNames + val allNameCounts: Map[String, Int] = + allRequiredColumnNames.map(normalizeCase) + .groupBy(identity) + .view + .mapValues(_.length) + .toMap + + // Track claimed aliases using normalized names. 
+ // Use Set for O(1) lookups when checking existing column names, claim all names + // that appears only once to ensure they have highest priority. + val allClaimedAliases = mutable.Set.from( + allNameCounts.filter(_._2 == 1).keys + ) + + // Track suffix index for each base column name (starts at 0) to avoid extreme worst + // case of O(n^2) alias generation. + val aliasSuffixIndex = mutable.HashMap[String, Int]().withDefaultValue(0) + + def processColumn(originalName: String): SupportsPushDownJoin.ColumnWithAlias = { + val normalizedName = normalizeCase(originalName) + + // No alias needed for unique column names + if (allNameCounts(normalizedName) == 1) { + new SupportsPushDownJoin.ColumnWithAlias(originalName, null) + } else { + var attempt = aliasSuffixIndex(normalizedName) + var candidate = if (attempt == 0) originalName else s"${originalName}_$attempt" + var normalizedCandidate = normalizeCase(candidate) + + // Find first available unique alias, use original name for the first attempt, then append + // suffix for more attempts. + while (allClaimedAliases.contains(normalizedCandidate)) { + attempt += 1 + candidate = s"${originalName}_$attempt" + normalizedCandidate = normalizeCase(candidate) + } + + // Update tracking state + aliasSuffixIndex(normalizedName) = attempt + 1 + allClaimedAliases.add(normalizedCandidate) - def generateJoinOutputAlias(name: String): String = - s"${name}_${java.util.UUID.randomUUID().toString.replace("-", "_")}" + if (originalName == candidate) { + new SupportsPushDownJoin.ColumnWithAlias(originalName, null) + } else { + new SupportsPushDownJoin.ColumnWithAlias(originalName, candidate) + } + } + } + + ( + leftSideRequiredColumnNames.map(processColumn), + rightSideRequiredColumnNames.map(processColumn) + ) + } - // projections' names are maybe not up to date if the joins have been previously pushed down. + // Projections' names are maybe not up to date if the joins have been previously pushed down. 
// For this reason, we need to use pushedJoinOutputMap to get up to date names. def getRequiredColumnNames( projections: Seq[NamedExpression], diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala index 7b8774980d2c..ee2bcd34bf3f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala @@ -182,9 +182,7 @@ trait DataSourcePushdownTestUtils extends ExplainSuiteHelper { assert(dfSchema.length == schema.length) dfSchema.fields.zip(schema.fields).foreach { case (f1, f2) => - if (f2.name.nonEmpty) { - assert(f1.name == f2.name) - } + assert(f1.name == f2.name) assert(f1.dataType == f2.dataType) assert(f1.nullable == f2.nullable) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DSV2JoinPushDownAliasGenerationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DSV2JoinPushDownAliasGenerationSuite.scala new file mode 100644 index 000000000000..a30f65d981a3 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DSV2JoinPushDownAliasGenerationSuite.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import java.util.Locale + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.connector.read.SupportsPushDownJoin.ColumnWithAlias + +class DSV2JoinPushDownAliasGenerationSuite extends SparkFunSuite { + + private def assertAliases( + leftInput: Array[String], + rightInput: Array[String], + expectedLeft: Array[ColumnWithAlias], + expectedRight: Array[ColumnWithAlias] + ): Unit = { + val (actualLeft, actualRight) = V2ScanRelationPushDown + .generateColumnAliasesForDuplicatedName(leftInput, rightInput) + + val uniqName: ColumnWithAlias => String = col => { + if (col.alias() == null) col.colName() else col.alias().toLowerCase(Locale.ROOT) + } + // Ensure no duplicate column names after ignoring capitalization + assert((actualLeft ++ actualRight).map(uniqName).distinct.length + == actualLeft.length + actualRight.length) + + assert( + actualLeft === expectedLeft, + s"""Left side aliases mismatch. + |Expected: ${expectedLeft.map(_.alias()).mkString(", ")} + |Actual: ${actualLeft.map(_.alias()).mkString(", ")}""".stripMargin + ) + + assert( + actualRight === expectedRight, + s"""Right side aliases mismatch. 
+ |Expected: ${expectedRight.map(_.alias()).mkString(", ")} + |Actual: ${actualRight.map(_.alias()).mkString(", ")}""".stripMargin + ) + } + + test("Basic case with no duplicate column names") { + assertAliases( + leftInput = Array("id", "name"), + rightInput = Array("email", "phone"), + expectedLeft = Array( + new ColumnWithAlias("id", null), + new ColumnWithAlias("name", null) + ), + expectedRight = Array( + new ColumnWithAlias("email", null), + new ColumnWithAlias("phone", null) + ) + ) + } + + test("Extreme duplication scenarios") { + assertAliases( + leftInput = Array("id", "id", "id"), + rightInput = Array("id", "id"), + expectedLeft = Array( + new ColumnWithAlias("id", null), + new ColumnWithAlias("id", "id_1"), + new ColumnWithAlias("id", "id_2") + ), + expectedRight = Array( + new ColumnWithAlias("id", "id_3"), + new ColumnWithAlias("id", "id_4") + ) + ) + } + + test("Exact duplicate column names") { + assertAliases( + leftInput = Array("id", "name"), + rightInput = Array("id", "name"), + expectedLeft = Array( + new ColumnWithAlias("id", null), + new ColumnWithAlias("name", null) + ), + expectedRight = Array( + new ColumnWithAlias("id", "id_1"), + new ColumnWithAlias("name", "name_1") + ) + ) + } + + test("Columns with numeric suffixes (id vs id_1)") { + assertAliases( + leftInput = Array("id", "id_1", "name"), + rightInput = Array("id", "name", "value"), + expectedLeft = Array( + new ColumnWithAlias("id", null), + new ColumnWithAlias("id_1", null), + new ColumnWithAlias("name", null) + ), + expectedRight = Array( + new ColumnWithAlias("id", "id_2"), + new ColumnWithAlias("name", "name_1"), + new ColumnWithAlias("value", null) + ) + ) + } + + test("Case-sensitive conflicts (ID vs id)") { + assertAliases( + leftInput = Array("ID", "Name"), + rightInput = Array("id", "name"), + expectedLeft = Array( + new ColumnWithAlias("ID", null), + new ColumnWithAlias("Name", null) + ), + expectedRight = Array( + new ColumnWithAlias("id", "id_1"), + new 
ColumnWithAlias("name", "name_1") + ) + ) + } + + test("Mixed case and numeric suffixes") { + assertAliases( + leftInput = Array("UserID", "user_id", "user_id_1"), + rightInput = Array("userId", "USER_ID", "user_id_2"), + expectedLeft = Array( + new ColumnWithAlias("UserID", null), + new ColumnWithAlias("user_id", null), + new ColumnWithAlias("user_id_1", null) + ), + expectedRight = Array( + new ColumnWithAlias("userId", "userId_1"), + new ColumnWithAlias("USER_ID", "USER_ID_3"), + new ColumnWithAlias("user_id_2", null) + ) + ) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala index 446dd268e654..7f267c738634 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala @@ -21,9 +21,10 @@ import java.sql.{Connection, DriverManager} import java.util.Properties import org.apache.spark.SparkConf -import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.connector.DataSourcePushdownTestUtils import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.jdbc.JdbcDialect @@ -40,6 +41,8 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase val joinTableName1: String = "join_table_1" val joinTableName2: String = "join_table_2" + val joinTableName3: String = "join_table_3" + val joinTableName4: String = "join_table_4" val jdbcDialect: JdbcDialect @@ -56,6 +59,8 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase private def catalogAndNamespace = s"$catalogName.${caseConvert(namespace)}" 
private def casedJoinTableName1 = caseConvert(joinTableName1) private def casedJoinTableName2 = caseConvert(joinTableName2) + private def casedJoinTableName3 = caseConvert(joinTableName3) + private def casedJoinTableName4 = caseConvert(joinTableName4) def qualifyTableName(tableName: String): String = { val fullyQualifiedCasedNamespace = jdbcDialect.quoteIdentifier(caseConvert(namespace)) @@ -67,8 +72,9 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase jdbcDialect.quoteIdentifier(caseConvert(namespace)) private lazy val fullyQualifiedTableName1: String = qualifyTableName(joinTableName1) - private lazy val fullyQualifiedTableName2: String = qualifyTableName(joinTableName2) + private lazy val fullyQualifiedTableName3: String = qualifyTableName(joinTableName3) + private lazy val fullyQualifiedTableName4: String = qualifyTableName(joinTableName4) protected def getJDBCTypeString(dt: DataType): String = { JdbcUtils.getJdbcType(dt, jdbcDialect).databaseTypeDefinition.toUpperCase() @@ -76,6 +82,10 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase protected def caseConvert(identifier: String): String = identifier + // Quote the identifier to remain original case, for example, MySql convert [`ID`, ID] + // to [ID, id] + protected def remainColumnCase(identifier: String): String = "\"" + identifier + "\"" + protected def withConnection[T](f: Connection => T): T = { val conn = DriverManager.getConnection(url, new Properties()) try { @@ -132,6 +142,26 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase | SURNAME ${getJDBCTypeString(stringType)} |)""".stripMargin ).executeUpdate() + + // Complex situations with different capitalization and quotation marks. 
+ conn.prepareStatement( + s"""CREATE TABLE $fullyQualifiedTableName3( + |${remainColumnCase("id")} ${getJDBCTypeString(integerType)}, + |${remainColumnCase("id_1")} ${getJDBCTypeString(integerType)}, + |${remainColumnCase("id_2")} ${getJDBCTypeString(integerType)}, + |${remainColumnCase("id_1_1")} ${getJDBCTypeString(integerType)}, + |${remainColumnCase("sid")} ${getJDBCTypeString(integerType)} + |)""".stripMargin + ).executeUpdate() + conn.prepareStatement( + s"""CREATE TABLE $fullyQualifiedTableName4 ( + |${remainColumnCase("id")} ${getJDBCTypeString(integerType)}, + |${remainColumnCase("id_1")} ${getJDBCTypeString(integerType)}, + |${remainColumnCase("id_2")} ${getJDBCTypeString(integerType)}, + |${remainColumnCase("id_2_1")} ${getJDBCTypeString(integerType)}, + |${remainColumnCase("Sid")} ${getJDBCTypeString(integerType)} + |)""".stripMargin + ).executeUpdate() } } @@ -181,6 +211,10 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase insertStmt2.executeBatch() insertStmt2.close() + conn.createStatement().execute( + s"""INSERT INTO $fullyQualifiedTableName3 VALUES (0, 1, 2, 3, 4)""") + conn.createStatement().execute( + s"""INSERT INTO $fullyQualifiedTableName4 VALUES (0, -1, -2, -3, -4)""") } } @@ -304,14 +338,14 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { val df = sql(sqlQuery) - val expectedSchemaWithoutNames = StructType( + val expectedSchema = StructType( Seq( - StructField("", integerType), // ID - StructField("", integerType), // NEXT_ID + StructField(caseConvert("id"), integerType), // ID + StructField(caseConvert("id_1"), integerType), // ID StructField(caseConvert("amount"), decimalType) // AMOUNT ) ) - checkPrunedColumnsDataTypeAndNullability(df, expectedSchemaWithoutNames) + checkPrunedColumnsDataTypeAndNullability(df, expectedSchema) checkJoinPushed( df, expectedTables = s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " + @@ -336,13 +370,13 @@ trait 
JDBCV2JoinPushdownIntegrationSuiteBase withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { val df = sql(sqlQuery) - val expectedSchemaWithoutNames = StructType( + val expectedSchema = StructType( Seq( StructField(caseConvert("id"), integerType), // ID StructField(caseConvert("next_id"), integerType) // NEXT_ID ) ) - checkPrunedColumnsDataTypeAndNullability(df, expectedSchemaWithoutNames) + checkPrunedColumnsDataTypeAndNullability(df, expectedSchema) checkJoinPushed( df, expectedTables = s"$catalogAndNamespace.${caseConvert(joinTableName1)}", @@ -371,18 +405,18 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { val df = sql(sqlQuery) - val expectedSchemaWithoutNames = StructType( + val expectedSchema = StructType( Seq( - StructField("", integerType), // ID_UUID - StructField("", decimalType), // AMOUNT_UUID - StructField("", integerType), // ID_UUID - StructField("", decimalType), // AMOUNT_UUID - StructField(caseConvert("address"), stringType), // ADDRESS StructField(caseConvert("id"), integerType), // ID - StructField(caseConvert("amount"), decimalType) // AMOUNT + StructField(caseConvert("amount"), decimalType), // AMOUNT + StructField(caseConvert("id_1"), integerType), // ID + StructField(caseConvert("amount_1"), decimalType), // AMOUNT + StructField(caseConvert("address"), stringType), // ADDRESS + StructField(caseConvert("id_2"), integerType), // ID + StructField(caseConvert("amount_2"), decimalType) // AMOUNT ) ) - checkPrunedColumnsDataTypeAndNullability(df, expectedSchemaWithoutNames) + checkPrunedColumnsDataTypeAndNullability(df, expectedSchema) checkJoinPushed( df, expectedTables = s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " + @@ -640,4 +674,34 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase checkAnswer(df, rows) } } + + test("Test complex duplicate column name alias") { + val sqlQuery = s""" + |SELECT + | * + |FROM $catalogAndNamespace.$casedJoinTableName3 a + |JOIN 
$catalogAndNamespace.$casedJoinTableName4 b + |ON a.id = b.id""".stripMargin + + withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { + val df = sql(sqlQuery) + val row = df.collect()(0) + assert(row.toString == Row(0, 1, 2, 3, 4, 0, -1, -2, -3, -4).toString) + + assert(df.schema.fields.map(_.name) sameElements + Array("id", "id_1", "id_2", "id_1_1", "sid", + "id", "id_1", "id_2", "id_2_1", "Sid"), + "Unexpected schema names: " + df.schema.fields.map(_.name).mkString(",")) + + val schemaNames = df.queryExecution.optimizedPlan.collectFirst { + case j: DataSourceV2ScanRelation => j + }.get.schema.fields.map(_.name) + assert(schemaNames sameElements + Array("id", "id_1", "id_2", "id_1_1", "sid", + "id_3", "id_1_2", "id_2_2", "id_2_1", "Sid_1"), + "Unexpected schema names: " + schemaNames.mkString(",")) + + checkJoinPushed(df) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org