[ 
https://issues.apache.org/jira/browse/SPARK-26352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16722697#comment-16722697
 ] 

ASF GitHub Bot commented on SPARK-26352:
----------------------------------------

asfgit closed pull request #23303: [SPARK-26352][SQL] join reorder should not 
change the order of output attributes
URL: https://github.com/apache/spark/pull/23303
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
index 064ca68b7a628..01634a9d852c6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
@@ -48,6 +48,7 @@ object CostBasedJoinReorder extends Rule[LogicalPlan] with 
PredicateHelper {
           if projectList.forall(_.isInstanceOf[Attribute]) =>
           reorder(p, p.output)
       }
+
       // After reordering is finished, convert OrderedJoin back to Join
       result transformDown {
         case OrderedJoin(left, right, jt, cond) => Join(left, right, jt, cond)
@@ -175,11 +176,20 @@ object JoinReorderDP extends PredicateHelper with Logging 
{
         assert(topOutputSet == p.outputSet)
         // Keep the same order of final output attributes.
         p.copy(projectList = output)
+      case finalPlan if !sameOutput(finalPlan, output) =>
+        Project(output, finalPlan)
       case finalPlan =>
         finalPlan
     }
   }
 
+  private def sameOutput(plan: LogicalPlan, expectedOutput: Seq[Attribute]): 
Boolean = {
+    val thisOutput = plan.output
+    thisOutput.length == expectedOutput.length && 
thisOutput.zip(expectedOutput).forall {
+      case (a1, a2) => a1.semanticEquals(a2)
+    }
+  }
+
   /** Find all possible plans at the next level, based on existing levels. */
   private def searchLevel(
       existingLevels: Seq[JoinPlanMap],
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 6ebb194d71c2e..0b6471289a471 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -86,9 +86,9 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case ExtractFiltersAndInnerJoins(input, conditions)
+    case p @ ExtractFiltersAndInnerJoins(input, conditions)
         if input.size > 2 && conditions.nonEmpty =>
-      if (SQLConf.get.starSchemaDetection && !SQLConf.get.cboEnabled) {
+      val reordered = if (SQLConf.get.starSchemaDetection && 
!SQLConf.get.cboEnabled) {
         val starJoinPlan = StarSchemaDetection.reorderStarJoins(input, 
conditions)
         if (starJoinPlan.nonEmpty) {
           val rest = input.filterNot(starJoinPlan.contains(_))
@@ -99,6 +99,14 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
       } else {
         createOrderedJoin(input, conditions)
       }
+
+      if (p.sameOutput(reordered)) {
+        reordered
+      } else {
+        // Reordering the joins have changed the order of the columns.
+        // Inject a projection to make sure we restore to the expected 
ordering.
+        Project(p.output, reordered)
+      }
   }
 }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index ccd9d8dd4d213..e9438b2eee550 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -102,16 +102,19 @@ class JoinOptimizationSuite extends PlanTest {
         x.join(y).join(z).where(("x.b".attr === "z.b".attr) && ("y.d".attr === 
"z.a".attr)),
         x.join(z, condition = Some("x.b".attr === "z.b".attr))
           .join(y, condition = Some("y.d".attr === "z.a".attr))
+          .select(Seq("x.a", "x.b", "x.c", "y.d", "z.a", "z.b", 
"z.c").map(_.attr): _*)
       ),
       (
         x.join(y, Cross).join(z, Cross)
           .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr)),
         x.join(z, Cross, Some("x.b".attr === "z.b".attr))
           .join(y, Cross, Some("y.d".attr === "z.a".attr))
