This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b67d369  [SPARK-27099][SQL] Add 'xxhash64' for hashing arbitrary 
columns to Long
b67d369 is described below

commit b67d36957287c1fbefa1996e6a4a009a75c4c3f8
Author: Huon Wilson <[email protected]>
AuthorDate: Wed Mar 20 16:34:34 2019 +0800

    [SPARK-27099][SQL] Add 'xxhash64' for hashing arbitrary columns to Long
    
    ## What changes were proposed in this pull request?
    
    This introduces a new SQL function 'xxhash64' for getting a 64-bit hash of 
an arbitrary number of columns.
    
    This is designed to exactly mimic the 32-bit `hash`, which uses
    MurmurHash3. The name is designed to be more future-proof than the
    'hash', by indicating the exact algorithm used, similar to md5 and the
    sha hashes.
    
    ## How was this patch tested?
    
    The tests for the existing `hash` function were duplicated to run with 
`xxhash64`.
    
    Closes #24019 from huonw/hash64.
    
    Authored-by: Huon Wilson <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 R/pkg/NAMESPACE                                       |  1 +
 R/pkg/R/functions.R                                   | 19 +++++++++++++++++++
 R/pkg/R/generics.R                                    |  4 ++++
 R/pkg/tests/fulltests/test_sparkSQL.R                 |  2 +-
 python/pyspark/sql/functions.py                       | 13 +++++++++++++
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala |  5 +++++
 .../sql/catalyst/analysis/FunctionRegistry.scala      |  1 +
 .../apache/spark/sql/catalyst/expressions/hash.scala  |  9 ++++++++-
 .../analysis/ExpressionTypeCheckingSuite.scala        |  1 +
 .../catalyst/expressions/HashExpressionsSuite.scala   |  5 +++++
 .../main/scala/org/apache/spark/sql/functions.scala   | 13 +++++++++++++
 .../apache/spark/sql/DataFrameFunctionsSuite.scala    |  4 +++-
 .../scala/org/apache/spark/sql/DataFrameSuite.scala   | 19 +++++++++++++++++++
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala    | 11 +++++++++++
 14 files changed, 104 insertions(+), 3 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 1dcad16..f9d9494 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -408,6 +408,7 @@ exportMethods("%<=>%",
               "weekofyear",
               "when",
               "window",
+              "xxhash64",
               "year")
 
 exportClasses("GroupedData")
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 8f425b1..d91896a 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -736,6 +736,25 @@ setMethod("hash",
           })
 
 #' @details
+#' \code{xxhash64}: Calculates the hash code of given columns using the 64-bit
+#' variant of the xxHash algorithm, and returns the result as a long
+#' column.
+#'
+#' @rdname column_misc_functions
+#' @aliases xxhash64 xxhash64,Column-method
+#' @note xxhash64 since 3.0.0
+setMethod("xxhash64",
+          signature(x = "Column"),
+          function(x, ...) {
+            jcols <- lapply(list(x, ...), function(x) {
+              stopifnot(class(x) == "Column")
+              x@jc
+            })
+            jc <- callJStatic("org.apache.spark.sql.functions", "xxhash64", 
jcols)
+            column(jc)
+          })
+
+#' @details
 #' \code{dayofmonth}: Extracts the day of the month as an integer from a
 #' given date/timestamp/string.
 #'
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index fcb511e..f849dd1 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1394,6 +1394,10 @@ setGeneric("weekofyear", function(x) { 
standardGeneric("weekofyear") })
 #' @name NULL
 setGeneric("window", function(x, ...) { standardGeneric("window") })
 
