This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 25c7d0f  [SPARK-32526][SQL] Pass all test of sql/catalyst module in Scala 2.13
25c7d0f is described below

commit 25c7d0fe6ae20a4c1c42e0cd0b448c08ab03f3fb
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Sat Aug 22 09:24:16 2020 -0500

[SPARK-32526][SQL] Pass all test of sql/catalyst module in Scala 2.13

### What changes were proposed in this pull request?

The purpose of this PR is to resolve [SPARK-32526](https://issues.apache.org/jira/browse/SPARK-32526); all remaining failed cases are fixed. The main changes of this PR are as follows:

- Change `ExecutorAllocationManager.scala` so that the core module compiles in Scala 2.13; this was a blocking problem.
- Change `Seq[_]` to `scala.collection.Seq[_]` in the code paths the failed cases point to (see the sketch below for the underlying 2.13 alias change).
- Add a different expected plan for `Test 4: Star with several branches` of `StarJoinCostBasedReorderSuite` in Scala 2.13, because the candidate plans

```
Join Inner, (d1_pk#5 = f1_fk1#0)
:- Join Inner, (f1_fk2#1 = d2_pk#8)
:  :- Join Inner, (f1_fk3#2 = d3_pk#11)
```

and

```
Join Inner, (f1_fk2#1 = d2_pk#8)
:- Join Inner, (d1_pk#5 = f1_fk1#0)
:  :- Join Inner, (f1_fk3#2 = d3_pk#11)
```

have the same cost, `Cost(200,9200)`, but `HashMap` was rewritten in Scala 2.13 and its different iteration order leads to a different winner (a small illustration of this tie-breaking follows below).

This PR fixes the following test cases:

- LiteralExpressionSuite (1 FAILED -> PASS)
- StarJoinCostBasedReorderSuite (1 FAILED -> PASS)
- ObjectExpressionsSuite (2 FAILED -> PASS)
- ScalaReflectionSuite (1 FAILED -> PASS)
- RowEncoderSuite (10 FAILED -> PASS)
- ExpressionEncoderSuite (ABORTED -> PASS)

### Why are the changes needed?

We need to support a Scala 2.13 build.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

- Scala 2.12: Pass the Jenkins or GitHub Actions build.
- Scala 2.13: Do the following:

```
dev/change-scala-version.sh 2.13
mvn clean install -DskipTests -pl sql/catalyst -Pscala-2.13 -am
mvn test -pl sql/catalyst -Pscala-2.13
```

**Before**

```
Tests: succeeded 4035, failed 17, canceled 0, ignored 6, pending 0
*** 1 SUITE ABORTED ***
*** 15 TESTS FAILED ***
```

**After**

```
Tests: succeeded 4338, failed 0, canceled 0, ignored 6, pending 0
All tests passed.
```

Closes #29434 from LuciferYang/sql-catalyst-tests.
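For context, here is a minimal standalone sketch of the 2.13 change behind the `Seq[_]` widening; it is not part of the patch, and the `ArrayBuffer` merely stands in for Spark's mutable-backed values. In Scala 2.13, `scala.Seq` is an alias for `scala.collection.immutable.Seq`, so sequences backed by mutable collections, such as the `WrappedArray`s produced when decoding Catalyst arrays, no longer pass a plain `Seq[_]` runtime test:

```
import scala.collection.mutable

object SeqAliasSketch {
  def main(args: Array[String]): Unit = {
    // A mutable-backed sequence; Spark's decoded rows are similarly backed
    // by mutable.WrappedArray (mutable.ArraySeq in 2.13).
    val value: Any = mutable.ArrayBuffer(1, 2, 3)

    // `Seq` here means scala.collection.Seq in 2.12 but
    // scala.collection.immutable.Seq in 2.13.
    println(value.isInstanceOf[Seq[_]])                  // true in 2.12, false in 2.13
    println(value.isInstanceOf[scala.collection.Seq[_]]) // true in both
  }
}
```

Widening the runtime tests to `scala.collection.Seq[_]`, and calling `.toSeq` where an immutable `Seq` must be returned (as in `Row.getSeq`), keeps the behavior identical across both Scala versions.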
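The `StarJoinCostBasedReorderSuite` tie-breaking can be illustrated the same way. The plan names and costs below are made up, and this is not the optimizer's actual bookkeeping code; it only shows how a cost tie makes the winner depend on `HashMap` iteration order, which is unspecified and changed with the 2.13 collections rewrite:

```
import scala.collection.mutable

object HashMapTieSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical candidate plans: two tie on cost, like the two join
    // trees above that both cost Cost(200,9200).
    val costs = mutable.HashMap(
      "join d1 first" -> 200,
      "join d2 first" -> 200,
      "bushy plan" -> 450)

    // minBy keeps the first minimal entry it iterates over, so with a tie
    // the winning plan depends on the map's iteration order.
    println(costs.minBy(_._2)._1) // either tied plan, depending on Scala version
  }
}
```

This is why the suite now accepts a version-specific expected plan, and why SPARK-32687 (referenced in the diff below) tracks making `CostBasedJoinReorder` deterministic.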
Authored-by: yangjie01 <yangji...@baidu.com>
Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../main/scala/org/apache/spark/ExecutorAllocationManager.scala    | 2 +-
 sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala         | 2 +-
 .../org/apache/spark/sql/catalyst/CatalystTypeConverters.scala     | 2 +-
 .../scala/org/apache/spark/sql/catalyst/ScalaReflection.scala      | 7 +++----
 .../scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala  | 4 +++-
 .../apache/spark/sql/catalyst/expressions/objects/objects.scala    | 8 ++++----
 .../org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala     | 3 ++-
 .../sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala     | 6 ++++--
 8 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 2d8beef..d27ee78 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -595,7 +595,7 @@ private[spark] class ExecutorAllocationManager(
       // reset the newExecutorTotal to the existing number of executors
       if (testing || executorsRemoved.nonEmpty) {
         if (decommissionEnabled) {
-          executorMonitor.executorsDecommissioned(executorsRemoved)
+          executorMonitor.executorsDecommissioned(executorsRemoved.toSeq)
         } else {
           executorMonitor.executorsKilled(executorsRemoved.toSeq)
         }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
index 5b17f1d..475164a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -314,7 +314,7 @@ trait Row extends Serializable {
    *
    * @throws ClassCastException when data type does not match.
    */
-  def getSeq[T](i: Int): Seq[T] = getAs[Seq[T]](i)
+  def getSeq[T](i: Int): Seq[T] = getAs[scala.collection.Seq[T]](i).toSeq

   /**
    * Returns the value at position i of array type as `java.util.List`.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 34d2f45..aab944c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -164,7 +164,7 @@ object CatalystTypeConverters {
       scalaValue match {
         case a: Array[_] =>
           new GenericArrayData(a.map(elementConverter.toCatalyst))
-        case s: Seq[_] =>
+        case s: scala.collection.Seq[_] =>
           new GenericArrayData(s.map(elementConverter.toCatalyst).toArray)
         case i: JavaIterable[_] =>
           val iter = i.iterator
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 05de21b..a9c8b0b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -284,7 +284,7 @@ object ScalaReflection extends ScalaReflection {

       // We serialize a `Set` to Catalyst array. When we deserialize a Catalyst array
       // to a `Set`, if there are duplicated elements, the elements will be de-duplicated.
-      case t if isSubtype(t, localTypeOf[Seq[_]]) ||
+      case t if isSubtype(t, localTypeOf[scala.collection.Seq[_]]) ||
           isSubtype(t, localTypeOf[scala.collection.Set[_]]) =>
         val TypeRef(_, _, Seq(elementType)) = t
         val Schema(dataType, elementNullable) = schemaFor(elementType)
@@ -448,10 +448,9 @@ object ScalaReflection extends ScalaReflection {
       // Since List[_] also belongs to localTypeOf[Product], we put this case before
       // "case t if definedByConstructorParams(t)" to make sure it will match to the
       // case "localTypeOf[Seq[_]]"
-      case t if isSubtype(t, localTypeOf[Seq[_]]) =>
+      case t if isSubtype(t, localTypeOf[scala.collection.Seq[_]]) =>
         val TypeRef(_, _, Seq(elementType)) = t
         toCatalystArray(inputObject, elementType)
-
       case t if isSubtype(t, localTypeOf[Array[_]]) =>
         val TypeRef(_, _, Seq(elementType)) = t
         toCatalystArray(inputObject, elementType)
@@ -686,7 +685,7 @@ object ScalaReflection extends ScalaReflection {
       val TypeRef(_, _, Seq(elementType)) = t
       val Schema(dataType, nullable) = schemaFor(elementType)
       Schema(ArrayType(dataType, containsNull = nullable), nullable = true)
-    case t if isSubtype(t, localTypeOf[Seq[_]]) =>
+    case t if isSubtype(t, localTypeOf[scala.collection.Seq[_]]) =>
       val TypeRef(_, _, Seq(elementType)) = t
       val Schema(dataType, nullable) = schemaFor(elementType)
       Schema(ArrayType(dataType, containsNull = nullable), nullable = true)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index 765018f..ee63209 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -291,9 +291,11 @@ object RowEncoder {
         MapObjects(deserializerFor(_), input, et),
         "array", ObjectType(classOf[Array[_]]), returnNullable = false)

+      // TODO should use `scala.collection.immutable.ArrayDeq.unsafeMake` method to create
+      // `immutable.Seq` in Scala 2.13 when Scala version compatibility is no longer required.
       StaticInvoke(
         scala.collection.mutable.WrappedArray.getClass,
-        ObjectType(classOf[Seq[_]]),
+        ObjectType(classOf[scala.collection.Seq[_]]),
         "make",
         arrayData :: Nil,
         returnNullable = false)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 7cf2c73..4f6a587 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -736,8 +736,8 @@ case class MapObjects private(
   }

   private lazy val convertToSeq: Any => Seq[_] = inputDataType match {
-    case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
-      _.asInstanceOf[Seq[_]]
+    case ObjectType(cls) if classOf[scala.collection.Seq[_]].isAssignableFrom(cls) =>
+      _.asInstanceOf[scala.collection.Seq[_]].toSeq
     case ObjectType(cls) if cls.isArray =>
       _.asInstanceOf[Array[_]].toSeq
     case ObjectType(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
@@ -758,7 +758,7 @@ case class MapObjects private(
     case Some(cls) if classOf[WrappedArray[_]].isAssignableFrom(cls) =>
       // Scala WrappedArray
       inputCollection => WrappedArray.make(executeFuncOnCollection(inputCollection).toArray)
-    case Some(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+    case Some(cls) if classOf[scala.collection.Seq[_]].isAssignableFrom(cls) =>
       // Scala sequence
       executeFuncOnCollection(_).toSeq
     case Some(cls) if classOf[scala.collection.Set[_]].isAssignableFrom(cls) =>
@@ -859,7 +859,7 @@ case class MapObjects private(
     // need to take care of Seq and List because they may have O(n) complexity for indexed accessing
     // like `list.get(1)`. Here we use Iterator to traverse Seq and List.
     val (getLength, prepareLoop, getLoopVar) = inputDataType match {
-      case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+      case ObjectType(cls) if classOf[scala.collection.Seq[_]].isAssignableFrom(cls) =>
         val it = ctx.freshName("it")
         (
           s"${genInputData.value}.size()",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
index 5df2af9..3768f7a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
@@ -136,7 +136,8 @@ object ArrayBasedMapData {
     keys.zip(values).toMap
   }

-  def toScalaMap(keys: Seq[Any], values: Seq[Any]): Map[Any, Any] = {
+  def toScalaMap(keys: scala.collection.Seq[Any],
+      values: scala.collection.Seq[Any]): Map[Any, Any] = {
     keys.zip(values).toMap
   }

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
index 5b8e59a..a7c0bac 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
@@ -328,8 +328,10 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
     // level 9: {d1 t5 t3 t6 t2 t4 d3 f1 t1 d2 }
     //
     // Number of generated plans: 46 (vs. 82)
+    // TODO(SPARK-32687): find a way to make optimization result of `CostBasedJoinReorder`
+    // deterministic even if the input order is different.
     val query =
-      d1.join(t3).join(t4).join(f1).join(d2).join(t5).join(t6).join(d3).join(t1).join(t2)
+      d1.join(t3).join(t4).join(f1).join(d3).join(d2).join(t5).join(t6).join(t1).join(t2)
         .where((nameToAttr("d1_c2") === nameToAttr("t3_c1")) &&
           (nameToAttr("t3_c2") === nameToAttr("t4_c2")) &&
           (nameToAttr("d1_pk") === nameToAttr("f1_fk1")) &&
@@ -350,7 +352,7 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
         Some(nameToAttr("d3_c2") === nameToAttr("t1_c1")))
       .join(t5.join(t6, Inner, Some(nameToAttr("t5_c2") === nameToAttr("t6_c2"))), Inner,
         Some(nameToAttr("d2_c2") === nameToAttr("t5_c1")))
-      .select(outputsOf(d1, t3, t4, f1, d2, t5, t6, d3, t1, t2): _*)
+      .select(outputsOf(d1, t3, t4, f1, d3, d2, t5, t6, t1, t2): _*)

     assertEqualPlans(query, expected)
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org