+          .select(Seq("x.a", "x.b", "x.c", "y.d", "z.a", "z.b", 
"z.c").map(_.attr): _*)
       ),
       (
         x.join(y, Inner).join(z, Cross).where("x.b".attr === "z.a".attr),
         x.join(z, Cross, Some("x.b".attr === "z.a".attr)).join(y, Inner)
+          .select(Seq("x.a", "x.b", "x.c", "y.d", "z.a", "z.b", 
"z.c").map(_.attr): _*)
       )
     )
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
index 565b0a10154a8..c94a8b9e318f6 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
-import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.{Cross, Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LocalRelation, 
LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, 
StatsTestPlan}
 import org.apache.spark.sql.internal.SQLConf.{CBO_ENABLED, 
JOIN_REORDER_ENABLED}
@@ -124,7 +124,8 @@ class JoinReorderSuite extends PlanTest with 
StatsEstimationTestBase {
     // the original order (t1 J t2) J t3.
     val bestPlan =
       t1.join(t3, Inner, Some(nameToAttr("t1.v-1-10") === 
nameToAttr("t3.v-1-100")))
-      .join(t2, Inner, Some(nameToAttr("t1.k-1-2") === nameToAttr("t2.k-1-5")))
+        .join(t2, Inner, Some(nameToAttr("t1.k-1-2") === 
nameToAttr("t2.k-1-5")))
+        .select(outputsOf(t1, t2, t3): _*)
 
     assertEqualPlans(originalPlan, bestPlan)
   }
@@ -139,7 +140,9 @@ class JoinReorderSuite extends PlanTest with 
StatsEstimationTestBase {
     val bestPlan =
       t1.join(t3, Inner, Some(nameToAttr("t1.v-1-10") === 
nameToAttr("t3.v-1-100")))
         .join(t2, Inner, Some(nameToAttr("t1.k-1-2") === 
nameToAttr("t2.k-1-5")))
+        .select(outputsOf(t1, t2, t3): _*) // this is redundant but we'll take 
it for now
         .join(t4)
+        .select(outputsOf(t1, t2, t4, t3): _*)
 
     assertEqualPlans(originalPlan, bestPlan)
   }
@@ -202,6 +205,7 @@ class JoinReorderSuite extends PlanTest with 
StatsEstimationTestBase {
       t1.join(t2, Inner, Some(nameToAttr("t1.k-1-2") === 
nameToAttr("t2.k-1-5")))
         .join(t4.join(t3, Inner, Some(nameToAttr("t4.v-1-10") === 
nameToAttr("t3.v-1-100"))),
           Inner, Some(nameToAttr("t1.k-1-2") === nameToAttr("t4.k-1-2")))
+        .select(outputsOf(t1, t4, t2, t3): _*)
 
     assertEqualPlans(originalPlan, bestPlan)
   }
@@ -219,6 +223,23 @@ class JoinReorderSuite extends PlanTest with 
StatsEstimationTestBase {
     }
   }
 
+  test("SPARK-26352: join reordering should not change the order of 
attributes") {
+    // This test case does not rely on CBO.
+    // It's similar to the test case above, but catches a reordering bug that 
the one above doesn't
+    val tab1 = LocalRelation('x.int, 'y.int)
+    val tab2 = LocalRelation('i.int, 'j.int)
+    val tab3 = LocalRelation('a.int, 'b.int)
+    val original =
+      tab1.join(tab2, Cross)
+          .join(tab3, Inner, Some('a === 'x && 'b === 'i))
+    val expected =
+      tab1.join(tab3, Inner, Some('a === 'x))
+          .join(tab2, Cross, Some('b === 'i))
+          .select(outputsOf(tab1, tab2, tab3): _*)
+
+    assertEqualPlans(original, expected)
+  }
+
   test("reorder recursively") {
     // Original order:
     //          Join
@@ -266,8 +287,17 @@ class JoinReorderSuite extends PlanTest with 
StatsEstimationTestBase {
   private def assertEqualPlans(
       originalPlan: LogicalPlan,
       groundTruthBestPlan: LogicalPlan): Unit = {
-    val optimized = Optimize.execute(originalPlan.analyze)
+    val analyzed = originalPlan.analyze
+    val optimized = Optimize.execute(analyzed)
     val expected = groundTruthBestPlan.analyze
+
+    assert(analyzed.sameOutput(expected)) // if this fails, the expected plan 
itself is incorrect
+    assert(analyzed.sameOutput(optimized))
+
     compareJoinOrder(optimized, expected)
   }
+
+  private def outputsOf(plans: LogicalPlan*): Seq[Attribute] = {
+    plans.map(_.output).reduce(_ ++ _)
+  }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
index d4d23ad69b2c2..baae934e1e4fe 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
@@ -218,6 +218,7 @@ class StarJoinCostBasedReorderSuite extends PlanTest with 
StatsEstimationTestBas
         .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
         .join(t2, Inner, Some(nameToAttr("f1_c2") === nameToAttr("t2_c1")))
         .join(t1, Inner, Some(nameToAttr("f1_c1") === nameToAttr("t1_c1")))
+        .select(outputsOf(f1, t1, t2, d1, d2): _*)
 
     assertEqualPlans(query, expected)
   }
@@ -256,6 +257,7 @@ class StarJoinCostBasedReorderSuite extends PlanTest with 
StatsEstimationTestBas
         .join(t3.join(t2, Inner, Some(nameToAttr("t2_c2") === 
nameToAttr("t3_c1"))), Inner,
           Some(nameToAttr("d1_c2") === nameToAttr("t2_c1")))
         .join(t1, Inner, Some(nameToAttr("t1_c1") === nameToAttr("f1_c1")))
+        .select(outputsOf(d1, t1, t2, f1, d2, t3): _*)
 
     assertEqualPlans(query, expected)
   }
