This is an automated email from the ASF dual-hosted git repository.
dtenedor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f191d652f039 [SPARK-54199][SQL] Add DataFrame API support for new KLL quantiles sketch functions
f191d652f039 is described below
commit f191d652f0395db4b216b03f5fd4059c1e15a57b
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Thu Nov 6 14:48:31 2025 -0800
[SPARK-54199][SQL] Add DataFrame API support for new KLL quantiles sketch functions
### What changes were proposed in this pull request?
This PR adds DataFrame API support for the KLL quantile sketch functions
that were previously added to Spark SQL in
https://github.com/apache/spark/pull/52800. This lets users leverage KLL
sketches through both Scala and Python DataFrame APIs in addition to the
existing SQL interface.
**Key additions:**
1. **Scala DataFrame API** (`sql/api/src/main/scala/org/apache/spark/sql/functions.scala`):
   - 18 new functions covering aggregate, merge, quantile, and rank operations
   - Multiple overloads for each function supporting:
     - `Column` parameters for computed values
     - `String` parameters for column names
     - `Int` parameters for literal k values
     - Optional k parameters with sensible defaults
   - Functions for all three data type variants: bigint, float, double
2. **Python DataFrame API** (`python/pyspark/sql/functions/builtin.py`):
   - 18 corresponding Python functions with:
     - Comprehensive docstrings with usage examples
     - Proper type hints (`ColumnOrName`, `Optional[Union[int, Column]]`)
     - Support for both Column objects and column name strings
   - Added to the PySpark documentation reference
3. **Python Spark Connect Support** (`python/pyspark/sql/connect/functions/builtin.py`):
   - Full compatibility with the Spark Connect architecture
   - All 18 functions properly registered
### Why are the changes needed?
While the SQL API for KLL sketches was added previously, DataFrame API support is
essential for full usability. Without it, users would have to fall back on SQL
expression strings via `expr()` or `selectExpr()`, which is less ergonomic and gives
up type safety, as the illustrative sketch below shows.
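For illustration only (this snippet is not part of the patch), the difference looks
roughly like the following PySpark sketch; it assumes an active `SparkSession` named
`spark`:
```python
from pyspark.sql import functions as sf

# Single-column DataFrame; the column is named "value" (as in the examples below).
df = spark.createDataFrame([1, 2, 3, 4, 5], "INT")

# Before this change: fall back to an untyped SQL expression string.
via_expr = df.agg(sf.expr("kll_sketch_agg_bigint(value)"))

# With this change: a first-class function with docstrings and type hints.
via_api = df.agg(sf.kll_sketch_agg_bigint("value"))
```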
### Does this PR introduce any user-facing change?
Yes, this PR adds DataFrame API support for the 18 KLL sketch functions:
**Scala DataFrame API Example:**
```scala
import org.apache.spark.sql.functions._
import spark.implicits._  // needed for toDF and the $"..." column syntax
// Create sketch with default k
val df = Seq(1, 2, 3, 4, 5).toDF("value")
val sketch = df.agg(kll_sketch_agg_bigint($"value"))
// Create sketch with custom k value
val sketch2 = df.agg(kll_sketch_agg_bigint("value", 400))
// Get median (0.5 quantile)
val sketchDf = df.agg(kll_sketch_agg_bigint($"value").alias("sketch"))
val median = sketchDf.select(kll_sketch_get_quantile_bigint($"sketch", lit(0.5)))
// Get multiple quantiles
val quantiles = sketchDf.select(
  kll_sketch_get_quantile_bigint($"sketch", array(lit(0.25), lit(0.5), lit(0.75)))
)
// Merge sketches
val merged = sketchDf.select(
  kll_sketch_merge_bigint($"sketch", $"sketch").alias("merged")
)
// Get count of items
val count = sketchDf.select(kll_sketch_get_n_bigint($"sketch"))
```
**Python DataFrame API Example:**
```python
from pyspark.sql import functions as sf
# Create sketch with default k
df = spark.createDataFrame([1, 2, 3, 4, 5], "INT")
sketch = df.agg(sf.kll_sketch_agg_bigint("value"))
# Create sketch with custom k value
sketch2 = df.agg(sf.kll_sketch_agg_bigint("value", 400))
# Get median (0.5 quantile)
sketch_df = df.agg(sf.kll_sketch_agg_bigint("value").alias("sketch"))
median = sketch_df.select(sf.kll_sketch_get_quantile_bigint("sketch", sf.lit(0.5)))
# Get multiple quantiles
quantiles = sketch_df.select(
    sf.kll_sketch_get_quantile_bigint("sketch", sf.array(sf.lit(0.25), sf.lit(0.5), sf.lit(0.75)))
)
# Merge sketches
merged = sketch_df.select(
sf.kll_sketch_merge_bigint("sketch", "sketch").alias("merged")
)
# Get count of items
count = sketch_df.select(sf.kll_sketch_get_n_bigint("sketch"))
```
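As a further illustrative sketch (also not taken from this patch), the new aggregate
combines naturally with `groupBy` to build one sketch per group and then query it;
the `events` DataFrame and its `category`/`value` columns are hypothetical:
```python
from pyspark.sql import functions as sf

# Hypothetical input: one row per event with a grouping key and a numeric value.
events = spark.createDataFrame(
    [("a", 1), ("a", 2), ("b", 10), ("b", 20)], ["category", "value"]
)

per_group = (
    events.groupBy("category")
    .agg(sf.kll_sketch_agg_bigint("value").alias("sketch"))
    .select(
        "category",
        sf.kll_sketch_get_quantile_bigint("sketch", sf.lit(0.5)).alias("median"),
        sf.kll_sketch_get_n_bigint("sketch").alias("n"),
    )
)
```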
### How was this patch tested?
1. **Scala Unit Tests** (`DataFrameAggregateSuite`):
- `kll_sketch_agg_{bigint,float,double}` with default and explicit k values
- `kll_sketch_to_string` functions for all data types
- `kll_sketch_get_n` functions for all data types
- `kll_sketch_merge` operations
- `kll_sketch_get_quantile` with single rank and array of ranks
- `kll_sketch_get_rank` operations
- Null value handling tests
2. **Python Unit Tests** (`test_functions.py`):
- Comprehensive tests mirroring Scala tests
- Tests for Column object and string column name overloads
- Tests for optional k parameter
- Array input tests for quantile/rank functions
- Null handling validation
- Type checking (bytes/bytearray for sketches, str for to_string, int/float for values)
### Was this patch authored or co-authored using generative AI tooling?
Yes, this patch was authored with IDE assistance from `claude-4.5-sonnet`, followed by
manual validation and integration.
Closes #52900 from dtenedor/dataframe-api-kll-functions.
Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Daniel Tenedorio <[email protected]>
---
.../source/reference/pyspark.sql/functions.rst | 18 +
python/pyspark/sql/connect/functions/builtin.py | 162 ++++++
python/pyspark/sql/functions/__init__.py | 18 +
python/pyspark/sql/functions/builtin.py | 618 +++++++++++++++++++++
python/pyspark/sql/tests/test_functions.py | 156 ++++++
.../scala/org/apache/spark/sql/functions.scala | 300 ++++++++++
.../analyzer-results/kllquantiles.sql.out | 212 +++++++
.../resources/sql-tests/inputs/kllquantiles.sql | 51 ++
.../sql-tests/results/kllquantiles.sql.out | 256 +++++++++
.../apache/spark/sql/DataFrameAggregateSuite.scala | 164 ++++++
10 files changed, 1955 insertions(+)
diff --git a/python/docs/source/reference/pyspark.sql/functions.rst
b/python/docs/source/reference/pyspark.sql/functions.rst
index edd87c26dbb6..280465af3e91 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -458,6 +458,9 @@ Aggregate Functions
histogram_numeric
hll_sketch_agg
hll_union_agg
+ kll_sketch_agg_bigint
+ kll_sketch_agg_double
+ kll_sketch_agg_float
kurtosis
last
last_value
@@ -631,6 +634,21 @@ Misc Functions
current_user
hll_sketch_estimate
hll_union
+ kll_sketch_get_n_bigint
+ kll_sketch_get_n_double
+ kll_sketch_get_n_float
+ kll_sketch_get_quantile_bigint
+ kll_sketch_get_quantile_double
+ kll_sketch_get_quantile_float
+ kll_sketch_get_rank_bigint
+ kll_sketch_get_rank_double
+ kll_sketch_get_rank_float
+ kll_sketch_merge_bigint
+ kll_sketch_merge_double
+ kll_sketch_merge_float
+ kll_sketch_to_string_bigint
+ kll_sketch_to_string_double
+ kll_sketch_to_string_float
input_file_block_length
input_file_block_start
input_file_name
diff --git a/python/pyspark/sql/connect/functions/builtin.py
b/python/pyspark/sql/connect/functions/builtin.py
index 68caeef6ace8..2bd5b45ed154 100644
--- a/python/pyspark/sql/connect/functions/builtin.py
+++ b/python/pyspark/sql/connect/functions/builtin.py
@@ -4529,6 +4529,168 @@ def theta_intersection_agg(
theta_intersection_agg.__doc__ = pysparkfuncs.theta_intersection_agg.__doc__
+def kll_sketch_agg_bigint(
+ col: "ColumnOrName",
+ k: Optional[Union[int, Column]] = None,
+) -> Column:
+ fn = "kll_sketch_agg_bigint"
+ if k is None:
+ return _invoke_function_over_columns(fn, col)
+ else:
+ return _invoke_function_over_columns(fn, col, lit(k))
+
+
+kll_sketch_agg_bigint.__doc__ = pysparkfuncs.kll_sketch_agg_bigint.__doc__
+
+
+def kll_sketch_agg_float(
+ col: "ColumnOrName",
+ k: Optional[Union[int, Column]] = None,
+) -> Column:
+ fn = "kll_sketch_agg_float"
+ if k is None:
+ return _invoke_function_over_columns(fn, col)
+ else:
+ return _invoke_function_over_columns(fn, col, lit(k))
+
+
+kll_sketch_agg_float.__doc__ = pysparkfuncs.kll_sketch_agg_float.__doc__
+
+
+def kll_sketch_agg_double(
+ col: "ColumnOrName",
+ k: Optional[Union[int, Column]] = None,
+) -> Column:
+ fn = "kll_sketch_agg_double"
+ if k is None:
+ return _invoke_function_over_columns(fn, col)
+ else:
+ return _invoke_function_over_columns(fn, col, lit(k))
+
+
+kll_sketch_agg_double.__doc__ = pysparkfuncs.kll_sketch_agg_double.__doc__
+
+
+def kll_sketch_to_string_bigint(col: "ColumnOrName") -> Column:
+ fn = "kll_sketch_to_string_bigint"
+ return _invoke_function_over_columns(fn, col)
+
+
+kll_sketch_to_string_bigint.__doc__ =
pysparkfuncs.kll_sketch_to_string_bigint.__doc__
+
+
+def kll_sketch_to_string_float(col: "ColumnOrName") -> Column:
+ fn = "kll_sketch_to_string_float"
+ return _invoke_function_over_columns(fn, col)
+
+
+kll_sketch_to_string_float.__doc__ =
pysparkfuncs.kll_sketch_to_string_float.__doc__
+
+
+def kll_sketch_to_string_double(col: "ColumnOrName") -> Column:
+ fn = "kll_sketch_to_string_double"
+ return _invoke_function_over_columns(fn, col)
+
+
+kll_sketch_to_string_double.__doc__ =
pysparkfuncs.kll_sketch_to_string_double.__doc__
+
+
+def kll_sketch_get_n_bigint(col: "ColumnOrName") -> Column:
+ fn = "kll_sketch_get_n_bigint"
+ return _invoke_function_over_columns(fn, col)
+
+
+kll_sketch_get_n_bigint.__doc__ = pysparkfuncs.kll_sketch_get_n_bigint.__doc__
+
+
+def kll_sketch_get_n_float(col: "ColumnOrName") -> Column:
+ fn = "kll_sketch_get_n_float"
+ return _invoke_function_over_columns(fn, col)
+
+
+kll_sketch_get_n_float.__doc__ = pysparkfuncs.kll_sketch_get_n_float.__doc__
+
+
+def kll_sketch_get_n_double(col: "ColumnOrName") -> Column:
+ fn = "kll_sketch_get_n_double"
+ return _invoke_function_over_columns(fn, col)
+
+
+kll_sketch_get_n_double.__doc__ = pysparkfuncs.kll_sketch_get_n_double.__doc__
+
+
+def kll_sketch_merge_bigint(left: "ColumnOrName", right: "ColumnOrName") ->
Column:
+ fn = "kll_sketch_merge_bigint"
+ return _invoke_function_over_columns(fn, left, right)
+
+
+kll_sketch_merge_bigint.__doc__ = pysparkfuncs.kll_sketch_merge_bigint.__doc__
+
+
+def kll_sketch_merge_float(left: "ColumnOrName", right: "ColumnOrName") ->
Column:
+ fn = "kll_sketch_merge_float"
+ return _invoke_function_over_columns(fn, left, right)
+
+
+kll_sketch_merge_float.__doc__ = pysparkfuncs.kll_sketch_merge_float.__doc__
+
+
+def kll_sketch_merge_double(left: "ColumnOrName", right: "ColumnOrName") ->
Column:
+ fn = "kll_sketch_merge_double"
+ return _invoke_function_over_columns(fn, left, right)
+
+
+kll_sketch_merge_double.__doc__ = pysparkfuncs.kll_sketch_merge_double.__doc__
+
+
+def kll_sketch_get_quantile_bigint(sketch: "ColumnOrName", rank:
"ColumnOrName") -> Column:
+ fn = "kll_sketch_get_quantile_bigint"
+ return _invoke_function_over_columns(fn, sketch, rank)
+
+
+kll_sketch_get_quantile_bigint.__doc__ =
pysparkfuncs.kll_sketch_get_quantile_bigint.__doc__
+
+
+def kll_sketch_get_quantile_float(sketch: "ColumnOrName", rank:
"ColumnOrName") -> Column:
+ fn = "kll_sketch_get_quantile_float"
+ return _invoke_function_over_columns(fn, sketch, rank)
+
+
+kll_sketch_get_quantile_float.__doc__ =
pysparkfuncs.kll_sketch_get_quantile_float.__doc__
+
+
+def kll_sketch_get_quantile_double(sketch: "ColumnOrName", rank:
"ColumnOrName") -> Column:
+ fn = "kll_sketch_get_quantile_double"
+ return _invoke_function_over_columns(fn, sketch, rank)
+
+
+kll_sketch_get_quantile_double.__doc__ =
pysparkfuncs.kll_sketch_get_quantile_double.__doc__
+
+
+def kll_sketch_get_rank_bigint(sketch: "ColumnOrName", quantile:
"ColumnOrName") -> Column:
+ fn = "kll_sketch_get_rank_bigint"
+ return _invoke_function_over_columns(fn, sketch, quantile)
+
+
+kll_sketch_get_rank_bigint.__doc__ =
pysparkfuncs.kll_sketch_get_rank_bigint.__doc__
+
+
+def kll_sketch_get_rank_float(sketch: "ColumnOrName", quantile:
"ColumnOrName") -> Column:
+ fn = "kll_sketch_get_rank_float"
+ return _invoke_function_over_columns(fn, sketch, quantile)
+
+
+kll_sketch_get_rank_float.__doc__ =
pysparkfuncs.kll_sketch_get_rank_float.__doc__
+
+
+def kll_sketch_get_rank_double(sketch: "ColumnOrName", quantile:
"ColumnOrName") -> Column:
+ fn = "kll_sketch_get_rank_double"
+ return _invoke_function_over_columns(fn, sketch, quantile)
+
+
+kll_sketch_get_rank_double.__doc__ =
pysparkfuncs.kll_sketch_get_rank_double.__doc__
+
+
def theta_sketch_estimate(col: "ColumnOrName") -> Column:
fn = "theta_sketch_estimate"
return _invoke_function_over_columns(fn, col)
diff --git a/python/pyspark/sql/functions/__init__.py
b/python/pyspark/sql/functions/__init__.py
index fa579a222efa..173e07456453 100644
--- a/python/pyspark/sql/functions/__init__.py
+++ b/python/pyspark/sql/functions/__init__.py
@@ -372,6 +372,9 @@ __all__ = [ # noqa: F405
"histogram_numeric",
"hll_sketch_agg",
"hll_union_agg",
+ "kll_sketch_agg_bigint",
+ "kll_sketch_agg_double",
+ "kll_sketch_agg_float",
"kurtosis",
"last",
"last_value",
@@ -495,6 +498,21 @@ __all__ = [ # noqa: F405
"input_file_block_start",
"input_file_name",
"java_method",
+ "kll_sketch_get_n_bigint",
+ "kll_sketch_get_n_double",
+ "kll_sketch_get_n_float",
+ "kll_sketch_get_quantile_bigint",
+ "kll_sketch_get_quantile_double",
+ "kll_sketch_get_quantile_float",
+ "kll_sketch_get_rank_bigint",
+ "kll_sketch_get_rank_double",
+ "kll_sketch_get_rank_float",
+ "kll_sketch_merge_bigint",
+ "kll_sketch_merge_double",
+ "kll_sketch_merge_float",
+ "kll_sketch_to_string_bigint",
+ "kll_sketch_to_string_double",
+ "kll_sketch_to_string_float",
"monotonically_increasing_id",
"raise_error",
"reflect",
diff --git a/python/pyspark/sql/functions/builtin.py
b/python/pyspark/sql/functions/builtin.py
index 021d24bb5b3f..90004b0f6ce5 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -26704,6 +26704,624 @@ def theta_intersection_agg(col: "ColumnOrName") ->
Column:
return _invoke_function_over_columns(fn, col)
+@_try_remote_functions
+def kll_sketch_agg_bigint(
+ col: "ColumnOrName",
+ k: Optional[Union[int, Column]] = None,
+) -> Column:
+ """
+ Aggregate function: returns the compact binary representation of the
Datasketches
+ KllLongsSketch built with the values in the input column. The optional k
parameter
+ controls the size and accuracy of the sketch (default 200, range 8-65535).
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or column name
+ The column containing bigint values to aggregate
+ k : :class:`~pyspark.sql.Column` or int, optional
+ The k parameter that controls size and accuracy (default 200, range
8-65535)
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ The binary representation of the KllLongsSketch.
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame([1,2,3,4,5], "INT")
+ >>> result = df.agg(sf.kll_sketch_agg_bigint("value")).first()[0]
+ >>> result is not None and len(result) > 0
+ True
+ """
+ fn = "kll_sketch_agg_bigint"
+ if k is None:
+ return _invoke_function_over_columns(fn, col)
+ else:
+ return _invoke_function_over_columns(fn, col, lit(k))
+
+
+@_try_remote_functions
+def kll_sketch_agg_float(
+ col: "ColumnOrName",
+ k: Optional[Union[int, Column]] = None,
+) -> Column:
+ """
+ Aggregate function: returns the compact binary representation of the
Datasketches
+ KllFloatsSketch built with the values in the input column. The optional k
parameter
+ controls the size and accuracy of the sketch (default 200, range 8-65535).
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or column name
+ The column containing float values to aggregate
+ k : :class:`~pyspark.sql.Column` or int, optional
+ The k parameter that controls size and accuracy (default 200, range
8-65535)
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ The binary representation of the KllFloatsSketch.
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame([1.0,2.0,3.0,4.0,5.0], "FLOAT")
+ >>> result = df.agg(sf.kll_sketch_agg_float("value")).first()[0]
+ >>> result is not None and len(result) > 0
+ True
+ """
+ fn = "kll_sketch_agg_float"
+ if k is None:
+ return _invoke_function_over_columns(fn, col)
+ else:
+ return _invoke_function_over_columns(fn, col, lit(k))
+
+
+@_try_remote_functions
+def kll_sketch_agg_double(
+ col: "ColumnOrName",
+ k: Optional[Union[int, Column]] = None,
+) -> Column:
+ """
+ Aggregate function: returns the compact binary representation of the
Datasketches
+ KllDoublesSketch built with the values in the input column. The optional k
parameter
+ controls the size and accuracy of the sketch (default 200, range 8-65535).
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or column name
+ The column containing double values to aggregate
+ k : :class:`~pyspark.sql.Column` or int, optional
+ The k parameter that controls size and accuracy (default 200, range
8-65535)
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ The binary representation of the KllDoublesSketch.
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame([1.0,2.0,3.0,4.0,5.0], "DOUBLE")
+ >>> result = df.agg(sf.kll_sketch_agg_double("value")).first()[0]
+ >>> result is not None and len(result) > 0
+ True
+ """
+ fn = "kll_sketch_agg_double"
+ if k is None:
+ return _invoke_function_over_columns(fn, col)
+ else:
+ return _invoke_function_over_columns(fn, col, lit(k))
+
+
+@_try_remote_functions
+def kll_sketch_to_string_bigint(col: "ColumnOrName") -> Column:
+ """
+ Returns a string with human readable summary information about the KLL
bigint sketch.
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or column name
+ The KLL bigint sketch binary representation
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ A string representation of the sketch.
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame([1,2,3,4,5], "INT")
+ >>> sketch_df = df.agg(sf.kll_sketch_agg_bigint("value").alias("sketch"))
+ >>> result =
sketch_df.select(sf.kll_sketch_to_string_bigint("sketch")).first()[0]
+ >>> "Kll" in result and "N" in result
+ True
+ """
+ fn = "kll_sketch_to_string_bigint"
+ return _invoke_function_over_columns(fn, col)
+
+
+@_try_remote_functions
+def kll_sketch_to_string_float(col: "ColumnOrName") -> Column:
+ """
+ Returns a string with human readable summary information about the KLL
float sketch.
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or column name
+ The KLL float sketch binary representation
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ A string representation of the sketch.
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame([1.0,2.0,3.0,4.0,5.0], "FLOAT")
+ >>> sketch_df = df.agg(sf.kll_sketch_agg_float("value").alias("sketch"))
+ >>> result =
sketch_df.select(sf.kll_sketch_to_string_float("sketch")).first()[0]
+ >>> "Kll" in result and "N" in result
+ True
+ """
+ fn = "kll_sketch_to_string_float"
+ return _invoke_function_over_columns(fn, col)
+
+
+@_try_remote_functions
+def kll_sketch_to_string_double(col: "ColumnOrName") -> Column:
+ """
+ Returns a string with human readable summary information about the KLL
double sketch.
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or column name
+ The KLL double sketch binary representation
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ A string representation of the sketch.
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame([1.0,2.0,3.0,4.0,5.0], "DOUBLE")
+ >>> sketch_df = df.agg(sf.kll_sketch_agg_double("value").alias("sketch"))
+ >>> result =
sketch_df.select(sf.kll_sketch_to_string_double("sketch")).first()[0]
+ >>> "Kll" in result and "N" in result
+ True
+ """
+ fn = "kll_sketch_to_string_double"
+ return _invoke_function_over_columns(fn, col)
+
+
+@_try_remote_functions
+def kll_sketch_get_n_bigint(col: "ColumnOrName") -> Column:
+ """
+ Returns the number of items collected in the KLL bigint sketch.
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or column name
+ The KLL bigint sketch binary representation
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ The count of items in the sketch.
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame([1,2,3,4,5], "INT")
+ >>> sketch_df = df.agg(sf.kll_sketch_agg_bigint("value").alias("sketch"))
+ >>> sketch_df.select(sf.kll_sketch_get_n_bigint("sketch")).show()
+ +-------------------------------+
+ |kll_sketch_get_n_bigint(sketch)|
+ +-------------------------------+
+ | 5|
+ +-------------------------------+
+ """
+ fn = "kll_sketch_get_n_bigint"
+ return _invoke_function_over_columns(fn, col)
+
+
+@_try_remote_functions
+def kll_sketch_get_n_float(col: "ColumnOrName") -> Column:
+ """
+ Returns the number of items collected in the KLL float sketch.
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or column name
+ The KLL float sketch binary representation
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ The count of items in the sketch.
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame([1.0,2.0,3.0,4.0,5.0], "FLOAT")
+ >>> sketch_df = df.agg(sf.kll_sketch_agg_float("value").alias("sketch"))
+ >>> sketch_df.select(sf.kll_sketch_get_n_float("sketch")).show()
+ +------------------------------+
+ |kll_sketch_get_n_float(sketch)|
+ +------------------------------+
+ | 5|
+ +------------------------------+
+ """
+ fn = "kll_sketch_get_n_float"
+ return _invoke_function_over_columns(fn, col)
+
+
+@_try_remote_functions
+def kll_sketch_get_n_double(col: "ColumnOrName") -> Column:
+ """
+ Returns the number of items collected in the KLL double sketch.
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or column name
+ The KLL double sketch binary representation
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ The count of items in the sketch.
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame([1.0,2.0,3.0,4.0,5.0], "DOUBLE")
+ >>> sketch_df = df.agg(sf.kll_sketch_agg_double("value").alias("sketch"))
+ >>> sketch_df.select(sf.kll_sketch_get_n_double("sketch")).show()
+ +-------------------------------+
+ |kll_sketch_get_n_double(sketch)|
+ +-------------------------------+
+ | 5|
+ +-------------------------------+
+ """
+ fn = "kll_sketch_get_n_double"
+ return _invoke_function_over_columns(fn, col)
+
+
+@_try_remote_functions
+def kll_sketch_merge_bigint(left: "ColumnOrName", right: "ColumnOrName") ->
Column:
+ """
+ Merges two KLL bigint sketch buffers together into one.
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ left : :class:`~pyspark.sql.Column` or column name
+ The first KLL bigint sketch
+ right : :class:`~pyspark.sql.Column` or column name
+ The second KLL bigint sketch
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ The merged KLL sketch.
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame([1,2,3,4,5], "INT")
+ >>> sketch_df = df.agg(sf.kll_sketch_agg_bigint("value").alias("sketch"))
+ >>> result = sketch_df.select(sf.kll_sketch_merge_bigint("sketch",
"sketch")).first()[0]
+ >>> result is not None and len(result) > 0
+ True
+ """
+ fn = "kll_sketch_merge_bigint"
+ return _invoke_function_over_columns(fn, left, right)
+
+
+@_try_remote_functions
+def kll_sketch_merge_float(left: "ColumnOrName", right: "ColumnOrName") ->
Column:
+ """
+ Merges two KLL float sketch buffers together into one.
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ left : :class:`~pyspark.sql.Column` or column name
+ The first KLL float sketch
+ right : :class:`~pyspark.sql.Column` or column name
+ The second KLL float sketch
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ The merged KLL sketch.
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame([1.0,2.0,3.0,4.0,5.0], "FLOAT")
+ >>> sketch_df = df.agg(sf.kll_sketch_agg_float("value").alias("sketch"))
+ >>> result = sketch_df.select(sf.kll_sketch_merge_float("sketch",
"sketch")).first()[0]
+ >>> result is not None and len(result) > 0
+ True
+ """
+ fn = "kll_sketch_merge_float"
+ return _invoke_function_over_columns(fn, left, right)
+
+
+@_try_remote_functions
+def kll_sketch_merge_double(left: "ColumnOrName", right: "ColumnOrName") ->
Column:
+ """
+ Merges two KLL double sketch buffers together into one.
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ left : :class:`~pyspark.sql.Column` or column name
+ The first KLL double sketch
+ right : :class:`~pyspark.sql.Column` or column name
+ The second KLL double sketch
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ The merged KLL sketch.
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame([1.0,2.0,3.0,4.0,5.0], "DOUBLE")
+ >>> sketch_df = df.agg(sf.kll_sketch_agg_double("value").alias("sketch"))
+ >>> result = sketch_df.select(sf.kll_sketch_merge_double("sketch",
"sketch")).first()[0]
+ >>> result is not None and len(result) > 0
+ True
+ """
+ fn = "kll_sketch_merge_double"
+ return _invoke_function_over_columns(fn, left, right)
+
+
+@_try_remote_functions
+def kll_sketch_get_quantile_bigint(sketch: "ColumnOrName", rank:
"ColumnOrName") -> Column:
+ """
+ Extracts a quantile value from a KLL bigint sketch given an input rank
value.
+ The rank can be a single value or an array.
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ sketch : :class:`~pyspark.sql.Column` or column name
+ The KLL bigint sketch binary representation
+ rank : :class:`~pyspark.sql.Column` or column name
+ The rank value(s) to extract (between 0.0 and 1.0)
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ The quantile value(s).
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame([1,2,3,4,5], "INT")
+ >>> sketch_df = df.agg(sf.kll_sketch_agg_bigint("value").alias("sketch"))
+ >>> sketch_df.select(sf.kll_sketch_get_quantile_bigint("sketch",
sf.lit(0.5))).show()
+ +-------------------------------------------+
+ |kll_sketch_get_quantile_bigint(sketch, 0.5)|
+ +-------------------------------------------+
+ | 3|
+ +-------------------------------------------+
+ """
+ fn = "kll_sketch_get_quantile_bigint"
+ return _invoke_function_over_columns(fn, sketch, rank)
+
+
+@_try_remote_functions
+def kll_sketch_get_quantile_float(sketch: "ColumnOrName", rank:
"ColumnOrName") -> Column:
+ """
+ Extracts a quantile value from a KLL float sketch given an input rank
value.
+ The rank can be a single value or an array.
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ sketch : :class:`~pyspark.sql.Column` or column name
+ The KLL float sketch binary representation
+ rank : :class:`~pyspark.sql.Column` or column name
+ The rank value(s) to extract (between 0.0 and 1.0)
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ The quantile value(s).
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame([1.0,2.0,3.0,4.0,5.0], "FLOAT")
+ >>> sketch_df = df.agg(sf.kll_sketch_agg_float("value").alias("sketch"))
+ >>> sketch_df.select(sf.kll_sketch_get_quantile_float("sketch",
sf.lit(0.5))).show()
+ +------------------------------------------+
+ |kll_sketch_get_quantile_float(sketch, 0.5)|
+ +------------------------------------------+
+ | 3.0|
+ +------------------------------------------+
+ """
+ fn = "kll_sketch_get_quantile_float"
+ return _invoke_function_over_columns(fn, sketch, rank)
+
+
+@_try_remote_functions
+def kll_sketch_get_quantile_double(sketch: "ColumnOrName", rank:
"ColumnOrName") -> Column:
+ """
+ Extracts a quantile value from a KLL double sketch given an input rank
value.
+ The rank can be a single value or an array.
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ sketch : :class:`~pyspark.sql.Column` or column name
+ The KLL double sketch binary representation
+ rank : :class:`~pyspark.sql.Column` or column name
+ The rank value(s) to extract (between 0.0 and 1.0)
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ The quantile value(s).
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame([1.0,2.0,3.0,4.0,5.0], "DOUBLE")
+ >>> sketch_df = df.agg(sf.kll_sketch_agg_double("value").alias("sketch"))
+ >>> sketch_df.select(sf.kll_sketch_get_quantile_double("sketch",
sf.lit(0.5))).show()
+ +-------------------------------------------+
+ |kll_sketch_get_quantile_double(sketch, 0.5)|
+ +-------------------------------------------+
+ | 3.0|
+ +-------------------------------------------+
+ """
+ fn = "kll_sketch_get_quantile_double"
+ return _invoke_function_over_columns(fn, sketch, rank)
+
+
+@_try_remote_functions
+def kll_sketch_get_rank_bigint(sketch: "ColumnOrName", quantile:
"ColumnOrName") -> Column:
+ """
+ Extracts a rank value from a KLL bigint sketch given an input quantile
value.
+ The quantile can be a single value or an array.
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ sketch : :class:`~pyspark.sql.Column` or column name
+ The KLL bigint sketch binary representation
+ quantile : :class:`~pyspark.sql.Column` or column name
+ The quantile value(s) to lookup
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ The rank value(s) (between 0.0 and 1.0).
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame([1,2,3,4,5], "INT")
+ >>> sketch_df = df.agg(sf.kll_sketch_agg_bigint("value").alias("sketch"))
+ >>> sketch_df.select(sf.kll_sketch_get_rank_bigint("sketch",
sf.lit(3))).show()
+ +-------------------------------------+
+ |kll_sketch_get_rank_bigint(sketch, 3)|
+ +-------------------------------------+
+ | 0.6|
+ +-------------------------------------+
+ """
+ fn = "kll_sketch_get_rank_bigint"
+ return _invoke_function_over_columns(fn, sketch, quantile)
+
+
+@_try_remote_functions
+def kll_sketch_get_rank_float(sketch: "ColumnOrName", quantile:
"ColumnOrName") -> Column:
+ """
+ Extracts a rank value from a KLL float sketch given an input quantile
value.
+ The quantile can be a single value or an array.
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ sketch : :class:`~pyspark.sql.Column` or column name
+ The KLL float sketch binary representation
+ quantile : :class:`~pyspark.sql.Column` or column name
+ The quantile value(s) to lookup
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ The rank value(s) (between 0.0 and 1.0).
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame([1.0,2.0,3.0,4.0,5.0], "FLOAT")
+ >>> sketch_df = df.agg(sf.kll_sketch_agg_float("value").alias("sketch"))
+ >>> sketch_df.select(sf.kll_sketch_get_rank_float("sketch",
sf.lit(3.0))).show()
+ +--------------------------------------+
+ |kll_sketch_get_rank_float(sketch, 3.0)|
+ +--------------------------------------+
+ | 0.6|
+ +--------------------------------------+
+ """
+ fn = "kll_sketch_get_rank_float"
+ return _invoke_function_over_columns(fn, sketch, quantile)
+
+
+@_try_remote_functions
+def kll_sketch_get_rank_double(sketch: "ColumnOrName", quantile:
"ColumnOrName") -> Column:
+ """
+ Extracts a rank value from a KLL double sketch given an input quantile
value.
+ The quantile can be a single value or an array.
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ sketch : :class:`~pyspark.sql.Column` or column name
+ The KLL double sketch binary representation
+ quantile : :class:`~pyspark.sql.Column` or column name
+ The quantile value(s) to lookup
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ The rank value(s) (between 0.0 and 1.0).
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame([1.0,2.0,3.0,4.0,5.0], "DOUBLE")
+ >>> sketch_df = df.agg(sf.kll_sketch_agg_double("value").alias("sketch"))
+ >>> sketch_df.select(sf.kll_sketch_get_rank_double("sketch",
sf.lit(3.0))).show()
+ +---------------------------------------+
+ |kll_sketch_get_rank_double(sketch, 3.0)|
+ +---------------------------------------+
+ | 0.6|
+ +---------------------------------------+
+ """
+ fn = "kll_sketch_get_rank_double"
+ return _invoke_function_over_columns(fn, sketch, quantile)
+
+
@_try_remote_functions
def theta_sketch_estimate(col: "ColumnOrName") -> Column:
"""
diff --git a/python/pyspark/sql/tests/test_functions.py
b/python/pyspark/sql/tests/test_functions.py
index 0f646b3da549..a1a4f19e0522 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -2114,6 +2114,162 @@ class FunctionsTestsMixin:
None,
)
+ def test_kll_sketch_agg_bigint(self):
+ """Test kll_sketch_agg_bigint function"""
+ df = self.spark.createDataFrame([1, 2, 3, 4, 5], "INT")
+
+ # Test with default k
+ sketch = df.agg(F.kll_sketch_agg_bigint("value")).first()[0]
+ self.assertIsNotNone(sketch)
+ self.assertIsInstance(sketch, (bytes, bytearray))
+
+ # Test with explicit k
+ sketch_k = df.agg(F.kll_sketch_agg_bigint("value", 400)).first()[0]
+ self.assertIsNotNone(sketch_k)
+
+ def test_kll_sketch_agg_float(self):
+ """Test kll_sketch_agg_float function"""
+ df = self.spark.createDataFrame([1.0, 2.0, 3.0, 4.0, 5.0], "FLOAT")
+
+ sketch = df.agg(F.kll_sketch_agg_float("value")).first()[0]
+ self.assertIsNotNone(sketch)
+ self.assertIsInstance(sketch, (bytes, bytearray))
+
+ def test_kll_sketch_agg_double(self):
+ """Test kll_sketch_agg_double function"""
+ df = self.spark.createDataFrame([1.0, 2.0, 3.0, 4.0, 5.0], "DOUBLE")
+
+ sketch = df.agg(F.kll_sketch_agg_double("value")).first()[0]
+ self.assertIsNotNone(sketch)
+ self.assertIsInstance(sketch, (bytes, bytearray))
+
+ def test_kll_sketch_to_string_bigint(self):
+ """Test kll_sketch_to_string_bigint function"""
+ df = self.spark.createDataFrame([1, 2, 3, 4, 5], "INT")
+ sketch_df = df.agg(F.kll_sketch_agg_bigint("value").alias("sketch"))
+
+ result =
sketch_df.select(F.kll_sketch_to_string_bigint("sketch")).first()[0]
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, str)
+ self.assertIn("Kll", result)
+
+ def test_kll_sketch_get_n_bigint(self):
+ """Test kll_sketch_get_n_bigint function"""
+ df = self.spark.createDataFrame([1, 2, 3, 4, 5], "INT")
+ sketch_df = df.agg(F.kll_sketch_agg_bigint("value").alias("sketch"))
+
+ n = sketch_df.select(F.kll_sketch_get_n_bigint("sketch")).first()[0]
+ self.assertEqual(n, 5)
+
+ def test_kll_sketch_merge_bigint(self):
+ """Test kll_sketch_merge_bigint function"""
+ df = self.spark.createDataFrame([1, 2, 3], "INT")
+ sketch_df = df.agg(F.kll_sketch_agg_bigint("value").alias("sketch"))
+
+ merged = sketch_df.select(F.kll_sketch_merge_bigint("sketch",
"sketch")).first()[0]
+ self.assertIsNotNone(merged)
+ self.assertIsInstance(merged, (bytes, bytearray))
+
+ def test_kll_sketch_get_quantile_bigint(self):
+ """Test kll_sketch_get_quantile_bigint function"""
+ df = self.spark.createDataFrame([1, 2, 3, 4, 5], "INT")
+ sketch_df = df.agg(F.kll_sketch_agg_bigint("value").alias("sketch"))
+
+ quantile = sketch_df.select(F.kll_sketch_get_quantile_bigint("sketch",
F.lit(0.5))).first()[
+ 0
+ ]
+ self.assertIsNotNone(quantile)
+ self.assertGreaterEqual(quantile, 1)
+ self.assertLessEqual(quantile, 5)
+
+ def test_kll_sketch_get_quantile_bigint_array(self):
+ """Test kll_sketch_get_quantile_bigint with array of ranks"""
+ df = self.spark.createDataFrame([1, 2, 3, 4, 5], "INT")
+ sketch_df = df.agg(F.kll_sketch_agg_bigint("value").alias("sketch"))
+
+ quantiles = sketch_df.select(
+ F.kll_sketch_get_quantile_bigint(
+ "sketch", F.array(F.lit(0.25), F.lit(0.5), F.lit(0.75))
+ )
+ ).first()[0]
+ self.assertIsNotNone(quantiles)
+ self.assertEqual(len(quantiles), 3)
+
+ def test_kll_sketch_get_rank_bigint(self):
+ """Test kll_sketch_get_rank_bigint function"""
+ df = self.spark.createDataFrame([1, 2, 3, 4, 5], "INT")
+ sketch_df = df.agg(F.kll_sketch_agg_bigint("value").alias("sketch"))
+
+ rank = sketch_df.select(F.kll_sketch_get_rank_bigint("sketch",
F.lit(3))).first()[0]
+ self.assertIsNotNone(rank)
+ self.assertGreaterEqual(rank, 0.0)
+ self.assertLessEqual(rank, 1.0)
+
+ def test_kll_sketch_float_variants(self):
+ """Test all float variant functions"""
+ df = self.spark.createDataFrame([1.0, 2.0, 3.0, 4.0, 5.0], "FLOAT")
+ sketch_df = df.agg(F.kll_sketch_agg_float("value").alias("sketch"))
+
+ # Test to_string
+ string_result =
sketch_df.select(F.kll_sketch_to_string_float("sketch")).first()[0]
+ self.assertIn("Kll", string_result)
+
+ # Test get_n
+ n = sketch_df.select(F.kll_sketch_get_n_float("sketch")).first()[0]
+ self.assertEqual(n, 5)
+
+ # Test merge
+ merged = sketch_df.select(F.kll_sketch_merge_float("sketch",
"sketch")).first()[0]
+ self.assertIsNotNone(merged)
+
+ # Test get_quantile
+ quantile = sketch_df.select(F.kll_sketch_get_quantile_float("sketch",
F.lit(0.5))).first()[
+ 0
+ ]
+ self.assertIsNotNone(quantile)
+
+ # Test get_rank
+ rank = sketch_df.select(F.kll_sketch_get_rank_float("sketch",
F.lit(3.0))).first()[0]
+ self.assertGreaterEqual(rank, 0.0)
+ self.assertLessEqual(rank, 1.0)
+
+ def test_kll_sketch_double_variants(self):
+ """Test all double variant functions"""
+ df = self.spark.createDataFrame([1.0, 2.0, 3.0, 4.0, 5.0], "DOUBLE")
+ sketch_df = df.agg(F.kll_sketch_agg_double("value").alias("sketch"))
+
+ # Test to_string
+ string_result =
sketch_df.select(F.kll_sketch_to_string_double("sketch")).first()[0]
+ self.assertIn("Kll", string_result)
+
+ # Test get_n
+ n = sketch_df.select(F.kll_sketch_get_n_double("sketch")).first()[0]
+ self.assertEqual(n, 5)
+
+ # Test merge
+ merged = sketch_df.select(F.kll_sketch_merge_double("sketch",
"sketch")).first()[0]
+ self.assertIsNotNone(merged)
+
+ # Test get_quantile
+ quantile = sketch_df.select(F.kll_sketch_get_quantile_double("sketch",
F.lit(0.5))).first()[
+ 0
+ ]
+ self.assertIsNotNone(quantile)
+
+ # Test get_rank
+ rank = sketch_df.select(F.kll_sketch_get_rank_double("sketch",
F.lit(3.0))).first()[0]
+ self.assertGreaterEqual(rank, 0.0)
+ self.assertLessEqual(rank, 1.0)
+
+ def test_kll_sketch_with_nulls(self):
+ """Test KLL sketch with null values"""
+ df = self.spark.createDataFrame([(1,), (None,), (3,), (4,), (None,)],
["value"])
+ sketch_df = df.agg(F.kll_sketch_agg_bigint("value").alias("sketch"))
+
+ n = sketch_df.select(F.kll_sketch_get_n_bigint("sketch")).first()[0]
+ # Should only count non-null values
+ self.assertEqual(n, 3)
+
def test_datetime_functions(self):
df = self.spark.range(1).selectExpr("'2017-01-22' as dateCol")
parse_result = df.select(F.to_date(F.col("dateCol"))).first()
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
index ab883d5933cf..6f56c86541e4 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1303,6 +1303,306 @@ object functions {
def theta_union_agg(columnName: String): Column =
theta_union_agg(Column(columnName))
+ /**
+ * Aggregate function: returns the compact binary representation of the
Datasketches
+ * KllLongsSketch built with the values in the input column. The optional k
parameter controls
+ * the size and accuracy of the sketch (default 200, range 8-65535).
+ *
+ * @group agg_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_agg_bigint(e: Column, k: Column): Column =
+ Column.fn("kll_sketch_agg_bigint", e, k)
+
+ /**
+ * Aggregate function: returns the compact binary representation of the
Datasketches
+ * KllLongsSketch built with the values in the input column. The optional k
parameter controls
+ * the size and accuracy of the sketch (default 200, range 8-65535).
+ *
+ * @group agg_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_agg_bigint(e: Column, k: Int): Column =
+ Column.fn("kll_sketch_agg_bigint", e, lit(k))
+
+ /**
+ * Aggregate function: returns the compact binary representation of the
Datasketches
+ * KllLongsSketch built with the values in the input column. The optional k
parameter controls
+ * the size and accuracy of the sketch (default 200, range 8-65535).
+ *
+ * @group agg_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_agg_bigint(columnName: String, k: Int): Column =
+ kll_sketch_agg_bigint(Column(columnName), k)
+
+ /**
+ * Aggregate function: returns the compact binary representation of the
Datasketches
+ * KllLongsSketch built with the values in the input column with default k
value of 200.
+ *
+ * @group agg_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_agg_bigint(e: Column): Column =
+ Column.fn("kll_sketch_agg_bigint", e)
+
+ /**
+ * Aggregate function: returns the compact binary representation of the
Datasketches
+ * KllLongsSketch built with the values in the input column with default k
value of 200.
+ *
+ * @group agg_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_agg_bigint(columnName: String): Column =
+ kll_sketch_agg_bigint(Column(columnName))
+
+ /**
+ * Aggregate function: returns the compact binary representation of the
Datasketches
+ * KllFloatsSketch built with the values in the input column. The optional k
parameter controls
+ * the size and accuracy of the sketch (default 200, range 8-65535).
+ *
+ * @group agg_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_agg_float(e: Column, k: Column): Column =
+ Column.fn("kll_sketch_agg_float", e, k)
+
+ /**
+ * Aggregate function: returns the compact binary representation of the
Datasketches
+ * KllFloatsSketch built with the values in the input column. The optional k
parameter controls
+ * the size and accuracy of the sketch (default 200, range 8-65535).
+ *
+ * @group agg_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_agg_float(e: Column, k: Int): Column =
+ Column.fn("kll_sketch_agg_float", e, lit(k))
+
+ /**
+ * Aggregate function: returns the compact binary representation of the
Datasketches
+ * KllFloatsSketch built with the values in the input column. The optional k
parameter controls
+ * the size and accuracy of the sketch (default 200, range 8-65535).
+ *
+ * @group agg_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_agg_float(columnName: String, k: Int): Column =
+ kll_sketch_agg_float(Column(columnName), k)
+
+ /**
+ * Aggregate function: returns the compact binary representation of the
Datasketches
+ * KllFloatsSketch built with the values in the input column with default k
value of 200.
+ *
+ * @group agg_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_agg_float(e: Column): Column =
+ Column.fn("kll_sketch_agg_float", e)
+
+ /**
+ * Aggregate function: returns the compact binary representation of the
Datasketches
+ * KllFloatsSketch built with the values in the input column with default k
value of 200.
+ *
+ * @group agg_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_agg_float(columnName: String): Column =
+ kll_sketch_agg_float(Column(columnName))
+
+ /**
+ * Aggregate function: returns the compact binary representation of the
Datasketches
+ * KllDoublesSketch built with the values in the input column. The optional
k parameter controls
+ * the size and accuracy of the sketch (default 200, range 8-65535).
+ *
+ * @group agg_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_agg_double(e: Column, k: Column): Column =
+ Column.fn("kll_sketch_agg_double", e, k)
+
+ /**
+ * Aggregate function: returns the compact binary representation of the
Datasketches
+ * KllDoublesSketch built with the values in the input column. The optional
k parameter controls
+ * the size and accuracy of the sketch (default 200, range 8-65535).
+ *
+ * @group agg_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_agg_double(e: Column, k: Int): Column =
+ Column.fn("kll_sketch_agg_double", e, lit(k))
+
+ /**
+ * Aggregate function: returns the compact binary representation of the
Datasketches
+ * KllDoublesSketch built with the values in the input column. The optional
k parameter controls
+ * the size and accuracy of the sketch (default 200, range 8-65535).
+ *
+ * @group agg_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_agg_double(columnName: String, k: Int): Column =
+ kll_sketch_agg_double(Column(columnName), k)
+
+ /**
+ * Aggregate function: returns the compact binary representation of the
Datasketches
+ * KllDoublesSketch built with the values in the input column with default k
value of 200.
+ *
+ * @group agg_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_agg_double(e: Column): Column =
+ Column.fn("kll_sketch_agg_double", e)
+
+ /**
+ * Aggregate function: returns the compact binary representation of the
Datasketches
+ * KllDoublesSketch built with the values in the input column with default k
value of 200.
+ *
+ * @group agg_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_agg_double(columnName: String): Column =
+ kll_sketch_agg_double(Column(columnName))
+
+ /**
+ * Returns a string with human readable summary information about the KLL
bigint sketch.
+ *
+ * @group misc_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_to_string_bigint(e: Column): Column =
+ Column.fn("kll_sketch_to_string_bigint", e)
+
+ /**
+ * Returns a string with human readable summary information about the KLL
float sketch.
+ *
+ * @group misc_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_to_string_float(e: Column): Column =
+ Column.fn("kll_sketch_to_string_float", e)
+
+ /**
+ * Returns a string with human readable summary information about the KLL
double sketch.
+ *
+ * @group misc_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_to_string_double(e: Column): Column =
+ Column.fn("kll_sketch_to_string_double", e)
+
+ /**
+ * Returns the number of items collected in the KLL bigint sketch.
+ *
+ * @group misc_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_get_n_bigint(e: Column): Column =
+ Column.fn("kll_sketch_get_n_bigint", e)
+
+ /**
+ * Returns the number of items collected in the KLL float sketch.
+ *
+ * @group misc_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_get_n_float(e: Column): Column =
+ Column.fn("kll_sketch_get_n_float", e)
+
+ /**
+ * Returns the number of items collected in the KLL double sketch.
+ *
+ * @group misc_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_get_n_double(e: Column): Column =
+ Column.fn("kll_sketch_get_n_double", e)
+
+ /**
+ * Merges two KLL bigint sketch buffers together into one.
+ *
+ * @group misc_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_merge_bigint(left: Column, right: Column): Column =
+ Column.fn("kll_sketch_merge_bigint", left, right)
+
+ /**
+ * Merges two KLL float sketch buffers together into one.
+ *
+ * @group misc_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_merge_float(left: Column, right: Column): Column =
+ Column.fn("kll_sketch_merge_float", left, right)
+
+ /**
+ * Merges two KLL double sketch buffers together into one.
+ *
+ * @group misc_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_merge_double(left: Column, right: Column): Column =
+ Column.fn("kll_sketch_merge_double", left, right)
+
+ /**
+ * Extracts a quantile value from a KLL bigint sketch given an input rank
value. The rank can be
+ * a single value or an array.
+ *
+ * @group misc_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_get_quantile_bigint(sketch: Column, rank: Column): Column =
+ Column.fn("kll_sketch_get_quantile_bigint", sketch, rank)
+
+ /**
+ * Extracts a quantile value from a KLL float sketch given an input rank
value. The rank can be
+ * a single value or an array.
+ *
+ * @group misc_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_get_quantile_float(sketch: Column, rank: Column): Column =
+ Column.fn("kll_sketch_get_quantile_float", sketch, rank)
+
+ /**
+ * Extracts a quantile value from a KLL double sketch given an input rank
value. The rank can be
+ * a single value or an array.
+ *
+ * @group misc_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_get_quantile_double(sketch: Column, rank: Column): Column =
+ Column.fn("kll_sketch_get_quantile_double", sketch, rank)
+
+ /**
+ * Extracts a rank value from a KLL bigint sketch given an input quantile
value. The quantile
+ * can be a single value or an array.
+ *
+ * @group misc_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_get_rank_bigint(sketch: Column, quantile: Column): Column =
+ Column.fn("kll_sketch_get_rank_bigint", sketch, quantile)
+
+ /**
+ * Extracts a rank value from a KLL float sketch given an input quantile
value. The quantile can
+ * be a single value or an array.
+ *
+ * @group misc_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_get_rank_float(sketch: Column, quantile: Column): Column =
+ Column.fn("kll_sketch_get_rank_float", sketch, quantile)
+
+ /**
+ * Extracts a rank value from a KLL double sketch given an input quantile
value. The quantile
+ * can be a single value or an array.
+ *
+ * @group misc_funcs
+ * @since 4.1.0
+ */
+ def kll_sketch_get_rank_double(sketch: Column, quantile: Column): Column =
+ Column.fn("kll_sketch_get_rank_double", sketch, quantile)
+
/**
* Aggregate function: returns the concatenation of non-null input values.
*
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/kllquantiles.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/kllquantiles.sql.out
index 049cd163f628..64fc8998c9e4 100644
---
a/sql/core/src/test/resources/sql-tests/analyzer-results/kllquantiles.sql.out
+++
b/sql/core/src/test/resources/sql-tests/analyzer-results/kllquantiles.sql.out
@@ -1082,6 +1082,218 @@ Project [kll_sketch_get_n_double(0x12345678) AS
invalid_binary_double#xL]
+- OneRowRelation
+-- !query
+SELECT kll_sketch_get_n_bigint(42) AS wrong_argument_type
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"42\"",
+ "inputType" : "\"INT\"",
+ "paramIndex" : "first",
+ "requiredType" : "\"BINARY\"",
+ "sqlExpr" : "\"kll_sketch_get_n_bigint(42)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 34,
+ "fragment" : "kll_sketch_get_n_bigint(42)"
+ } ]
+}
+
+
+-- !query
+SELECT kll_sketch_get_n_float(42.0) AS wrong_argument_type
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"42.0\"",
+ "inputType" : "\"DECIMAL(3,1)\"",
+ "paramIndex" : "first",
+ "requiredType" : "\"BINARY\"",
+ "sqlExpr" : "\"kll_sketch_get_n_float(42.0)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 35,
+ "fragment" : "kll_sketch_get_n_float(42.0)"
+ } ]
+}
+
+
+-- !query
+SELECT kll_sketch_get_n_double(42.0D) AS wrong_argument_type
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"42.0\"",
+ "inputType" : "\"DOUBLE\"",
+ "paramIndex" : "first",
+ "requiredType" : "\"BINARY\"",
+ "sqlExpr" : "\"kll_sketch_get_n_double(42.0)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 37,
+ "fragment" : "kll_sketch_get_n_double(42.0D)"
+ } ]
+}
+
+
+-- !query
+SELECT kll_sketch_get_quantile_bigint(agg, 'invalid') AS quantile_string
+FROM (
+ SELECT kll_sketch_agg_bigint(col1) AS agg
+ FROM t_long_1_5_through_7_11
+)
+-- !query analysis
+Project [kll_sketch_get_quantile_bigint(agg#x, cast(invalid as double)) AS
quantile_string#xL]
++- SubqueryAlias __auto_generated_subquery_name
+ +- Aggregate [kll_sketch_agg_bigint(col1#xL, None, 0, 0) AS agg#x]
+ +- SubqueryAlias spark_catalog.default.t_long_1_5_through_7_11
+ +- Relation
spark_catalog.default.t_long_1_5_through_7_11[col1#xL,col2#xL] parquet
+
+
+-- !query
+SELECT kll_sketch_get_quantile_float(agg, X'deadbeef') AS quantile_binary
+FROM (
+ SELECT kll_sketch_agg_float(col1) AS agg
+ FROM t_float_1_5_through_7_11
+)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"X'DEADBEEF'\"",
+ "inputType" : "\"BINARY\"",
+ "paramIndex" : "second",
+ "requiredType" : "(\"DOUBLE\" or \"ARRAY<DOUBLE>\")",
+ "sqlExpr" : "\"kll_sketch_get_quantile_float(agg, X'DEADBEEF')\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 54,
+ "fragment" : "kll_sketch_get_quantile_float(agg, X'deadbeef')"
+ } ]
+}
+
+
+-- !query
+SELECT kll_sketch_get_quantile_double(agg, true) AS quantile_boolean
+FROM (
+ SELECT kll_sketch_agg_double(col1) AS agg
+ FROM t_double_1_5_through_7_11
+)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"true\"",
+ "inputType" : "\"BOOLEAN\"",
+ "paramIndex" : "second",
+ "requiredType" : "(\"DOUBLE\" or \"ARRAY<DOUBLE>\")",
+ "sqlExpr" : "\"kll_sketch_get_quantile_double(agg, true)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 48,
+ "fragment" : "kll_sketch_get_quantile_double(agg, true)"
+ } ]
+}
+
+
+-- !query
+SELECT kll_sketch_get_rank_bigint(agg, 'invalid') AS rank_string
+FROM (
+ SELECT kll_sketch_agg_bigint(col1) AS agg
+ FROM t_long_1_5_through_7_11
+)
+-- !query analysis
+Project [kll_sketch_get_rank_bigint(agg#x, cast(invalid as bigint)) AS
rank_string#x]
++- SubqueryAlias __auto_generated_subquery_name
+ +- Aggregate [kll_sketch_agg_bigint(col1#xL, None, 0, 0) AS agg#x]
+ +- SubqueryAlias spark_catalog.default.t_long_1_5_through_7_11
+ +- Relation
spark_catalog.default.t_long_1_5_through_7_11[col1#xL,col2#xL] parquet
+
+
+-- !query
+SELECT kll_sketch_get_rank_float(agg, X'cafebabe') AS rank_binary
+FROM (
+ SELECT kll_sketch_agg_float(col1) AS agg
+ FROM t_float_1_5_through_7_11
+)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"X'CAFEBABE'\"",
+ "inputType" : "\"BINARY\"",
+ "paramIndex" : "second",
+ "requiredType" : "(\"FLOAT\" or \"ARRAY<FLOAT>\")",
+ "sqlExpr" : "\"kll_sketch_get_rank_float(agg, X'CAFEBABE')\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 50,
+ "fragment" : "kll_sketch_get_rank_float(agg, X'cafebabe')"
+ } ]
+}
+
+
+-- !query
+SELECT kll_sketch_get_rank_double(agg, false) AS rank_boolean
+FROM (
+ SELECT kll_sketch_agg_double(col1) AS agg
+ FROM t_double_1_5_through_7_11
+)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"false\"",
+ "inputType" : "\"BOOLEAN\"",
+ "paramIndex" : "second",
+ "requiredType" : "(\"DOUBLE\" or \"ARRAY<DOUBLE>\")",
+ "sqlExpr" : "\"kll_sketch_get_rank_double(agg, false)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 45,
+ "fragment" : "kll_sketch_get_rank_double(agg, false)"
+ } ]
+}
+
+
-- !query
DROP TABLE IF EXISTS t_int_1_5_through_7_11
-- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/kllquantiles.sql
b/sql/core/src/test/resources/sql-tests/inputs/kllquantiles.sql
index 404235b33e67..d0d7fb1f9c12 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/kllquantiles.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/kllquantiles.sql
@@ -413,6 +413,57 @@ SELECT kll_sketch_get_n_float(X'cafebabe') AS
invalid_binary_float;
SELECT kll_sketch_get_n_double(X'12345678') AS invalid_binary_double;
+-- Wrong argument types
+SELECT kll_sketch_get_n_bigint(42) AS wrong_argument_type;
+
+SELECT kll_sketch_get_n_float(42.0) AS wrong_argument_type;
+
+SELECT kll_sketch_get_n_double(42.0D) AS wrong_argument_type;
+
+-- Negative tests for kll_sketch_get_quantile functions with invalid second
argument types
+-- Invalid type: STRING instead of DOUBLE for quantile parameter
+SELECT kll_sketch_get_quantile_bigint(agg, 'invalid') AS quantile_string
+FROM (
+ SELECT kll_sketch_agg_bigint(col1) AS agg
+ FROM t_long_1_5_through_7_11
+);
+
+-- Invalid type: BINARY instead of DOUBLE for quantile parameter
+SELECT kll_sketch_get_quantile_float(agg, X'deadbeef') AS quantile_binary
+FROM (
+ SELECT kll_sketch_agg_float(col1) AS agg
+ FROM t_float_1_5_through_7_11
+);
+
+-- Invalid type: BOOLEAN instead of DOUBLE for quantile parameter
+SELECT kll_sketch_get_quantile_double(agg, true) AS quantile_boolean
+FROM (
+ SELECT kll_sketch_agg_double(col1) AS agg
+ FROM t_double_1_5_through_7_11
+);
+
+-- Negative tests for kll_sketch_get_rank functions with invalid second argument types
+-- Invalid type: STRING instead of BIGINT for rank value parameter
+SELECT kll_sketch_get_rank_bigint(agg, 'invalid') AS rank_string
+FROM (
+ SELECT kll_sketch_agg_bigint(col1) AS agg
+ FROM t_long_1_5_through_7_11
+);
+
+-- Invalid type: BINARY instead of FLOAT for rank value parameter
+SELECT kll_sketch_get_rank_float(agg, X'cafebabe') AS rank_binary
+FROM (
+ SELECT kll_sketch_agg_float(col1) AS agg
+ FROM t_float_1_5_through_7_11
+);
+
+-- Invalid type: BOOLEAN instead of DOUBLE for rank value parameter
+SELECT kll_sketch_get_rank_double(agg, false) AS rank_boolean
+FROM (
+ SELECT kll_sketch_agg_double(col1) AS agg
+ FROM t_double_1_5_through_7_11
+);
+
-- Clean up
DROP TABLE IF EXISTS t_int_1_5_through_7_11;
DROP TABLE IF EXISTS t_long_1_5_through_7_11;
diff --git a/sql/core/src/test/resources/sql-tests/results/kllquantiles.sql.out b/sql/core/src/test/resources/sql-tests/results/kllquantiles.sql.out
index 383a8e66fce3..3618c851939e 100644
--- a/sql/core/src/test/resources/sql-tests/results/kllquantiles.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/kllquantiles.sql.out
@@ -1139,6 +1139,262 @@ org.apache.spark.SparkRuntimeException
}
+-- !query
+SELECT kll_sketch_get_n_bigint(42) AS wrong_argument_type
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"42\"",
+ "inputType" : "\"INT\"",
+ "paramIndex" : "first",
+ "requiredType" : "\"BINARY\"",
+ "sqlExpr" : "\"kll_sketch_get_n_bigint(42)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 34,
+ "fragment" : "kll_sketch_get_n_bigint(42)"
+ } ]
+}
+
+
+-- !query
+SELECT kll_sketch_get_n_float(42.0) AS wrong_argument_type
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"42.0\"",
+ "inputType" : "\"DECIMAL(3,1)\"",
+ "paramIndex" : "first",
+ "requiredType" : "\"BINARY\"",
+ "sqlExpr" : "\"kll_sketch_get_n_float(42.0)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 35,
+ "fragment" : "kll_sketch_get_n_float(42.0)"
+ } ]
+}
+
+
+-- !query
+SELECT kll_sketch_get_n_double(42.0D) AS wrong_argument_type
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"42.0\"",
+ "inputType" : "\"DOUBLE\"",
+ "paramIndex" : "first",
+ "requiredType" : "\"BINARY\"",
+ "sqlExpr" : "\"kll_sketch_get_n_double(42.0)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 37,
+ "fragment" : "kll_sketch_get_n_double(42.0D)"
+ } ]
+}
+
+
+-- !query
+SELECT kll_sketch_get_quantile_bigint(agg, 'invalid') AS quantile_string
+FROM (
+ SELECT kll_sketch_agg_bigint(col1) AS agg
+ FROM t_long_1_5_through_7_11
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkNumberFormatException
+{
+ "errorClass" : "CAST_INVALID_INPUT",
+ "sqlState" : "22018",
+ "messageParameters" : {
+ "ansiConfig" : "\"spark.sql.ansi.enabled\"",
+ "expression" : "'invalid'",
+ "sourceType" : "\"STRING\"",
+ "targetType" : "\"DOUBLE\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 53,
+ "fragment" : "kll_sketch_get_quantile_bigint(agg, 'invalid')"
+ } ]
+}
+
+
+-- !query
+SELECT kll_sketch_get_quantile_float(agg, X'deadbeef') AS quantile_binary
+FROM (
+ SELECT kll_sketch_agg_float(col1) AS agg
+ FROM t_float_1_5_through_7_11
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"X'DEADBEEF'\"",
+ "inputType" : "\"BINARY\"",
+ "paramIndex" : "second",
+ "requiredType" : "(\"DOUBLE\" or \"ARRAY<DOUBLE>\")",
+ "sqlExpr" : "\"kll_sketch_get_quantile_float(agg, X'DEADBEEF')\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 54,
+ "fragment" : "kll_sketch_get_quantile_float(agg, X'deadbeef')"
+ } ]
+}
+
+
+-- !query
+SELECT kll_sketch_get_quantile_double(agg, true) AS quantile_boolean
+FROM (
+ SELECT kll_sketch_agg_double(col1) AS agg
+ FROM t_double_1_5_through_7_11
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"true\"",
+ "inputType" : "\"BOOLEAN\"",
+ "paramIndex" : "second",
+ "requiredType" : "(\"DOUBLE\" or \"ARRAY<DOUBLE>\")",
+ "sqlExpr" : "\"kll_sketch_get_quantile_double(agg, true)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 48,
+ "fragment" : "kll_sketch_get_quantile_double(agg, true)"
+ } ]
+}
+
+
+-- !query
+SELECT kll_sketch_get_rank_bigint(agg, 'invalid') AS rank_string
+FROM (
+ SELECT kll_sketch_agg_bigint(col1) AS agg
+ FROM t_long_1_5_through_7_11
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkNumberFormatException
+{
+ "errorClass" : "CAST_INVALID_INPUT",
+ "sqlState" : "22018",
+ "messageParameters" : {
+ "ansiConfig" : "\"spark.sql.ansi.enabled\"",
+ "expression" : "'invalid'",
+ "sourceType" : "\"STRING\"",
+ "targetType" : "\"BIGINT\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 49,
+ "fragment" : "kll_sketch_get_rank_bigint(agg, 'invalid')"
+ } ]
+}
+
+
+-- !query
+SELECT kll_sketch_get_rank_float(agg, X'cafebabe') AS rank_binary
+FROM (
+ SELECT kll_sketch_agg_float(col1) AS agg
+ FROM t_float_1_5_through_7_11
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"X'CAFEBABE'\"",
+ "inputType" : "\"BINARY\"",
+ "paramIndex" : "second",
+ "requiredType" : "(\"FLOAT\" or \"ARRAY<FLOAT>\")",
+ "sqlExpr" : "\"kll_sketch_get_rank_float(agg, X'CAFEBABE')\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 50,
+ "fragment" : "kll_sketch_get_rank_float(agg, X'cafebabe')"
+ } ]
+}
+
+
+-- !query
+SELECT kll_sketch_get_rank_double(agg, false) AS rank_boolean
+FROM (
+ SELECT kll_sketch_agg_double(col1) AS agg
+ FROM t_double_1_5_through_7_11
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"false\"",
+ "inputType" : "\"BOOLEAN\"",
+ "paramIndex" : "second",
+ "requiredType" : "(\"DOUBLE\" or \"ARRAY<DOUBLE>\")",
+ "sqlExpr" : "\"kll_sketch_get_rank_double(agg, false)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 45,
+ "fragment" : "kll_sketch_get_rank_double(agg, false)"
+ } ]
+}
+
+
-- !query
DROP TABLE IF EXISTS t_int_1_5_through_7_11
-- !query schema
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 37614145fe83..da2fbceae97e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -3302,6 +3302,170 @@ class DataFrameAggregateSuite extends QueryTest
.groupBy($"col1").agg(max("col1"))
checkAnswer(df, Seq(Row(1, 1)))
}
+
+ test("kll_sketch_agg_bigint basic functionality") {
+ val df = Seq(1, 2, 3, 4, 5).toDF("value")
+
+ // Test with default k
+ val sketch1 = df.agg(kll_sketch_agg_bigint($"value")).collect()(0)(0)
+ assert(sketch1 != null)
+ assert(sketch1.asInstanceOf[Array[Byte]].length > 0)
+
+ // Test with explicit k
+ val sketch2 = df.agg(kll_sketch_agg_bigint($"value", 400)).collect()(0)(0)
+ assert(sketch2 != null)
+ assert(sketch2.asInstanceOf[Array[Byte]].length > 0)
+
+ // Test with column name
+ val sketch3 = df.agg(kll_sketch_agg_bigint("value")).collect()(0)(0)
+ assert(sketch3 != null)
+ }
+
+ test("kll_sketch_agg_float basic functionality") {
+ val df = Seq(1.0f, 2.0f, 3.0f, 4.0f, 5.0f).toDF("value")
+
+ val sketch = df.agg(kll_sketch_agg_float($"value")).collect()(0)(0)
+ assert(sketch != null)
+ assert(sketch.asInstanceOf[Array[Byte]].length > 0)
+
+ // Test with k parameter
+ val sketch2 = df.agg(kll_sketch_agg_float($"value", 300)).collect()(0)(0)
+ assert(sketch2 != null)
+ }
+
+ test("kll_sketch_agg_double basic functionality") {
+ val df = Seq(1.0, 2.0, 3.0, 4.0, 5.0).toDF("value")
+
+ val sketch = df.agg(kll_sketch_agg_double($"value")).collect()(0)(0)
+ assert(sketch != null)
+ assert(sketch.asInstanceOf[Array[Byte]].length > 0)
+ }
+
+ test("kll_sketch_to_string functions") {
+ val df = Seq(1, 2, 3, 4, 5).toDF("value")
+ val sketchDf = df.agg(kll_sketch_agg_bigint($"value").alias("sketch"))
+
+    val result = sketchDf.select(kll_sketch_to_string_bigint($"sketch")).collect()(0)(0)
+ assert(result != null)
+ assert(result.asInstanceOf[String].length > 0)
+ assert(result.asInstanceOf[String].contains("Kll"))
+ }
+
+ test("kll_sketch_get_n functions") {
+ val df = Seq(1, 2, 3, 4, 5).toDF("value")
+ val sketchDf = df.agg(kll_sketch_agg_bigint($"value").alias("sketch"))
+
+ val n = sketchDf.select(kll_sketch_get_n_bigint($"sketch")).collect()(0)(0)
+ assert(n == 5L)
+ }
+
+ test("kll_sketch_merge_bigint") {
+ val df = Seq(1, 2, 3).toDF("value")
+ val sketchDf = df.agg(kll_sketch_agg_bigint($"value").alias("sketch"))
+
+ val merged = sketchDf.select(
+ kll_sketch_merge_bigint($"sketch", $"sketch").alias("merged")
+ ).collect()(0)(0)
+ assert(merged != null)
+ assert(merged.asInstanceOf[Array[Byte]].length > 0)
+ }
+
+ test("kll_sketch_get_quantile_bigint") {
+ val df = Seq(1, 2, 3, 4, 5).toDF("value")
+ val sketchDf = df.agg(kll_sketch_agg_bigint($"value").alias("sketch"))
+
+ val quantile = sketchDf.select(
+ kll_sketch_get_quantile_bigint($"sketch", lit(0.5))
+ ).collect()(0)(0)
+    assert(quantile.asInstanceOf[Long] >= 1 && quantile.asInstanceOf[Long] <= 5)
+
+ // Test with array of ranks
+ val quantiles = sketchDf.select(
+      kll_sketch_get_quantile_bigint($"sketch", array(lit(0.25), lit(0.5), lit(0.75)))
+ ).collect()(0)(0)
+ assert(quantiles != null)
+ }
+
+ test("kll_sketch_get_rank_bigint") {
+ val df = Seq(1, 2, 3, 4, 5).toDF("value")
+ val sketchDf = df.agg(kll_sketch_agg_bigint($"value").alias("sketch"))
+
+ val rank = sketchDf.select(
+ kll_sketch_get_rank_bigint($"sketch", lit(3))
+ ).collect()(0)(0)
+    assert(rank.asInstanceOf[Double] >= 0.0 && rank.asInstanceOf[Double] <= 1.0)
+ }
+
+ test("kll_sketch float variants") {
+ val df = Seq(1.0f, 2.0f, 3.0f, 4.0f, 5.0f).toDF("value")
+ val sketchDf = df.agg(kll_sketch_agg_float($"value").alias("sketch"))
+
+ // Test to_string
+    val str = sketchDf.select(kll_sketch_to_string_float($"sketch")).collect()(0)(0)
+ assert(str.asInstanceOf[String].contains("Kll"))
+
+ // Test get_n
+ val n = sketchDf.select(kll_sketch_get_n_float($"sketch")).collect()(0)(0)
+ assert(n == 5L)
+
+ // Test merge
+ val merged = sketchDf.select(
+ kll_sketch_merge_float($"sketch", $"sketch")
+ ).collect()(0)(0)
+ assert(merged != null)
+
+ // Test get_quantile
+ val quantile = sketchDf.select(
+ kll_sketch_get_quantile_float($"sketch", lit(0.5))
+ ).collect()(0)(0)
+ assert(quantile != null)
+
+ // Test get_rank
+ val rank = sketchDf.select(
+ kll_sketch_get_rank_float($"sketch", lit(3.0f))
+ ).collect()(0)(0)
+    assert(rank.asInstanceOf[Double] >= 0.0 && rank.asInstanceOf[Double] <= 1.0)
+ }
+
+ test("kll_sketch double variants") {
+ val df = Seq(1.0, 2.0, 3.0, 4.0, 5.0).toDF("value")
+ val sketchDf = df.agg(kll_sketch_agg_double($"value").alias("sketch"))
+
+ // Test to_string
+    val str = sketchDf.select(kll_sketch_to_string_double($"sketch")).collect()(0)(0)
+ assert(str.asInstanceOf[String].contains("Kll"))
+
+ // Test get_n
+ val n = sketchDf.select(kll_sketch_get_n_double($"sketch")).collect()(0)(0)
+ assert(n == 5L)
+
+ // Test merge
+ val merged = sketchDf.select(
+ kll_sketch_merge_double($"sketch", $"sketch")
+ ).collect()(0)(0)
+ assert(merged != null)
+
+ // Test get_quantile
+ val quantile = sketchDf.select(
+ kll_sketch_get_quantile_double($"sketch", lit(0.5))
+ ).collect()(0)(0)
+ assert(quantile != null)
+
+ // Test get_rank
+ val rank = sketchDf.select(
+ kll_sketch_get_rank_double($"sketch", lit(3.0))
+ ).collect()(0)(0)
+    assert(rank.asInstanceOf[Double] >= 0.0 && rank.asInstanceOf[Double] <= 1.0)
+ }
+
+ test("kll_sketch with null values") {
+ val df = Seq(Some(1), None, Some(3), Some(4), None).toDF("value")
+ val sketchDf = df.agg(kll_sketch_agg_bigint($"value").alias("sketch"))
+
+ val n = sketchDf.select(kll_sketch_get_n_bigint($"sketch")).collect()(0)(0)
+ // Should only count non-null values
+ assert(n == 3L)
+ }
}
case class B(c: Option[Double])
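
For reviewers trying the new API locally, here is a minimal spark-shell sketch (assuming a SparkSession named `spark` with `spark.implicits._` in scope; illustrative only, not part of the committed suite) that exercises the to-string, count, and rank helpers together with the null handling verified in the tests above:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// Null inputs are skipped by the aggregate, so the sketch only tracks 1, 3, and 4.
val sketchDf = Seq(Some(1), None, Some(3), Some(4), None)
  .toDF("value")
  .agg(kll_sketch_agg_bigint($"value").alias("sketch"))

// Number of items folded into the sketch: 3.
sketchDf.select(kll_sketch_get_n_bigint($"sketch")).show()

// Human-readable summary of the sketch's internal state.
sketchDf.select(kll_sketch_to_string_bigint($"sketch")).show(truncate = false)

// Estimated normalized rank (a value between 0.0 and 1.0) of the value 3 in the sketched data.
sketchDf.select(kll_sketch_get_rank_bigint($"sketch", lit(3))).show()
```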
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]