Repository: spark
Updated Branches:
  refs/heads/branch-2.1 2a3e50e24 -> efa11a42f


[SPARK-20335][SQL][BACKPORT-2.1] Children expressions of Hive UDF impacts the 
determinism of Hive UDF

### What changes were proposed in this pull request?

This PR is to backport https://github.com/apache/spark/pull/17635 to Spark 2.1

---
```JAVA
  /**
   * Certain optimizations should not be applied if UDF is not deterministic.
   * Deterministic UDF returns same result each time it is invoked with a
   * particular input. This determinism just needs to hold within the context of
   * a query.
   *
   * return true if the UDF is deterministic
   */
  boolean deterministic() default true;
```

Based on the definition of 
[UDFType](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFType.java#L42-L50),
 when Hive UDF's children are non-deterministic, Hive UDF is also 
non-deterministic.

### How was this patch tested?
Added test cases.

Author: Xiao Li <gatorsm...@gmail.com>

Closes #17652 from gatorsmile/backport-17635.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/efa11a42
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/efa11a42
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/efa11a42

Branch: refs/heads/branch-2.1
Commit: efa11a42f0c34dcfaf4a1bf17055539c43c8e4f9
Parents: 2a3e50e
Author: Xiao Li <gatorsm...@gmail.com>
Authored: Mon Apr 17 15:59:55 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Apr 17 15:59:55 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/hiveUDFs.scala    |  4 +--
 .../hive/execution/AggregationQuerySuite.scala  | 13 +++++++++
 .../spark/sql/hive/execution/HiveUDFSuite.scala | 30 ++++++++++++++++++++
 3 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/efa11a42/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 37414ad..3e46b74 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -42,7 +42,7 @@ private[hive] case class HiveSimpleUDF(
     name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
   extends Expression with HiveInspectors with CodegenFallback with Logging {
 
-  override def deterministic: Boolean = isUDFDeterministic
+  override def deterministic: Boolean = isUDFDeterministic && 
children.forall(_.deterministic)
 
   override def nullable: Boolean = true
 
@@ -120,7 +120,7 @@ private[hive] case class HiveGenericUDF(
 
   override def nullable: Boolean = true
 
-  override def deterministic: Boolean = isUDFDeterministic
+  override def deterministic: Boolean = isUDFDeterministic && 
children.forall(_.deterministic)
 
   override def foldable: Boolean =
     isUDFDeterministic && returnInspector.isInstanceOf[ConstantObjectInspector]

http://git-wip-us.apache.org/repos/asf/spark/blob/efa11a42/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 4a8086d..84f9159 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -509,6 +509,19 @@ abstract class AggregationQuerySuite extends QueryTest 
with SQLTestUtils with Te
         Row(null, null, 110.0, null, null, 10.0) :: Nil)
   }
 
+  test("non-deterministic children expressions of UDAF") {
+    val e = intercept[AnalysisException] {
+      spark.sql(
+        """
+          |SELECT mydoublesum(value + 1.5 * key + rand())
+          |FROM agg1
+          |GROUP BY key
+        """.stripMargin)
+    }.getMessage
+    assert(Seq("nondeterministic expression",
+      "should not appear in the arguments of an aggregate 
function").forall(e.contains))
+  }
+
   test("interpreted aggregate function") {
     checkAnswer(
       spark.sql(

http://git-wip-us.apache.org/repos/asf/spark/blob/efa11a42/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 4098bb5..78c80da 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -31,6 +31,7 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
 import org.apache.hadoop.io.{LongWritable, Writable}
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.functions.max
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
@@ -338,6 +339,35 @@ class HiveUDFSuite extends QueryTest with 
TestHiveSingleton with SQLTestUtils {
     hiveContext.reset()
   }
 
+  test("non-deterministic children of UDF") {
+    withUserDefinedFunction("testStringStringUDF" -> true, 
"testGenericUDFHash" -> true) {
+      // HiveSimpleUDF
+      sql(s"CREATE TEMPORARY FUNCTION testStringStringUDF AS 
'${classOf[UDFStringString].getName}'")
+      val df1 = sql("SELECT testStringStringUDF(rand(), \"hello\")")
+      
assert(!df1.logicalPlan.asInstanceOf[Project].projectList.forall(_.deterministic))
+
+      // HiveGenericUDF
+      sql(s"CREATE TEMPORARY FUNCTION testGenericUDFHash AS 
'${classOf[GenericUDFHash].getName}'")
+      val df2 = sql("SELECT testGenericUDFHash(rand())")
+      
assert(!df2.logicalPlan.asInstanceOf[Project].projectList.forall(_.deterministic))
+    }
+  }
+
+  test("non-deterministic children expressions of UDAF") {
+    withTempView("view1") {
+      spark.range(1).selectExpr("id as x", "id as y").createTempView("view1")
+      withUserDefinedFunction("testUDAFPercentile" -> true) {
+        // non-deterministic children of Hive UDAF
+        sql(s"CREATE TEMPORARY FUNCTION testUDAFPercentile AS 
'${classOf[UDAFPercentile].getName}'")
+        val e1 = intercept[AnalysisException] {
+          sql("SELECT testUDAFPercentile(x, rand()) from view1 group by y")
+        }.getMessage
+        assert(Seq("nondeterministic expression",
+          "should not appear in the arguments of an aggregate 
function").forall(e1.contains))
+      }
+    }
+  }
+
   test("Hive UDFs with insufficient number of input arguments should trigger 
an analysis error") {
     Seq((1, 2)).toDF("a", "b").createOrReplaceTempView("testUDF")
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to