@@ -297,6 +299,7 @@ class StarJoinCostBasedReorderSuite extends PlanTest with 
StatsEstimationTestBas
           Some(nameToAttr("t3_c1") === nameToAttr("t4_c1")))
         .join(t1.join(t2, Inner, Some(nameToAttr("t1_c1") === 
nameToAttr("t2_c1"))), Inner,
           Some(nameToAttr("t1_c2") === nameToAttr("t4_c2")))
+        .select(outputsOf(d1, t1, t2, t3, t4, f1, d2): _*)
 
     assertEqualPlans(query, expected)
   }
@@ -347,6 +350,7 @@ class StarJoinCostBasedReorderSuite extends PlanTest with 
StatsEstimationTestBas
           Some(nameToAttr("d3_c2") === nameToAttr("t1_c1")))
         .join(t5.join(t6, Inner, Some(nameToAttr("t5_c2") === 
nameToAttr("t6_c2"))), Inner,
           Some(nameToAttr("d2_c2") === nameToAttr("t5_c1")))
+        .select(outputsOf(d1, t3, t4, f1, d2, t5, t6, d3, t1, t2): _*)
 
     assertEqualPlans(query, expected)
   }
@@ -375,6 +379,7 @@ class StarJoinCostBasedReorderSuite extends PlanTest with 
StatsEstimationTestBas
       f1.join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk")))
         .join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
         .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+        .select(outputsOf(d1, d2, f1, d3): _*)
 
     assertEqualPlans(query, expected)
   }
@@ -400,13 +405,27 @@ class StarJoinCostBasedReorderSuite extends PlanTest with 
StatsEstimationTestBas
       f1.join(t3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("t3_c1")))
         .join(t2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("t2_c1")))
         .join(t1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("t1_c1")))
+        .select(outputsOf(t1, f1, t2, t3): _*)
 
     assertEqualPlans(query, expected)
   }
 
   private def assertEqualPlans( plan1: LogicalPlan, plan2: LogicalPlan): Unit 
= {
-    val optimized = Optimize.execute(plan1.analyze)
+    val analyzed = plan1.analyze
+    val optimized = Optimize.execute(analyzed)
     val expected = plan2.analyze
+
+    assert(equivalentOutput(analyzed, expected)) // if this fails, the 
expected itself is incorrect
+    assert(equivalentOutput(analyzed, optimized))
+
     compareJoinOrder(optimized, expected)
   }
