This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b67d369 [SPARK-27099][SQL] Add 'xxhash64' for hashing arbitrary
columns to Long
b67d369 is described below
commit b67d36957287c1fbefa1996e6a4a009a75c4c3f8
Author: Huon Wilson <[email protected]>
AuthorDate: Wed Mar 20 16:34:34 2019 +0800
[SPARK-27099][SQL] Add 'xxhash64' for hashing arbitrary columns to Long
## What changes were proposed in this pull request?
This introduces a new SQL function 'xxhash64' for getting a 64-bit hash of
an arbitrary number of columns.
This is designed to exactly mimic the 32-bit `hash`, which uses
MurmurHash3. The name is designed to be more future-proof than
'hash', by indicating the exact algorithm used, similar to md5 and the
sha hashes.
## How was this patch tested?
The tests for the existing `hash` function were duplicated to run with
`xxhash64`.
Closes #24019 from huonw/hash64.
Authored-by: Huon Wilson <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
R/pkg/NAMESPACE | 1 +
R/pkg/R/functions.R | 19 +++++++++++++++++++
R/pkg/R/generics.R | 4 ++++
R/pkg/tests/fulltests/test_sparkSQL.R | 2 +-
python/pyspark/sql/functions.py | 13 +++++++++++++
.../apache/spark/sql/catalyst/analysis/Analyzer.scala | 5 +++++
.../sql/catalyst/analysis/FunctionRegistry.scala | 1 +
.../apache/spark/sql/catalyst/expressions/hash.scala | 9 ++++++++-
.../analysis/ExpressionTypeCheckingSuite.scala | 1 +
.../catalyst/expressions/HashExpressionsSuite.scala | 5 +++++
.../main/scala/org/apache/spark/sql/functions.scala | 13 +++++++++++++
.../apache/spark/sql/DataFrameFunctionsSuite.scala | 4 +++-
.../scala/org/apache/spark/sql/DataFrameSuite.scala | 19 +++++++++++++++++++
.../scala/org/apache/spark/sql/SQLQuerySuite.scala | 11 +++++++++++
14 files changed, 104 insertions(+), 3 deletions(-)
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 1dcad16..f9d9494 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -408,6 +408,7 @@ exportMethods("%<=>%",
"weekofyear",
"when",
"window",
+ "xxhash64",
"year")
exportClasses("GroupedData")
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 8f425b1..d91896a 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -736,6 +736,25 @@ setMethod("hash",
})
#' @details
+#' \code{xxhash64}: Calculates the hash code of given columns using the 64-bit
+#' variant of the xxHash algorithm, and returns the result as a long
+#' column.
+#'
+#' @rdname column_misc_functions
+#' @aliases xxhash64 xxhash64,Column-method
+#' @note xxhash64 since 3.0.0
+setMethod("xxhash64",
+ signature(x = "Column"),
+ function(x, ...) {
+ jcols <- lapply(list(x, ...), function(x) {
+ stopifnot(class(x) == "Column")
+ x@jc
+ })
+ jc <- callJStatic("org.apache.spark.sql.functions", "xxhash64",
jcols)
+ column(jc)
+ })
+
+#' @details
#' \code{dayofmonth}: Extracts the day of the month as an integer from a
#' given date/timestamp/string.
#'
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index fcb511e..f849dd1 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1394,6 +1394,10 @@ setGeneric("weekofyear", function(x) {
standardGeneric("weekofyear") })
#' @name NULL
setGeneric("window", function(x, ...) { standardGeneric("window") })
+#' @rdname column_misc_functions
+#' @name NULL
+setGeneric("xxhash64", function(x, ...) { standardGeneric("xxhash64") })
+
#' @rdname column_datetime_functions
#' @name NULL
setGeneric("year", function(x) { standardGeneric("year") })
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R
b/R/pkg/tests/fulltests/test_sparkSQL.R
index c9d6134..cebd0f8 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1390,7 +1390,7 @@ test_that("column functions", {
c9 <- signum(c) + sin(c) + sinh(c) + size(c) + stddev(c) + soundex(c) +
sqrt(c) + sum(c)
c10 <- sumDistinct(c) + tan(c) + tanh(c) + degrees(c) + radians(c)
c11 <- to_date(c) + trim(c) + unbase64(c) + unhex(c) + upper(c)
- c12 <- variance(c) + ltrim(c, "a") + rtrim(c, "b") + trim(c, "c")
+ c12 <- variance(c) + xxhash64(c) + ltrim(c, "a") + rtrim(c, "b") + trim(c,
"c")
c13 <- lead("col", 1) + lead(c, 1) + lag("col", 1) + lag(c, 1)
c14 <- cume_dist() + ntile(1) + corr(c, c1)
c15 <- dense_rank() + percent_rank() + rank() + row_number()
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 0326613..bc28c9d 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1468,6 +1468,19 @@ def hash(*cols):
return Column(jc)
+@since(3.0)
+def xxhash64(*cols):
+ """Calculates the hash code of given columns using the 64-bit variant of
the xxHash algorithm,
+ and returns the result as a long column.
+
+ >>> spark.createDataFrame([('ABC',)],
['a']).select(xxhash64('a').alias('hash')).collect()
+ [Row(hash=4105715581806190027)]
+ """
+ sc = SparkContext._active_spark_context
+ jc = sc._jvm.functions.xxhash64(_to_seq(sc, cols, _to_java_column))
+ return Column(jc)
+
+
# ---------------------- String/Binary functions ------------------------------
_string_functions = {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ab9cedc..e4cf43d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1047,6 +1047,11 @@ class Analyzer(
case s: Star => s.expand(child, resolver)
case o => o :: Nil
})
+ case p: XxHash64 if containsStar(p.children) =>
+ p.copy(children = p.children.flatMap {
+ case s: Star => s.expand(child, resolver)
+ case o => o :: Nil
+ })
// count(*) has been replaced by count(1)
case o if containsStar(o.children) =>
failAnalysis(s"Invalid usage of '*' in expression '${o.prettyName}'")
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index befc02f..46cf0f9 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -461,6 +461,7 @@ object FunctionRegistry {
expression[Md5]("md5"),
expression[Uuid]("uuid"),
expression[Murmur3Hash]("hash"),
+ expression[XxHash64]("xxhash64"),
expression[Sha1]("sha"),
expression[Sha1]("sha1"),
expression[Sha2]("sha2"),
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index 8d17b07..21c28c5 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -594,12 +594,19 @@ object Murmur3HashFunction extends
InterpretedHashFunction {
/**
* A xxHash64 64-bit hash expression.
*/
+@ExpressionDescription(
+ usage = "_FUNC_(expr1, expr2, ...) - Returns a 64-bit hash value of the
arguments.",
+ examples = """
+ Examples:
+ > SELECT _FUNC_('Spark', array(123), 2);
+ 5602566077635097486
+ """)
case class XxHash64(children: Seq[Expression], seed: Long) extends
HashExpression[Long] {
def this(arguments: Seq[Expression]) = this(arguments, 42L)
override def dataType: DataType = LongType
- override def prettyName: String = "xxHash"
+ override def prettyName: String = "xxhash64"
override protected def hasherClassName: String = classOf[XXH64].getName
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
index 3eb3fe6..7257647 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
@@ -161,6 +161,7 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite {
"input to function coalesce should all be the same type")
assertError(Coalesce(Nil), "function coalesce requires at least one
argument")
assertError(new Murmur3Hash(Nil), "function hash requires at least one
argument")
+ assertError(new XxHash64(Nil), "function xxhash64 requires at least one
argument")
assertError(Explode('intField),
"input to function explode should be array or map type")
assertError(PosExplode('intField),
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
index 555ccb8..79589c9 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
@@ -630,6 +630,11 @@ class HashExpressionsSuite extends SparkFunSuite with
ExpressionEvalHelper {
val murmursHashEval = Murmur3Hash(exprs, 42).eval(input)
assert(murmur3HashPlan(input).getInt(0) == murmursHashEval)
+ val xxHash64Expr = XxHash64(exprs, 42)
+ val xxHash64Plan = GenerateMutableProjection.generate(Seq(xxHash64Expr))
+ val xxHash64Eval = XxHash64(exprs, 42).eval(input)
+ assert(xxHash64Plan(input).getLong(0) == xxHash64Eval)
+
val hiveHashExpr = HiveHash(exprs)
val hiveHashPlan = GenerateMutableProjection.generate(Seq(hiveHashExpr))
val hiveHashEval = HiveHash(exprs).eval(input)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 1199cd8..f99186c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2167,6 +2167,19 @@ object functions {
new Murmur3Hash(cols.map(_.expr))
}
+ /**
+ * Calculates the hash code of given columns using the 64-bit
+ * variant of the xxHash algorithm, and returns the result as a long
+ * column.
+ *
+ * @group misc_funcs
+ * @since 3.0.0
+ */
+ @scala.annotation.varargs
+ def xxhash64(cols: Column*): Column = withExpr {
+ new XxHash64(cols.map(_.expr))
+ }
+
//////////////////////////////////////////////////////////////////////////////////////////////
// String functions
//////////////////////////////////////////////////////////////////////////////////////////////
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 16b14a6..e5c2de9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -2884,7 +2884,9 @@ class DataFrameFunctionsSuite extends QueryTest with
SharedSQLContext {
("coalesce", (df: DataFrame) => df.select(coalesce())) ::
("coalesce", (df: DataFrame) => df.selectExpr("coalesce()")) ::
("hash", (df: DataFrame) => df.select(hash())) ::
- ("hash", (df: DataFrame) => df.selectExpr("hash()")) :: Nil
+ ("hash", (df: DataFrame) => df.selectExpr("hash()")) ::
+ ("xxhash64", (df: DataFrame) => df.select(xxhash64())) ::
+ ("xxhash64", (df: DataFrame) => df.selectExpr("xxhash64()")) :: Nil
funcsMustHaveAtLeastOneArg.foreach { case (name, func) =>
val errMsg = intercept[AnalysisException] { func(df) }.getMessage
assert(errMsg.contains(s"input to function $name requires at least one
argument"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 3082e0b..6dec129 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -137,6 +137,25 @@ class DataFrameSuite extends QueryTest with
SharedSQLContext {
structDf.select(hash($"a", $"record.*")))
}
+ test("Star Expansion - xxhash64") {
+ val structDf = testData2.select("a", "b").as("record")
+ checkAnswer(
+ structDf.groupBy($"a", $"b").agg(min(xxhash64($"a", $"*"))),
+ structDf.groupBy($"a", $"b").agg(min(xxhash64($"a", $"a", $"b"))))
+
+ checkAnswer(
+ structDf.groupBy($"a", $"b").agg(xxhash64($"a", $"*")),
+ structDf.groupBy($"a", $"b").agg(xxhash64($"a", $"a", $"b")))
+
+ checkAnswer(
+ structDf.select(xxhash64($"*")),
+ structDf.select(xxhash64($"record.*")))
+
+ checkAnswer(
+ structDf.select(xxhash64($"a", $"*")),
+ structDf.select(xxhash64($"a", $"record.*")))
+ }
+
test("Star Expansion - explode should fail with a meaningful message if it
takes a star") {
val df = Seq(("1,2"), ("4"), ("7,8,9")).toDF("csv")
val e = intercept[AnalysisException] {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 7b85626..e8d1ecc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2047,6 +2047,17 @@ class SQLQuerySuite extends QueryTest with
SharedSQLContext {
}
}
+ test("xxhash64 function") {
+ val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+ withTempView("tbl") {
+ df.createOrReplaceTempView("tbl")
+ checkAnswer(
+ df.select(xxhash64($"i", $"j")),
+ sql("SELECT xxhash64(i, j) from tbl")
+ )
+ }
+ }
+
test("join with using clause") {
val df1 = Seq(("r1c1", "r1c2", "t1r1c3"),
("r2c1", "r2c2", "t1r2c3"), ("r3c1x", "r3c2", "t1r3c3")).toDF("c1",
"c2", "c3")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]