Repository: spark Updated Branches: refs/heads/branch-2.1 2a3e50e24 -> efa11a42f
[SPARK-20335][SQL][BACKPORT-2.1] Children expressions of Hive UDF impacts the determinism of Hive UDF ### What changes were proposed in this pull request? This PR is to backport https://github.com/apache/spark/pull/17635 to Spark 2.1 --- ```JAVA /** * Certain optimizations should not be applied if UDF is not deterministic. * Deterministic UDF returns same result each time it is invoked with a * particular input. This determinism just needs to hold within the context of * a query. * * return true if the UDF is deterministic */ boolean deterministic() default true; ``` Based on the definition of [UDFType](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFType.java#L42-L50), when Hive UDF's children are non-deterministic, Hive UDF is also non-deterministic. ### How was this patch tested? Added test cases. Author: Xiao Li <gatorsm...@gmail.com> Closes #17652 from gatorsmile/backport-17635. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/efa11a42 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/efa11a42 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/efa11a42 Branch: refs/heads/branch-2.1 Commit: efa11a42f0c34dcfaf4a1bf17055539c43c8e4f9 Parents: 2a3e50e Author: Xiao Li <gatorsm...@gmail.com> Authored: Mon Apr 17 15:59:55 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Mon Apr 17 15:59:55 2017 +0800 ---------------------------------------------------------------------- .../org/apache/spark/sql/hive/hiveUDFs.scala | 4 +-- .../hive/execution/AggregationQuerySuite.scala | 13 +++++++++ .../spark/sql/hive/execution/HiveUDFSuite.scala | 30 ++++++++++++++++++++ 3 files changed, 45 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/efa11a42/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 37414ad..3e46b74 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -42,7 +42,7 @@ private[hive] case class HiveSimpleUDF( name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) extends Expression with HiveInspectors with CodegenFallback with Logging { - override def deterministic: Boolean = isUDFDeterministic + override def deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic) override def nullable: Boolean = true @@ -120,7 +120,7 @@ private[hive] case class HiveGenericUDF( override def nullable: Boolean = true - override def deterministic: Boolean = isUDFDeterministic + override def deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic) override def foldable: Boolean = isUDFDeterministic && returnInspector.isInstanceOf[ConstantObjectInspector] http://git-wip-us.apache.org/repos/asf/spark/blob/efa11a42/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 4a8086d..84f9159 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -509,6 +509,19 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(null, null, 110.0, null, null, 10.0) :: Nil) } + test("non-deterministic children expressions of UDAF") { + val e = intercept[AnalysisException] { + spark.sql( + """ + |SELECT mydoublesum(value + 1.5 * key + rand()) + |FROM agg1 + |GROUP BY key + """.stripMargin) + }.getMessage + assert(Seq("nondeterministic expression", + "should not appear in the arguments of an aggregate function").forall(e.contains)) + } + test("interpreted aggregate function") { checkAnswer( spark.sql( http://git-wip-us.apache.org/repos/asf/spark/blob/efa11a42/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 4098bb5..78c80da 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn import org.apache.hadoop.io.{LongWritable, Writable} import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.functions.max import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils @@ -338,6 +339,35 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { hiveContext.reset() } + test("non-deterministic children of UDF") { + withUserDefinedFunction("testStringStringUDF" -> true, "testGenericUDFHash" -> true) { + // HiveSimpleUDF + sql(s"CREATE TEMPORARY FUNCTION testStringStringUDF AS '${classOf[UDFStringString].getName}'") + val df1 = sql("SELECT testStringStringUDF(rand(), \"hello\")") + assert(!df1.logicalPlan.asInstanceOf[Project].projectList.forall(_.deterministic)) + + // HiveGenericUDF + sql(s"CREATE TEMPORARY FUNCTION testGenericUDFHash AS '${classOf[GenericUDFHash].getName}'") + val df2 = sql("SELECT testGenericUDFHash(rand())") + assert(!df2.logicalPlan.asInstanceOf[Project].projectList.forall(_.deterministic)) + } + } + + test("non-deterministic children expressions of UDAF") { + withTempView("view1") { + spark.range(1).selectExpr("id as x", "id as y").createTempView("view1") + withUserDefinedFunction("testUDAFPercentile" -> true) { + // non-deterministic children of Hive UDAF + sql(s"CREATE TEMPORARY FUNCTION testUDAFPercentile AS '${classOf[UDAFPercentile].getName}'") + val e1 = intercept[AnalysisException] { + sql("SELECT testUDAFPercentile(x, rand()) from view1 group by y") + }.getMessage + assert(Seq("nondeterministic expression", + "should not appear in the arguments of an aggregate function").forall(e1.contains)) + } + } + } + test("Hive UDFs with insufficient number of input arguments should trigger an analysis error") { Seq((1, 2)).toDF("a", "b").createOrReplaceTempView("testUDF") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org