+
+  private def outputsOf(plans: LogicalPlan*): Seq[Attribute] = {
+    plans.map(_.output).reduce(_ ++ _)
+  }
+
+  private def equivalentOutput(plan1: LogicalPlan, plan2: LogicalPlan): 
Boolean = {
+    normalizeExprIds(plan1).output == normalizeExprIds(plan2).output
+  }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
index 4e0883e91e84a..9dc653b9d6c44 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
@@ -182,6 +182,7 @@ class StarJoinReorderSuite extends PlanTest with 
StatsEstimationTestBase {
         .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk1")))
         .join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk1")))
         .join(s3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
+        .select(outputsOf(d1, d2, f1, d3, s3): _*)
 
     assertEqualPlans(query, expected)
   }
@@ -220,6 +221,7 @@ class StarJoinReorderSuite extends PlanTest with 
StatsEstimationTestBase {
         .join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk1")))
         .join(d2, Inner, Some(nameToAttr("f1_fk2") < nameToAttr("d2_pk1")))
         .join(s3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
+        .select(outputsOf(d1, f1, d2, s3, d3): _*)
 
     assertEqualPlans(query, expected)
   }
@@ -255,7 +257,7 @@ class StarJoinReorderSuite extends PlanTest with 
StatsEstimationTestBase {
         .join(d3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
         .join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk1")))
         .join(s3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("s3_c2")))
-
+        .select(outputsOf(d1, f1, d2, s3, d3): _*)
 
     assertEqualPlans(query, expected)
   }
@@ -292,6 +294,7 @@ class StarJoinReorderSuite extends PlanTest with 
StatsEstimationTestBase {
         .join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk1")))
         .join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_c2")))
         .join(s3, Inner, Some(nameToAttr("d3_fk1") < nameToAttr("s3_pk1")))
+        .select(outputsOf(d1, f1, d2, s3, d3): _*)
 
     assertEqualPlans(query, expected)
   }
@@ -395,6 +398,7 @@ class StarJoinReorderSuite extends PlanTest with 
StatsEstimationTestBase {
         .join(d2.where(nameToAttr("d2_c2") === 2), Inner,
           Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk1")))
         .join(s3, Inner, Some(nameToAttr("f11_fk1") === nameToAttr("s3_pk1")))
+        .select(outputsOf(d1, f11, f1, d2, s3): _*)
 
     assertEqualPlans(query, equivQuery)
   }
@@ -430,6 +434,7 @@ class StarJoinReorderSuite extends PlanTest with 
StatsEstimationTestBase {
         .join(d2.where(nameToAttr("d2_c2") === 2), Inner,
           Some(nameToAttr("f1_fk2") === nameToAttr("d2_c4")))
         .join(s3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
+        .select(outputsOf(d1, d3, f1, d2, s3): _*)
 
     assertEqualPlans(query, expected)
   }
@@ -465,6 +470,7 @@ class StarJoinReorderSuite extends PlanTest with 
StatsEstimationTestBase {
         .join(d2.where(nameToAttr("d2_c2") === 2), Inner,
           Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk1")))
         .join(s3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
+        .select(outputsOf(d1, d3, f1, d2, s3): _*)
 
     assertEqualPlans(query, expected)
   }
@@ -499,6 +505,7 @@ class StarJoinReorderSuite extends PlanTest with 
StatsEstimationTestBase {
         .join(d2.where(nameToAttr("d2_c2") === 2),
           Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk1")))
         .join(s3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
+        .select(outputsOf(d1, d3, f1, d2, s3): _*)
 
     assertEqualPlans(query, expected)
   }
@@ -532,6 +539,7 @@ class StarJoinReorderSuite extends PlanTest with 
StatsEstimationTestBase {
         .join(d3, Inner, Some(nameToAttr("f1_fk3") < nameToAttr("d3_pk1")))
         .join(d2, Inner, Some(nameToAttr("f1_fk2") < nameToAttr("d2_pk1")))
         .join(s3, Inner, Some(nameToAttr("d3_fk1") < nameToAttr("s3_pk1")))
+        .select(outputsOf(d1, d3, f1, d2, s3): _*)
 
     assertEqualPlans(query, expected)
   }
@@ -565,13 +573,27 @@ class StarJoinReorderSuite extends PlanTest with 
StatsEstimationTestBase {
         .join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk1")))
         .join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk1")))
         .join(s3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
+        .select(outputsOf(d1, d3, f1, d2, s3): _*)
 
     assertEqualPlans(query, expected)
   }
 
-  private def assertEqualPlans( plan1: LogicalPlan, plan2: LogicalPlan): Unit 
= {
-    val optimized = Optimize.execute(plan1.analyze)
+  private def assertEqualPlans(plan1: LogicalPlan, plan2: LogicalPlan): Unit = 
{
+    val analyzed = plan1.analyze
+    val optimized = Optimize.execute(analyzed)
     val expected = plan2.analyze
+
+    assert(equivalentOutput(analyzed, expected)) // if this fails, the 
expected itself is incorrect
+    assert(equivalentOutput(analyzed, optimized))
+
     compareJoinOrder(optimized, expected)
   }
+
+  private def outputsOf(plans: LogicalPlan*): Seq[Attribute] = {
+    plans.map(_.output).reduce(_ ++ _)
+  }
+
+  private def equivalentOutput(plan1: LogicalPlan, plan2: LogicalPlan): 
Boolean = {
+    normalizeExprIds(plan1).output == normalizeExprIds(plan2).output
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index aa2162c9d2cda..91445c8d96d85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -895,4 +895,18 @@ class JoinSuite extends QueryTest with SharedSQLContext {
       checkAnswer(res, Row(0, 0, 0))
     }
   }
+
+  test("SPARK-26352: join reordering should not change the order of columns") {
+    withTable("tab1", "tab2", "tab3") {
+      spark.sql("select 1 as x, 100 as y").write.saveAsTable("tab1")
+      spark.sql("select 42 as i, 200 as j").write.saveAsTable("tab2")
+      spark.sql("select 1 as a, 42 as b").write.saveAsTable("tab3")
+
+      val df = spark.sql("""
+        with tmp as (select * from tab1 cross join tab2)
+        select * from tmp join tab3 on a = x and b = i
+      """)
+      checkAnswer(df, Row(1, 100, 42, 200, 1, 42))
+    }
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> join reordering should not change the order of output attributes
> ----------------------------------------------------------------
>
>                 Key: SPARK-26352
>                 URL: https://issues.apache.org/jira/browse/SPARK-26352
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0, 2.4.0
>            Reporter: Kris Mok
>            Priority: Major
>
> The optimizer rule {{org.apache.spark.sql.catalyst.optimizer.ReorderJoin}} 
> performs join reordering on inner joins. This was introduced from SPARK-12032 
> in 2015-12.
> After it had reordered the joins, though, it didn't check whether or not the 
> column order (in terms of the {{output}} attribute list) is still the same as 
> before. Thus, it's possible to have a mismatch between the reordered column 
> order vs the schema that a DataFrame thinks it has.
> This can be demonstrated with the example:
> {code:none}
> spark.sql("create table table_a (x int, y int) using parquet")
> spark.sql("create table table_b (i int, j int) using parquet")
> spark.sql("create table table_c (a int, b int) using parquet")
> val df = spark.sql("with df1 as (select * from table_a cross join table_b) 
> select * from df1 join table_c on a = x and b = i")
> {code}
> here's what the DataFrame thinks:
> {code:none}
> scala> df.printSchema
> root
>  |-- x: integer (nullable = true)
>  |-- y: integer (nullable = true)
>  |-- i: integer (nullable = true)
>  |-- j: integer (nullable = true)
>  |-- a: integer (nullable = true)
>  |-- b: integer (nullable = true)
> {code}
> here's what the optimized plan thinks, after join reordering:
> {code:none}
> scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- 
> ${a.name}: ${a.dataType.typeName}"))
> |-- x: integer
> |-- y: integer
> |-- a: integer
> |-- b: integer
> |-- i: integer
> |-- j: integer
> {code}
> If we exclude the {{ReorderJoin}} rule (using Spark 2.4's optimizer rule 
> exclusion feature), it's back to normal:
> {code:none}
> scala> spark.conf.set("spark.sql.optimizer.excludedRules", 
> "org.apache.spark.sql.catalyst.optimizer.ReorderJoin")
> scala> val df = spark.sql("with df1 as (select * from table_a cross join 
> table_b) select * from df1 join table_c on a = x and b = i")
> df: org.apache.spark.sql.DataFrame = [x: int, y: int ... 4 more fields]
> scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- 
> ${a.name}: ${a.dataType.typeName}"))
> |-- x: integer
> |-- y: integer
> |-- i: integer
> |-- j: integer
> |-- a: integer
> |-- b: integer
> {code}
> Note that this column ordering problem leads to data corruption, and can 
> manifest itself in various symptoms:
> * Silently corrupting data, if the reordered columns happen to either have 
> matching types or have sufficiently-compatible types (e.g. all fixed length 
> primitive types are considered as "sufficiently compatible" in an UnsafeRow), 
> then only the resulting data is going to be wrong but it might not trigger 
> any alarms immediately. Or
> * Weird Java-level exceptions like {{java.lang.NegativeArraySizeException}}, 
> or even SIGSEGVs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to