+#' @rdname column_misc_functions
+#' @name NULL
+setGeneric("xxhash64", function(x, ...) { standardGeneric("xxhash64") })
+
 #' @rdname column_datetime_functions
 #' @name NULL
 setGeneric("year", function(x) { standardGeneric("year") })
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R 
b/R/pkg/tests/fulltests/test_sparkSQL.R
index c9d6134..cebd0f8 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1390,7 +1390,7 @@ test_that("column functions", {
   c9 <- signum(c) + sin(c) + sinh(c) + size(c) + stddev(c) + soundex(c) + 
sqrt(c) + sum(c)
   c10 <- sumDistinct(c) + tan(c) + tanh(c) + degrees(c) + radians(c)
   c11 <- to_date(c) + trim(c) + unbase64(c) + unhex(c) + upper(c)
-  c12 <- variance(c) + ltrim(c, "a") + rtrim(c, "b") + trim(c, "c")
+  c12 <- variance(c) + xxhash64(c) + ltrim(c, "a") + rtrim(c, "b") + trim(c, 
"c")
   c13 <- lead("col", 1) + lead(c, 1) + lag("col", 1) + lag(c, 1)
   c14 <- cume_dist() + ntile(1) + corr(c, c1)
   c15 <- dense_rank() + percent_rank() + rank() + row_number()
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 0326613..bc28c9d 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1468,6 +1468,19 @@ def hash(*cols):
     return Column(jc)
 
 
+@since(3.0)
+def xxhash64(*cols):
+    """Calculates the hash code of given columns using the 64-bit variant of 
the xxHash algorithm,
+    and returns the result as a long column.
+
+    >>> spark.createDataFrame([('ABC',)], 
['a']).select(xxhash64('a').alias('hash')).collect()
+    [Row(hash=4105715581806190027)]
+    """
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.functions.xxhash64(_to_seq(sc, cols, _to_java_column))
+    return Column(jc)
+
+
 # ---------------------- String/Binary functions ------------------------------
 
 _string_functions = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ab9cedc..e4cf43d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1047,6 +1047,11 @@ class Analyzer(
             case s: Star => s.expand(child, resolver)
             case o => o :: Nil
           })
+        case p: XxHash64 if containsStar(p.children) =>
+          p.copy(children = p.children.flatMap {
+            case s: Star => s.expand(child, resolver)
+            case o => o :: Nil
+          })
         // count(*) has been replaced by count(1)
         case o if containsStar(o.children) =>
           failAnalysis(s"Invalid usage of '*' in expression '${o.prettyName}'")
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index befc02f..46cf0f9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -461,6 +461,7 @@ object FunctionRegistry {
     expression[Md5]("md5"),
     expression[Uuid]("uuid"),
     expression[Murmur3Hash]("hash"),
+    expression[XxHash64]("xxhash64"),
     expression[Sha1]("sha"),
     expression[Sha1]("sha1"),
     expression[Sha2]("sha2"),
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index 8d17b07..21c28c5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -594,12 +594,19 @@ object Murmur3HashFunction extends 
InterpretedHashFunction {
 /**
  * A xxHash64 64-bit hash expression.
  */
+@ExpressionDescription(
+  usage = "_FUNC_(expr1, expr2, ...) - Returns a 64-bit hash value of the 
arguments.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_('Spark', array(123), 2);
+       5602566077635097486
+  """)
 case class XxHash64(children: Seq[Expression], seed: Long) extends 
HashExpression[Long] {
   def this(arguments: Seq[Expression]) = this(arguments, 42L)
 
   override def dataType: DataType = LongType
 
-  override def prettyName: String = "xxHash"
+  override def prettyName: String = "xxhash64"
 
   override protected def hasherClassName: String = classOf[XXH64].getName
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
index 3eb3fe6..7257647 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
@@ -161,6 +161,7 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite {
       "input to function coalesce should all be the same type")
     assertError(Coalesce(Nil), "function coalesce requires at least one 
argument")
     assertError(new Murmur3Hash(Nil), "function hash requires at least one 
argument")
+    assertError(new XxHash64(Nil), "function xxhash64 requires at least one 
argument")
     assertError(Explode('intField),
       "input to function explode should be array or map type")
     assertError(PosExplode('intField),
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
index 555ccb8..79589c9 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
@@ -630,6 +630,11 @@ class HashExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
       val murmursHashEval = Murmur3Hash(exprs, 42).eval(input)
       assert(murmur3HashPlan(input).getInt(0) == murmursHashEval)
 
+      val xxHash64Expr = XxHash64(exprs, 42)
+      val xxHash64Plan = GenerateMutableProjection.generate(Seq(xxHash64Expr))
+      val xxHash64Eval = XxHash64(exprs, 42).eval(input)
+      assert(xxHash64Plan(input).getLong(0) == xxHash64Eval)
+
       val hiveHashExpr = HiveHash(exprs)
       val hiveHashPlan = GenerateMutableProjection.generate(Seq(hiveHashExpr))
       val hiveHashEval = HiveHash(exprs).eval(input)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 1199cd8..f99186c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2167,6 +2167,19 @@ object functions {
     new Murmur3Hash(cols.map(_.expr))
   }
 
+  /**
+   * Calculates the hash code of given columns using the 64-bit
+   * variant of the xxHash algorithm, and returns the result as a long
+   * column.
+   *
+   * @group misc_funcs
+   * @since 3.0.0
+   */
+  @scala.annotation.varargs
+  def xxhash64(cols: Column*): Column = withExpr {
+    new XxHash64(cols.map(_.expr))
+  }
+
   
//////////////////////////////////////////////////////////////////////////////////////////////
   // String functions
   
//////////////////////////////////////////////////////////////////////////////////////////////
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 16b14a6..e5c2de9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -2884,7 +2884,9 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSQLContext {
       ("coalesce", (df: DataFrame) => df.select(coalesce())) ::
       ("coalesce", (df: DataFrame) => df.selectExpr("coalesce()")) ::
       ("hash", (df: DataFrame) => df.select(hash())) ::
-      ("hash", (df: DataFrame) => df.selectExpr("hash()")) :: Nil
+      ("hash", (df: DataFrame) => df.selectExpr("hash()")) ::
+      ("xxhash64", (df: DataFrame) => df.select(xxhash64())) ::
+      ("xxhash64", (df: DataFrame) => df.selectExpr("xxhash64()")) :: Nil
     funcsMustHaveAtLeastOneArg.foreach { case (name, func) =>
       val errMsg = intercept[AnalysisException] { func(df) }.getMessage
       assert(errMsg.contains(s"input to function $name requires at least one 
argument"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 3082e0b..6dec129 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -137,6 +137,25 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
       structDf.select(hash($"a", $"record.*")))
   }
 
+  test("Star Expansion - xxhash64") {
+    val structDf = testData2.select("a", "b").as("record")
+    checkAnswer(
+      structDf.groupBy($"a", $"b").agg(min(xxhash64($"a", $"*"))),
+      structDf.groupBy($"a", $"b").agg(min(xxhash64($"a", $"a", $"b"))))
+
+    checkAnswer(
+      structDf.groupBy($"a", $"b").agg(xxhash64($"a", $"*")),
+      structDf.groupBy($"a", $"b").agg(xxhash64($"a", $"a", $"b")))
+
+    checkAnswer(
+      structDf.select(xxhash64($"*")),
+      structDf.select(xxhash64($"record.*")))
+
+    checkAnswer(
+      structDf.select(xxhash64($"a", $"*")),
+      structDf.select(xxhash64($"a", $"record.*")))
+  }
+
   test("Star Expansion - explode should fail with a meaningful message if it 
takes a star") {
     val df = Seq(("1,2"), ("4"), ("7,8,9")).toDF("csv")
     val e = intercept[AnalysisException] {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 7b85626..e8d1ecc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2047,6 +2047,17 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
     }
   }
 
+  test("xxhash64 function") {
+    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+    withTempView("tbl") {
+      df.createOrReplaceTempView("tbl")
+      checkAnswer(
+        df.select(xxhash64($"i", $"j")),
+        sql("SELECT xxhash64(i, j) from tbl")
+      )
+    }
+  }
+
   test("join with using clause") {
     val df1 = Seq(("r1c1", "r1c2", "t1r1c3"),
       ("r2c1", "r2c2", "t1r2c3"), ("r3c1x", "r3c2", "t1r3c3")).toDF("c1", 
"c2", "c3")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to