This is an automated email from the ASF dual-hosted git repository.

dtenedor pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new a39c1b8e5e25 [SPARK-54785][SQL] Add support for binary sketch 
aggregations in KLL
a39c1b8e5e25 is described below

commit a39c1b8e5e2506d0da530f98dcd1c55558f98282
Author: Chris Boumalhab <[email protected]>
AuthorDate: Mon Jan 12 13:52:23 2026 -0800

    [SPARK-54785][SQL] Add support for binary sketch aggregations in KLL
    
    This PR adds the following KLL merge aggregate functions to SQL, together 
with their tests:
    - `kll_merge_agg_bigint`
    - `kll_merge_agg_float`
    - `kll_merge_agg_double`
    
    These aggregate functions merge multiple binary KLL sketch representations.
    
    Initial PRs:
     - https://github.com/apache/spark/pull/52900/
     - https://github.com/apache/spark/pull/52800/
    
    The existing scalar `kll_sketch_merge_*` functions can only merge two 
sketches at a time. In distributed computing scenarios where sketches are 
pre-computed across multiple partitions, time windows, or datasets, users need 
to merge many sketches together.
    
    This is a user-facing change: the PR adds 3 new aggregate functions.
    
    New SQL tests were added to 
`sql/core/src/test/resources/sql-tests/inputs/kllquantiles.sql`:
    
    **Positive tests:**
    - Merging bigint/float/double sketches from multiple rows
    - Merging with custom k parameters (400, 300, 500)
    - NULL value handling
    
    **Negative tests:**
    - Type mismatches (passing non-binary types)
    - Invalid binary data
    - k parameter validation (too small, too large, NULL, non-constant)
    
    Generative AI tooling: claude-4.5-sonnet, plus manual changes.
    
    Closes #53548 from cboumalh/cboumalh-kll-enhancement.
    
    Lead-authored-by: Chris Boumalhab <[email protected]>
    Co-authored-by: Chris Boumalhab <[email protected]>
    Signed-off-by: Daniel Tenedorio <[email protected]>
    (cherry picked from commit fc15f726eabd0a069dc47b195e1a91aa6b1bf541)
    Signed-off-by: Daniel Tenedorio <[email protected]>
---
 docs/sql-ref-sketch-aggregates.md                  | 988 +++++++++++++++++++++
 .../source/reference/pyspark.sql/functions.rst     |   3 +
 python/pyspark/sql/connect/functions/builtin.py    |  42 +
 python/pyspark/sql/functions/__init__.py           |   3 +
 python/pyspark/sql/functions/builtin.py            | 132 +++
 python/pyspark/sql/tests/test_functions.py         |  91 ++
 .../scala/org/apache/spark/sql/functions.scala     | 296 ++++--
 .../sql/catalyst/analysis/FunctionRegistry.scala   |  33 +-
 .../expressions/aggregate/kllAggregates.scala      | 440 ++++++++-
 .../sql-functions/sql-expression-schema.md         |   3 +
 .../analyzer-results/kllquantiles.sql.out          | 485 ++++++++++
 .../resources/sql-tests/inputs/kllquantiles.sql    | 206 ++++-
 .../sql-tests/results/kllquantiles.sql.out         | 454 ++++++++++
 .../apache/spark/sql/DataFrameAggregateSuite.scala | 104 +++
 14 files changed, 3150 insertions(+), 130 deletions(-)

diff --git a/docs/sql-ref-sketch-aggregates.md 
b/docs/sql-ref-sketch-aggregates.md
new file mode 100644
index 000000000000..6b92ba7b3c9e
--- /dev/null
+++ b/docs/sql-ref-sketch-aggregates.md
@@ -0,0 +1,988 @@
+---
+layout: global
+title: Sketch-Based Approximate Functions
+displayTitle: Sketch-Based Approximate Functions
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+Spark's SQL and DataFrame APIs provide a collection of sketch-based 
approximate functions powered by the [Apache 
DataSketches](https://datasketches.apache.org/) library. These functions enable 
efficient probabilistic computations on large datasets with bounded memory 
usage and accuracy guarantees.
+
+Sketches are compact data structures that summarize large datasets, supporting 
distributed aggregation through serialization and merging. The currently 
supported use cases are:
+- **Approximate count distinct** (HLL and Theta sketches)
+- **Approximate quantile estimation** (KLL sketches)
+- **Approximate frequent items** (Top-K sketches)
+- **Set operations** on distinct counts (Theta sketches)
+
+### Table of Contents
+
+* [HyperLogLog (HLL) Sketch Functions](#hyperloglog-hll-sketch-functions)
+  * [hll_sketch_agg](#hll_sketch_agg)
+  * [hll_union_agg](#hll_union_agg)
+  * [hll_sketch_estimate](#hll_sketch_estimate)
+  * [hll_union](#hll_union)
+* [Theta Sketch Functions](#theta-sketch-functions)
+  * [theta_sketch_agg](#theta_sketch_agg)
+  * [theta_union_agg](#theta_union_agg)
+  * [theta_intersection_agg](#theta_intersection_agg)
+  * [theta_sketch_estimate](#theta_sketch_estimate)
+  * [theta_union](#theta_union)
+  * [theta_intersection](#theta_intersection)
+  * [theta_difference](#theta_difference)
+* [KLL Quantile Sketch Functions](#kll-quantile-sketch-functions)
+  * [kll_sketch_agg_*](#kll_sketch_agg_)
+  * [kll_merge_agg_*](#kll_merge_agg_)
+  * [kll_sketch_to_string_*](#kll_sketch_to_string_)
+  * [kll_sketch_get_n_*](#kll_sketch_get_n_)
+  * [kll_sketch_merge_*](#kll_sketch_merge_)
+  * [kll_sketch_get_quantile_*](#kll_sketch_get_quantile_)
+  * [kll_sketch_get_rank_*](#kll_sketch_get_rank_)
+* [Approximate Top-K Functions](#approximate-top-k-functions)
+  * [approx_top_k_accumulate](#approx_top_k_accumulate)
+  * [approx_top_k_combine](#approx_top_k_combine)
+  * [approx_top_k_estimate](#approx_top_k_estimate)
+* [Best Practices](#best-practices)
+  * [Choosing Between HLL and Theta 
Sketches](#choosing-between-hll-and-theta-sketches)
+  * [Accuracy vs. Memory Trade-offs](#accuracy-vs-memory-trade-offs)
+  * [Storing and Reusing Sketches](#storing-and-reusing-sketches)
+* [Common Use Cases and Examples](#common-use-cases-and-examples)
+  * [Example: Tracking Daily Unique Users with HLL 
Sketches](#example-tracking-daily-unique-users-with-hll-sketches)
+  * [Example: Computing Percentiles Over Time with KLL 
Sketches](#example-computing-percentiles-over-time-with-kll-sketches)
+  * [Example: Set Operations with Theta 
Sketches](#example-set-operations-with-theta-sketches)
+  * [Example: Finding Trending Items with Top-K 
Sketches](#example-finding-trending-items-with-top-k-sketches)
+
+---
+
+## HyperLogLog (HLL) Sketch Functions
+
+HyperLogLog sketches provide approximate count distinct functionality with 
configurable accuracy and memory usage. They are well-suited for counting 
unique values in very large datasets.
+
+See the [Apache DataSketches HLL 
documentation](https://datasketches.apache.org/docs/HLL/HLL.html) for more 
information.
+
+### hll_sketch_agg
+
+Creates an HLL sketch from input values that can later be used to estimate 
count distinct.
+
+**Syntax:**
+```sql
+hll_sketch_agg(expr [, lgConfigK])
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `expr` | INT, BIGINT, STRING, or BINARY | The expression whose distinct 
values will be counted |
+| `lgConfigK` | INT (optional) | Log-base-2 of K, where K is the number of 
buckets. Range: 4-21. Default: 12. Higher values provide more accuracy but use 
more memory. |
+
+Returns a BINARY containing the HLL sketch in updatable binary representation.
+
+**Examples:**
+```sql
+-- Basic usage: create a sketch and estimate distinct count
+SELECT hll_sketch_estimate(hll_sketch_agg(col))
+FROM VALUES (1), (1), (2), (2), (3) tab(col);
+-- Result: 3
+
+-- With custom lgConfigK for higher accuracy
+SELECT hll_sketch_estimate(hll_sketch_agg(col, 16))
+FROM VALUES (50), (60), (60), (60), (75), (100) tab(col);
+-- Result: 4
+
+-- With string values
+SELECT hll_sketch_estimate(hll_sketch_agg(col))
+FROM VALUES ('abc'), ('def'), ('abc'), ('ghi'), ('abc') tab(col);
+-- Result: 3
+```
+
+**Notes:**
+- NULL values are ignored during aggregation.
+- Empty strings (for STRING type) and empty byte arrays (for BINARY type) are 
ignored.
+- The sketch can be stored and later merged with other sketches using 
`hll_union` or `hll_union_agg`.
+
+---
+
+### hll_union_agg
+
+Aggregates multiple HLL sketches into a single merged sketch.
+
+**Syntax:**
+```sql
+hll_union_agg(sketch [, allowDifferentLgConfigK])
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `sketch` | BINARY | An HLL sketch in binary format (produced by 
`hll_sketch_agg`) |
+| `allowDifferentLgConfigK` | BOOLEAN (optional) | If true, allows merging 
sketches with different lgConfigK values. Default: false. |
+
+Returns a BINARY containing the merged HLL sketch.
+
+**Examples:**
+```sql
+-- Merge sketches from different partitions
+SELECT hll_sketch_estimate(hll_union_agg(sketch, true))
+FROM (
+  SELECT hll_sketch_agg(col) as sketch
+  FROM VALUES (1) tab(col)
+  UNION ALL
+  SELECT hll_sketch_agg(col, 20) as sketch
+  FROM VALUES (1) tab(col)
+);
+-- Result: 1
+
+-- Standard merge (same lgConfigK)
+SELECT hll_sketch_estimate(hll_union_agg(sketch))
+FROM (
+  SELECT hll_sketch_agg(col) as sketch
+  FROM VALUES (1), (2) tab(col)
+  UNION ALL
+  SELECT hll_sketch_agg(col) as sketch
+  FROM VALUES (3), (4) tab(col)
+);
+-- Result: 4
+```
+
+**Notes:**
+- If `allowDifferentLgConfigK` is false and sketches have different lgConfigK 
values, an error is thrown.
+- The output sketch uses the minimum lgConfigK value of all input sketches 
when merging sketches with different sizes.
+
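The two notes above can be read as a small rule for the merged sketch's lgConfigK. The following is a minimal plain-Python sketch of that rule only; the function name `merged_lg_config_k` is hypothetical and is not part of Spark or DataSketches.

```python
def merged_lg_config_k(lg_config_ks, allow_different=False):
    """Model of the documented rule: reject mixed sizes unless allowed,
    otherwise fall back to the smallest lgConfigK among the inputs."""
    distinct = set(lg_config_ks)
    if len(distinct) > 1 and not allow_different:
        raise ValueError(
            f"sketches have different lgConfigK values: {sorted(distinct)}"
        )
    return min(distinct)


print(merged_lg_config_k([12, 12]))                        # 12
print(merged_lg_config_k([12, 20], allow_different=True))  # 12
```

This mirrors the first SQL example above, where a default sketch (lgConfigK 12) and a sketch built with lgConfigK 20 are merged with `allowDifferentLgConfigK` set to true.
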
+---
+
+### hll_sketch_estimate
+
+Estimates the number of unique values from an HLL sketch.
+
+**Syntax:**
+```sql
+hll_sketch_estimate(sketch)
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `sketch` | BINARY | An HLL sketch in binary format |
+
+Returns a BIGINT representing the estimated count of distinct values.
+
+**Examples:**
+```sql
+SELECT hll_sketch_estimate(hll_sketch_agg(col))
+FROM VALUES (1), (1), (2), (2), (3) tab(col);
+-- Result: 3
+```
+
+**Errors:**
+- Throws an error if the input is not a valid HLL sketch binary representation.
+
+---
+
+### hll_union
+
+Merges two HLL sketches into one (scalar function).
+
+**Syntax:**
+```sql
+hll_union(first, second [, allowDifferentLgConfigK])
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `first` | BINARY | First HLL sketch |
+| `second` | BINARY | Second HLL sketch |
+| `allowDifferentLgConfigK` | BOOLEAN (optional) | Allow different lgConfigK 
values. Default: false. |
+
+Returns a BINARY containing the merged HLL sketch.
+
+**Examples:**
+```sql
+SELECT hll_sketch_estimate(
+  hll_union(
+    hll_sketch_agg(col1),
+    hll_sketch_agg(col2)))
+FROM VALUES (1, 4), (1, 4), (2, 5), (2, 5), (3, 6) tab(col1, col2);
+-- Result: 6
+```
+
+---
+
+## Theta Sketch Functions
+
+Theta sketches provide approximate count distinct with support for set 
operations (union, intersection, and difference). This makes them ideal for 
computing unique counts across overlapping datasets.
+
+See the [Apache DataSketches Theta 
documentation](https://datasketches.apache.org/docs/Theta/ThetaSketches.html) 
for more information.
+
+### theta_sketch_agg
+
+Creates a Theta sketch from input values.
+
+**Syntax:**
+```sql
+theta_sketch_agg(expr [, lgNomEntries])
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `expr` | INT, BIGINT, FLOAT, DOUBLE, STRING, BINARY, ARRAY&lt;INT&gt;, or 
ARRAY&lt;BIGINT&gt; | The expression whose distinct values will be counted |
+| `lgNomEntries` | INT (optional) | Log-base-2 of nominal entries. Range: 
4-26. Default: 12. |
+
+Returns a BINARY containing the Theta sketch in compact binary representation.
+
+**Examples:**
+```sql
+-- Basic distinct count
+SELECT theta_sketch_estimate(theta_sketch_agg(col))
+FROM VALUES (1), (1), (2), (2), (3) tab(col);
+-- Result: 3
+
+-- With custom lgNomEntries
+SELECT theta_sketch_estimate(theta_sketch_agg(col, 22))
+FROM VALUES (1), (2), (3), (4), (5), (6), (7) tab(col);
+-- Result: 7
+
+-- With array values
+SELECT theta_sketch_estimate(theta_sketch_agg(col))
+FROM VALUES (ARRAY(1, 2)), (ARRAY(3, 4)), (ARRAY(1, 2)) tab(col);
+-- Result: 2
+```
+
+**Notes:**
+- NULL values are ignored.
+- Supports a wider range of input types compared to HLL sketches.
+- Empty arrays, empty strings, and empty binary values are ignored.
+
+---
+
+### theta_union_agg
+
+Aggregates multiple Theta sketches using the union operation.
+
+**Syntax:**
+```sql
+theta_union_agg(sketch [, lgNomEntries])
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `sketch` | BINARY | A Theta sketch in binary format |
+| `lgNomEntries` | INT (optional) | Log-base-2 of nominal entries. Range: 
4-26. Default: 12. |
+
+Returns a BINARY containing the merged Theta sketch.
+
+**Examples:**
+```sql
+SELECT theta_sketch_estimate(theta_union_agg(sketch, 15))
+FROM (
+  SELECT theta_sketch_agg(col1) as sketch
+  FROM VALUES (1), (2), (3), (4), (5), (6), (7) tab(col1)
+  UNION ALL
+  SELECT theta_sketch_agg(col2, 20) as sketch
+  FROM VALUES (5), (6), (7), (8), (9), (10), (11) tab(col2)
+);
+-- Result: 11
+```
+
+---
+
+### theta_intersection_agg
+
+Aggregates multiple Theta sketches using the intersection operation (finds the 
distinct values common to all input sketches).
+
+**Syntax:**
+```sql
+theta_intersection_agg(sketch)
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `sketch` | BINARY | A Theta sketch in binary format |
+
+Returns a BINARY containing the intersected Theta sketch.
+
+**Examples:**
+```sql
+SELECT theta_sketch_estimate(theta_intersection_agg(sketch))
+FROM (
+  SELECT theta_sketch_agg(col1) as sketch
+  FROM VALUES (1), (2), (3), (4), (5), (6), (7) tab(col1)
+  UNION ALL
+  SELECT theta_sketch_agg(col2) as sketch
+  FROM VALUES (5), (6), (7), (8), (9), (10), (11) tab(col2)
+);
+-- Result: 3 (values 5, 6, 7 are common)
+```
+
+---
+
+### theta_sketch_estimate
+
+Estimates the number of unique values from a Theta sketch.
+
+**Syntax:**
+```sql
+theta_sketch_estimate(sketch)
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `sketch` | BINARY | A Theta sketch in binary format |
+
+Returns a BIGINT representing the estimated count of distinct values.
+
+**Examples:**
+```sql
+SELECT theta_sketch_estimate(theta_sketch_agg(col))
+FROM VALUES (1), (1), (2), (2), (3) tab(col);
+-- Result: 3
+```
+
+---
+
+### theta_union
+
+Merges two Theta sketches using union (scalar function).
+
+**Syntax:**
+```sql
+theta_union(first, second [, lgNomEntries])
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `first` | BINARY | First Theta sketch |
+| `second` | BINARY | Second Theta sketch |
+| `lgNomEntries` | INT (optional) | Log-base-2 of nominal entries. Range: 
4-26. Default: 12. |
+
+Returns a BINARY containing the merged Theta sketch.
+
+**Examples:**
+```sql
+SELECT theta_sketch_estimate(
+  theta_union(
+    theta_sketch_agg(col1),
+    theta_sketch_agg(col2)))
+FROM VALUES (1, 4), (1, 4), (2, 5), (2, 5), (3, 6) tab(col1, col2);
+-- Result: 6
+```
+
+---
+
+### theta_intersection
+
+Computes the intersection of two Theta sketches (scalar function).
+
+**Syntax:**
+```sql
+theta_intersection(first, second)
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `first` | BINARY | First Theta sketch |
+| `second` | BINARY | Second Theta sketch |
+
+Returns a BINARY containing the intersected Theta sketch.
+
+**Examples:**
+```sql
+SELECT theta_sketch_estimate(
+  theta_intersection(
+    theta_sketch_agg(col1),
+    theta_sketch_agg(col2)))
+FROM VALUES (5, 4), (1, 4), (2, 5), (2, 5), (3, 1) tab(col1, col2);
+-- Result: 2 (values 1 and 5 are common)
+```
+
+---
+
+### theta_difference
+
+Computes the set difference of two Theta sketches (A - B).
+
+**Syntax:**
+```sql
+theta_difference(first, second)
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `first` | BINARY | First Theta sketch (A) |
+| `second` | BINARY | Second Theta sketch (B) |
+
+Returns a BINARY containing a Theta sketch representing values in A but not in 
B.
+
+**Examples:**
+```sql
+SELECT theta_sketch_estimate(
+  theta_difference(
+    theta_sketch_agg(col1),
+    theta_sketch_agg(col2)))
+FROM VALUES (5, 4), (1, 4), (2, 5), (2, 5), (3, 1) tab(col1, col2);
+-- Result: 2 (values 2 and 3 are in col1 but not col2)
+```
+
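To see where the expected results of the `theta_intersection` and `theta_difference` examples come from, the same rows can be evaluated with exact Python sets. This is only an exact stand-in for illustration: a real Theta sketch returns an estimate, which for inputs this small is expected to match the exact count.

```python
# Rows from the SQL examples above: (col1, col2)
rows = [(5, 4), (1, 4), (2, 5), (2, 5), (3, 1)]

col1 = {a for a, _ in rows}  # distinct values of col1: {1, 2, 3, 5}
col2 = {b for _, b in rows}  # distinct values of col2: {1, 4, 5}

union = col1 | col2          # what theta_union approximates
intersection = col1 & col2   # what theta_intersection approximates
difference = col1 - col2     # what theta_difference (A - B) approximates

print(len(union), sorted(union))                # 5 [1, 2, 3, 4, 5]
print(len(intersection), sorted(intersection))  # 2 [1, 5]
print(len(difference), sorted(difference))      # 2 [2, 3]
```

Note that the difference is not symmetric: swapping the arguments gives the values in col2 that are not in col1 instead.
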
+---
+
+## KLL Quantile Sketch Functions
+
+KLL sketches (named after their authors Karnin, Lang, and Liberty) provide 
approximate quantile estimation. They are useful for computing percentiles, 
medians, and other order statistics on large datasets without sorting.
+
+See the [Apache DataSketches KLL 
documentation](https://datasketches.apache.org/docs/KLL/KLLSketch.html) for 
more information.
+
+KLL functions are type-specific to avoid precision loss:
+- **BIGINT** variants: For integer types (TINYINT, SMALLINT, INT, BIGINT)
+- **FLOAT** variants: For FLOAT values only
+- **DOUBLE** variants: For FLOAT and DOUBLE values
+
+### kll_sketch_agg_*
+
+Creates a KLL sketch from numeric values for quantile estimation.
+
+**Syntax:**
+```sql
+kll_sketch_agg_bigint(expr [, k])
+kll_sketch_agg_float(expr [, k])
+kll_sketch_agg_double(expr [, k])
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `expr` | Numeric (see variants above) | The numeric column to summarize |
+| `k` | INT (optional) | Controls accuracy and size. Range: 8-65535. Default: 
200 (~1.65% normalized rank error). |
+
+Returns a BINARY containing the KLL sketch in compact binary representation.
+
+**Examples:**
+```sql
+-- Get median (0.5 quantile)
+SELECT kll_sketch_get_quantile_bigint(kll_sketch_agg_bigint(col), 0.5)
+FROM VALUES (1), (2), (3), (4), (5), (6), (7) tab(col);
+-- Result: 4
+
+-- With custom k for higher accuracy
+SELECT kll_sketch_get_quantile_bigint(kll_sketch_agg_bigint(col, 400), 0.5)
+FROM VALUES (1), (2), (3), (4), (5), (6), (7) tab(col);
+-- Result: 4
+```
+
+**Notes:**
+- Use the appropriate variant to avoid precision loss: use `_bigint` for 
integers, `_float` for floats, `_double` for doubles.
+- NULL values are ignored during aggregation.
+
+---
+
+### kll_merge_agg_*
+
+Aggregates multiple KLL sketches of the same type by merging them together. 
This is useful for combining sketches created in separate aggregations (e.g., 
from different partitions or time windows). These are aggregate functions.
+
+**Syntax:**
+```sql
+kll_merge_agg_bigint(sketch [, k])
+kll_merge_agg_float(sketch [, k])
+kll_merge_agg_double(sketch [, k])
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `sketch` | BINARY | A KLL sketch in binary format (e.g., from 
`kll_sketch_agg_*`) |
+| `k` | INT (optional) | Controls accuracy and size of the merged sketch. 
Range: 8-65535. If not specified, the merged sketch adopts the k value from the 
first input sketch. |
+
+Returns a BINARY containing the merged KLL sketch.
+
+**Examples:**
+```sql
+-- Merge sketches from different partitions
+SELECT kll_sketch_get_quantile_bigint(
+  kll_merge_agg_bigint(sketch),
+  0.5
+)
+FROM (
+  SELECT kll_sketch_agg_bigint(col) as sketch
+  FROM VALUES (1), (2), (3) tab(col)
+  UNION ALL
+  SELECT kll_sketch_agg_bigint(col) as sketch
+  FROM VALUES (4), (5), (6) tab(col)
+);
+-- Result: 3
+
+-- Get the total count from merged sketches
+SELECT kll_sketch_get_n_bigint(kll_merge_agg_bigint(sketch))
+FROM (
+  SELECT kll_sketch_agg_bigint(col) as sketch
+  FROM VALUES (1), (2), (3) tab(col)
+  UNION ALL
+  SELECT kll_sketch_agg_bigint(col) as sketch
+  FROM VALUES (4), (5), (6) tab(col)
+);
+-- Result: 6
+```
+
+**Notes:**
+- When `k` is not specified, the merged sketch adopts the k value from the 
first input sketch.
+- The merge operation can handle input sketches with different k values.
+- NULL values are ignored during aggregation.
+- Use this function when you need to merge multiple sketches in an aggregation 
context. For merging exactly two sketches, use the scalar `kll_sketch_merge_*` 
functions instead.
+
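The relationship between the scalar two-sketch merge and the aggregate merge can be pictured as a fold. The sketch below uses sorted Python lists as an exact stand-in for KLL sketches; the helper names `make_sketch`, `merge_two`, and `merge_all` are hypothetical, and a real KLL sketch keeps only a bounded sample rather than every value.

```python
from functools import reduce
from heapq import merge


def make_sketch(values):
    """Exact stand-in for kll_sketch_agg_*: keep every value, sorted."""
    return sorted(values)


def merge_two(left, right):
    """Stand-in for the scalar kll_sketch_merge_*: combine two sketches."""
    return list(merge(left, right))


def merge_all(sketches):
    """Stand-in for kll_merge_agg_*: fold any number of sketches into one."""
    return reduce(merge_two, sketches, [])


partitions = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
merged = merge_all(make_sketch(p) for p in partitions)

print(len(merged))  # 9, the role played by kll_sketch_get_n_* on the result
print(merged)       # [1, 2, 3, 4, 5, 6, 7, 8, 9]
```

With only the scalar function, merging N stored sketches requires N - 1 nested calls written out by hand; the aggregate form applies the same fold over however many rows the query selects.
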
+---
+
+### kll_sketch_to_string_*
+
+Returns a human-readable summary of the sketch.
+
+**Syntax:**
+```sql
+kll_sketch_to_string_bigint(sketch)
+kll_sketch_to_string_float(sketch)
+kll_sketch_to_string_double(sketch)
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `sketch` | BINARY | A KLL sketch of the corresponding type |
+
+Returns a STRING containing a human-readable summary including sketch 
parameters and statistics.
+
+---
+
+### kll_sketch_get_n_*
+
+Returns the number of items collected in the sketch.
+
+**Syntax:**
+```sql
+kll_sketch_get_n_bigint(sketch)
+kll_sketch_get_n_float(sketch)
+kll_sketch_get_n_double(sketch)
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `sketch` | BINARY | A KLL sketch of the corresponding type |
+
+Returns a BIGINT representing the count of items in the sketch.
+
+**Examples:**
+```sql
+SELECT kll_sketch_get_n_bigint(kll_sketch_agg_bigint(col))
+FROM VALUES (1), (2), (3), (4), (5), (6), (7) tab(col);
+-- Result: 7
+```
+
+---
+
+### kll_sketch_merge_*
+
+Merges two KLL sketches of the same type. These are scalar functions.
+
+**Syntax:**
+```sql
+kll_sketch_merge_bigint(left, right)
+kll_sketch_merge_float(left, right)
+kll_sketch_merge_double(left, right)
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `left` | BINARY | First KLL sketch |
+| `right` | BINARY | Second KLL sketch (must be same type as left) |
+
+Returns a BINARY containing the merged KLL sketch.
+
+**Examples:**
+```sql
+-- Merge two sketches from different data partitions
+SELECT kll_sketch_get_quantile_bigint(
+  kll_sketch_merge_bigint(
+    kll_sketch_agg_bigint(col1),
+    kll_sketch_agg_bigint(col2)), 0.5)
+FROM VALUES (1, 6), (2, 7), (3, 8), (4, 9), (5, 10) tab(col1, col2);
+-- Result: approximately 5 (median of 1-10)
+```
+
+**Errors:**
+- Throws an error if sketches are of incompatible types or formats.
+
+**Notes:**
+- The merge operation can handle input sketches with different k values.
+- Use this function when you need to merge exactly two sketches in a scalar 
context. For merging multiple sketches in an aggregation context, use the 
aggregate `kll_merge_agg_*` functions instead.
+
+---
+
+### kll_sketch_get_quantile_*
+
+Gets the approximate value at a given quantile rank.
+
+**Syntax:**
+```sql
+kll_sketch_get_quantile_bigint(sketch, rank)
+kll_sketch_get_quantile_float(sketch, rank)
+kll_sketch_get_quantile_double(sketch, rank)
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `sketch` | BINARY | A KLL sketch of the corresponding type |
+| `rank` | DOUBLE or ARRAY&lt;DOUBLE&gt; | Quantile rank(s) between 0.0 and 
1.0. Use 0.5 for median, 0.95 for 95th percentile, etc. |
+
+Returns the approximate value at the given quantile:
+- If `rank` is a scalar: Returns the corresponding type (BIGINT, FLOAT, or 
DOUBLE)
+- If `rank` is an array: Returns ARRAY of the corresponding type
+
+**Examples:**
+```sql
+-- Get the median
+SELECT kll_sketch_get_quantile_bigint(kll_sketch_agg_bigint(col), 0.5)
+FROM VALUES (1), (2), (3), (4), (5), (6), (7) tab(col);
+-- Result: 4
+
+-- Get multiple percentiles at once
+SELECT kll_sketch_get_quantile_bigint(
+  kll_sketch_agg_bigint(col),
+  ARRAY(0.25, 0.5, 0.75, 0.95))
+FROM VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10) tab(col);
+-- Result: Array of values at 25th, 50th, 75th, and 95th percentiles
+```
+
+**Errors:**
+- Throws an error if rank values are outside [0.0, 1.0].
+
+**Notes:**
+- Returns NULL if the input sketch is NULL.
+
+---
+
+### kll_sketch_get_rank_*
+
+Gets the normalized rank (0.0 to 1.0) of a given value in the sketch's 
distribution.
+
+**Syntax:**
+```sql
+kll_sketch_get_rank_bigint(sketch, value)
+kll_sketch_get_rank_float(sketch, value)
+kll_sketch_get_rank_double(sketch, value)
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `sketch` | BINARY | A KLL sketch of the corresponding type |
+| `value` | Corresponding type (BIGINT, FLOAT, or DOUBLE) | The value to find 
the rank for |
+
+Returns a DOUBLE representing the normalized rank between 0.0 and 1.0.
+
+**Examples:**
+```sql
+-- Find what percentile the value 3 is at
+SELECT kll_sketch_get_rank_bigint(kll_sketch_agg_bigint(col), 3)
+FROM VALUES (1), (2), (3), (4), (5), (6), (7) tab(col);
+-- Result: approximately 0.43 (3 is around the 43rd percentile)
+```
+
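Rank and quantile are inverse views of the same distribution. The plain-Python sketch below computes both exactly, assuming the inclusive convention (the rank of a value is the fraction of items less than or equal to it). That convention is an assumption made here because it reproduces the results shown in the examples above; the function names are hypothetical.

```python
def normalized_rank(values, value):
    """Fraction of items less than or equal to `value` (inclusive rank)."""
    return sum(1 for v in values if v <= value) / len(values)


def quantile(values, rank):
    """Smallest item whose inclusive normalized rank is at least `rank`."""
    ordered = sorted(values)
    for item in ordered:
        if normalized_rank(ordered, item) >= rank:
            return item
    return ordered[-1]


data = [1, 2, 3, 4, 5, 6, 7]
print(round(normalized_rank(data, 3), 4))  # 0.4286, i.e. 3 of 7 items are <= 3
print(quantile(data, 0.5))                 # 4
print(quantile([1, 2, 3, 4, 5, 6], 0.5))   # 3
```

A KLL sketch answers the same two questions from a bounded-size summary, so on large inputs its answers carry the rank error associated with the chosen `k` rather than being exact.
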
+---
+
+## Approximate Top-K Functions
+
+Top-K functions estimate the most frequent items (heavy hitters) in a dataset 
using the DataSketches Frequent Items sketch.
+
+See the [Apache DataSketches Frequency 
documentation](https://datasketches.apache.org/docs/Frequency/FrequencySketches.html)
 for more information.
+
+### approx_top_k_accumulate
+
+Creates a sketch that can be stored and later combined with other sketches or 
queried for its top items. Useful for pre-aggregating data.
+
+**Syntax:**
+```sql
+approx_top_k_accumulate(expr [, maxItemsTracked])
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `expr` | Same as `approx_top_k` | The column to accumulate |
+| `maxItemsTracked` | INT (optional) | Maximum items tracked. Range: 1 to 
1,000,000. Default: 10,000. |
+
+Returns a STRUCT containing a sketch state that can be passed to 
`approx_top_k_combine` or `approx_top_k_estimate`.
+
+**Examples:**
+```sql
+-- Accumulate then estimate
+SELECT approx_top_k_estimate(approx_top_k_accumulate(expr))
+FROM VALUES (0), (0), (1), (1), (2), (3), (4), (4) tab(expr);
+-- Result: 
[{"item":0,"count":2},{"item":4,"count":2},{"item":1,"count":2},{"item":2,"count":1},{"item":3,"count":1}]
+```
+
+---
+
+### approx_top_k_combine
+
+Combines multiple sketches into a single sketch.
+
+**Syntax:**
+```sql
+approx_top_k_combine(state [, maxItemsTracked])
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `state` | STRUCT | A sketch state from `approx_top_k_accumulate` or 
`approx_top_k_combine` |
+| `maxItemsTracked` | INT (optional) | If specified, sets the combined sketch 
size. If not specified, all input sketches must have the same maxItemsTracked. |
+
+Returns a STRUCT containing the combined sketch state.
+
+**Examples:**
+```sql
+-- Combine sketches from different partitions
+SELECT approx_top_k_estimate(approx_top_k_combine(sketch, 10000), 5)
+FROM (
+  SELECT approx_top_k_accumulate(expr) AS sketch
+  FROM VALUES (0), (0), (1), (1) tab(expr)
+  UNION ALL
+  SELECT approx_top_k_accumulate(expr) AS sketch
+  FROM VALUES (2), (3), (4), (4) tab(expr)
+);
+-- Result: 
[{"item":0,"count":2},{"item":4,"count":2},{"item":1,"count":2},{"item":2,"count":1},{"item":3,"count":1}]
+```
+
+**Errors:**
+- Throws an error if input sketches have different `maxItemsTracked` values 
and no explicit value is provided.
+- Throws an error if input sketches have different item data types.
+
+---
+
+### approx_top_k_estimate
+
+Extracts the top K items from a sketch.
+
+**Syntax:**
+```sql
+approx_top_k_estimate(state [, k])
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `state` | STRUCT | A sketch state from `approx_top_k_accumulate` or 
`approx_top_k_combine` |
+| `k` | INT (optional) | Number of top items to return. Default: 5. |
+
+Returns an ARRAY&lt;STRUCT&lt;item, count&gt;&gt; containing the frequent 
items sorted by count descending.
+
+**Examples:**
+```sql
+SELECT approx_top_k_estimate(approx_top_k_accumulate(expr), 2)
+FROM VALUES 'a', 'b', 'c', 'c', 'c', 'c', 'd', 'd' tab(expr);
+-- Result: [{"item":"c","count":4},{"item":"d","count":2}]
+```
+
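The accumulate, combine, and estimate steps can be mimicked with exact counts using `collections.Counter`. This is only an exact stand-in under stated assumptions: the helper names are hypothetical, and the real functions track at most `maxItemsTracked` items, so their counts are estimates on large inputs.

```python
from collections import Counter


def accumulate_counts(items):
    """Exact stand-in for approx_top_k_accumulate: count every item."""
    return Counter(items)


def combine_counts(states):
    """Exact stand-in for approx_top_k_combine: add the counts together."""
    total = Counter()
    for state in states:
        total += state
    return total


def top_k(state, k=5):
    """Exact stand-in for approx_top_k_estimate: k most frequent items."""
    return state.most_common(k)


state = accumulate_counts(["a", "b", "c", "c", "c", "c", "d", "d"])
print(top_k(state, 2))  # [('c', 4), ('d', 2)]

# Two partitions accumulated separately, then combined.
combined = combine_counts([
    accumulate_counts([0, 0, 1, 1]),
    accumulate_counts([2, 3, 4, 4]),
])
print(dict(combined))  # {0: 2, 1: 2, 2: 1, 3: 1, 4: 2}
```

The order of items with equal counts is not meaningful in this stand-in and should not be relied on.
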
+---
+
+## Best Practices
+
+### Choosing Between HLL and Theta Sketches
+
+| Use Case | Recommended Sketch |
+|----------|-------------------|
+| Simple count distinct | HLL (more memory efficient) |
+| Set operations (union, intersection, difference) | Theta |
+| Very high cardinality with moderate accuracy | HLL with higher lgConfigK |
+| Need to compute overlaps between datasets | Theta |
+
+### Accuracy vs. Memory Trade-offs
+
+| Sketch Type | Parameter | Effect of Increasing |
+|-------------|-----------|---------------------|
+| HLL | lgConfigK | Higher accuracy, more memory (2^lgConfigK bytes) |
+| Theta | lgNomEntries | Higher accuracy, more memory (8 * 2^lgNomEntries 
bytes) |
+| KLL | k | Higher accuracy, more memory |
+| Top-K | maxItemsTracked | Better heavy-hitter detection, more memory |
+
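The byte figures in the table grow exponentially with the log-base-2 parameters. The short calculation below evaluates the table's two formulas as written; treat the results as rough sizing guidance rather than exact serialized sizes, which are not specified here. The function names are hypothetical.

```python
def hll_bytes(lg_config_k):
    """Approximate HLL sketch size from the table: 2^lgConfigK bytes."""
    return 2 ** lg_config_k


def theta_bytes(lg_nom_entries):
    """Approximate Theta sketch size from the table: 8 * 2^lgNomEntries bytes."""
    return 8 * 2 ** lg_nom_entries


# Defaults documented above: lgConfigK = 12 and lgNomEntries = 12.
print(hll_bytes(12))    # 4096
print(theta_bytes(12))  # 32768

# Each +1 on the parameter doubles the footprint.
print(hll_bytes(13) // hll_bytes(12))      # 2
print(theta_bytes(13) // theta_bytes(12))  # 2
```

At the same parameter value, the table's formulas put a Theta sketch at eight times the size of an HLL sketch, which is the basis for the "more memory efficient" remark in the HLL versus Theta comparison.
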
+### Storing and Reusing Sketches
+
+Sketches can be stored in BINARY columns and later merged:
+
+```sql
+-- Create a table to store daily sketches
+CREATE TABLE daily_user_sketches (
+  date DATE,
+  user_sketch BINARY
+);
+
+-- Insert daily sketches
+INSERT INTO daily_user_sketches
+SELECT current_date(), hll_sketch_agg(user_id)
+FROM events;
+
+-- Compute weekly unique users by merging daily sketches
+SELECT hll_sketch_estimate(hll_union_agg(user_sketch))
+FROM daily_user_sketches
+WHERE date BETWEEN DATE'2024-01-01' AND DATE'2024-01-07';
+```
+
+---
+
+## Common Use Cases and Examples
+
+Sketches are particularly valuable for periodic ETL jobs where you need to 
maintain running statistics across multiple batches of data. The general 
workflow is:
+
+1. **Aggregate** input values into a sketch using an aggregate function
+2. **Store** the sketch (as BINARY) in a table
+3. **Merge** new sketches with previously stored sketches
+4. **Query** the final sketch to get approximate answers
+
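The four steps above can be traced end to end with exact sets standing in for sketches. This is a minimal illustration, assuming a dictionary in place of the storage table; all names are hypothetical, and a real sketch stores a fixed-size summary instead of the values themselves.

```python
def aggregate_users(user_ids):
    """Step 1 stand-in: summarize one batch (here, the exact distinct set)."""
    return frozenset(user_ids)


def merge_sketches(sketches):
    """Step 3 stand-in: merge any number of stored summaries."""
    return frozenset().union(*sketches)


def estimate_distinct(sketch):
    """Step 4 stand-in: answer the distinct-count query."""
    return len(sketch)


# Step 2 stand-in: a dict keyed by date plays the role of the sketch table.
store = {}
store["2024-01-01"] = aggregate_users(["u1", "u2", "u3"])
store["2024-01-02"] = aggregate_users(["u2", "u3", "u4"])

daily = {day: estimate_distinct(s) for day, s in store.items()}
merged = merge_sketches(store.values())

print(daily)                      # {'2024-01-01': 3, '2024-01-02': 3}
print(estimate_distinct(merged))  # 4
```

Summing the two daily counts would give 6 and double-count returning users, which is why the stored summaries are merged first and queried afterwards.
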
+### Example: Tracking Daily Unique Users with HLL Sketches
+
+This example shows how to maintain a running count of unique users across 
daily batches.
+
+```sql
+-- Create a table to store daily HLL sketches
+CREATE TABLE daily_user_sketches (
+  event_date DATE,
+  user_sketch BINARY
+) USING PARQUET;
+
+-- Day 1: Process first batch of events and store the sketch
+INSERT INTO daily_user_sketches
+SELECT 
+  DATE'2024-01-01' as event_date,
+  hll_sketch_agg(user_id) as user_sketch
+FROM day1_events;
+
+-- Day 2: Process second batch and store its sketch
+INSERT INTO daily_user_sketches
+SELECT 
+  DATE'2024-01-02' as event_date,
+  hll_sketch_agg(user_id) as user_sketch
+FROM day2_events;
+
+-- Query: Get unique users for a single day
+SELECT 
+  event_date,
+  hll_sketch_estimate(user_sketch) as unique_users
+FROM daily_user_sketches
+WHERE event_date = DATE'2024-01-01';
+
+-- Query: Get unique users across a date range (merging sketches)
+SELECT hll_sketch_estimate(hll_union_agg(user_sketch)) as unique_users_in_week
+FROM daily_user_sketches
+WHERE event_date BETWEEN DATE'2024-01-01' AND DATE'2024-01-07';
+```
+
+### Example: Computing Percentiles Over Time with KLL Sketches
+
+This example shows how to track response time percentiles across hourly 
batches.
+
+```sql
+-- Create a table to store hourly KLL sketches for response times
+CREATE TABLE hourly_latency_sketches (
+  hour_ts TIMESTAMP,
+  latency_sketch BINARY
+) USING PARQUET;
+
+-- Process each hour's data and store the sketch
+INSERT INTO hourly_latency_sketches
+SELECT 
+  DATE_TRUNC('hour', event_time) as hour_ts,
+  kll_sketch_agg_bigint(response_time_ms) as latency_sketch
+FROM hourly_events
+GROUP BY DATE_TRUNC('hour', event_time);
+
+-- Query: Get p50, p95, p99 for a specific hour
+SELECT 
+  hour_ts,
+  kll_sketch_get_quantile_bigint(latency_sketch, 0.5) as p50_ms,
+  kll_sketch_get_quantile_bigint(latency_sketch, 0.95) as p95_ms,
+  kll_sketch_get_quantile_bigint(latency_sketch, 0.99) as p99_ms
+FROM hourly_latency_sketches
+WHERE hour_ts = TIMESTAMP'2024-01-15 14:00:00';
+
+-- Query: Get percentiles across a full day by merging hourly sketches
+WITH daily_sketch AS (
+  SELECT kll_merge_agg_bigint(latency_sketch) as merged_sketch
+  FROM hourly_latency_sketches
+  WHERE DATE(hour_ts) = DATE'2024-01-15'
+)
+SELECT
+  kll_sketch_get_quantile_bigint(merged_sketch, 0.5) as p50_ms,
+  kll_sketch_get_quantile_bigint(merged_sketch, 0.95) as p95_ms,
+  kll_sketch_get_quantile_bigint(merged_sketch, 0.99) as p99_ms
+FROM daily_sketch;
+```
+
+### Example: Set Operations with Theta Sketches
+
+Theta sketches support set operations, making them useful for analyzing overlapping populations.
+
+```sql
+-- Create sketches for users who performed different actions
+CREATE TABLE action_sketches (
+  action_type STRING,
+  user_sketch BINARY
+) USING PARQUET;
+
+-- Store sketches for each action type
+INSERT INTO action_sketches
+SELECT 'purchase', theta_sketch_agg(user_id) FROM purchases;
+
+INSERT INTO action_sketches
+SELECT 'add_to_cart', theta_sketch_agg(user_id) FROM cart_additions;
+
+INSERT INTO action_sketches
+SELECT 'page_view', theta_sketch_agg(user_id) FROM page_views;
+
+-- Query: How many users purchased?
+SELECT theta_sketch_estimate(user_sketch) as purchasers
+FROM action_sketches WHERE action_type = 'purchase';
+
+-- Query: How many users added to cart but did NOT purchase?
+SELECT theta_sketch_estimate(
+  theta_difference(
+    (SELECT user_sketch FROM action_sketches WHERE action_type = 'add_to_cart'),
+    (SELECT user_sketch FROM action_sketches WHERE action_type = 'purchase')
+  )
+) as cart_abandoners;
+
+-- Query: How many users both viewed pages AND purchased (intersection)?
+SELECT theta_sketch_estimate(
+  theta_intersection(
+    (SELECT user_sketch FROM action_sketches WHERE action_type = 'page_view'),
+    (SELECT user_sketch FROM action_sketches WHERE action_type = 'purchase')
+  )
+) as engaged_purchasers;
+```
+
+### Example: Finding Trending Items with Top-K Sketches
+
+Track the most frequently occurring items across batches.
+
+```sql
+-- Create a table to store hourly top-k sketches
+CREATE TABLE hourly_search_sketches (
+  hour_ts TIMESTAMP,
+  search_sketch STRUCT<sketch: BINARY, maxItemsTracked: INT, itemDataType: STRING, itemDataTypeDDL: STRING>
+) USING PARQUET;
+
+-- Process each hour's search queries
+INSERT INTO hourly_search_sketches
+SELECT 
+  DATE_TRUNC('hour', search_time) as hour_ts,
+  approx_top_k_accumulate(search_term, 10000) as search_sketch
+FROM search_logs
+GROUP BY DATE_TRUNC('hour', search_time);
+
+-- Query: Get top 10 searches for a specific hour
+SELECT approx_top_k_estimate(search_sketch, 10) as top_searches
+FROM hourly_search_sketches
+WHERE hour_ts = TIMESTAMP'2024-01-15 14:00:00';
+
+-- Query: Get top 10 searches across the full day by combining sketches
+SELECT approx_top_k_estimate(
+  approx_top_k_combine(search_sketch, 10000), 
+  10
+) as daily_top_searches
+FROM hourly_search_sketches
+WHERE DATE(hour_ts) = DATE'2024-01-15';
+```
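The store, merge, and query pattern used by the SQL examples above can be modeled without Spark. What follows is a toy illustration only: a Python `frozenset` stands in for the stored binary sketch, so the counts are exact rather than approximate, and the names `build_sketch`, `merge_sketches`, and `estimate` are hypothetical helpers, not Spark functions. It shows why per-batch summaries have to be merged rather than summed when the same user appears on more than one day.

```python
from functools import reduce

# Toy stand-ins (hypothetical): a frozenset plays the role of a stored sketch.
def build_sketch(user_ids):
    """Summarize one batch, analogous to hll_sketch_agg over a day's events."""
    return frozenset(user_ids)

def merge_sketches(sketches):
    """Combine stored summaries, analogous to hll_union_agg over many rows."""
    return reduce(frozenset.union, sketches, frozenset())

def estimate(sketch):
    """Read the distinct count, analogous to hll_sketch_estimate."""
    return len(sketch)

# One stored summary per day, as in the daily_user_sketches table.
daily = {
    "2024-01-01": build_sketch(["alice", "bob", "carol"]),
    "2024-01-02": build_sketch(["bob", "dave"]),
}

# Adding per-day counts double-counts "bob"; merging the summaries does not.
sum_of_daily = sum(estimate(s) for s in daily.values())
merged_total = estimate(merge_sketches(daily.values()))
print(sum_of_daily, merged_total)
```

A real HLL, Theta, or KLL sketch replaces the exact set with a bounded-size approximation, but the reason for storing one mergeable summary per batch is the same.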
diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst
index 5fed5c8d6719..9fcdac38e7d5 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -461,6 +461,9 @@ Aggregate Functions
     kll_sketch_agg_bigint
     kll_sketch_agg_double
     kll_sketch_agg_float
+    kll_merge_agg_bigint
+    kll_merge_agg_float
+    kll_merge_agg_double
     kurtosis
     last
     last_value
diff --git a/python/pyspark/sql/connect/functions/builtin.py b/python/pyspark/sql/connect/functions/builtin.py
index 69706398253c..a2db7e172b5d 100644
--- a/python/pyspark/sql/connect/functions/builtin.py
+++ b/python/pyspark/sql/connect/functions/builtin.py
@@ -4571,6 +4571,48 @@ def kll_sketch_agg_double(
 kll_sketch_agg_double.__doc__ = pysparkfuncs.kll_sketch_agg_double.__doc__
 
 
+def kll_merge_agg_bigint(
+    col: "ColumnOrName",
+    k: Optional[Union[int, Column]] = None,
+) -> Column:
+    fn = "kll_merge_agg_bigint"
+    if k is None:
+        return _invoke_function_over_columns(fn, col)
+    else:
+        return _invoke_function_over_columns(fn, col, lit(k))
+
+
+kll_merge_agg_bigint.__doc__ = pysparkfuncs.kll_merge_agg_bigint.__doc__
+
+
+def kll_merge_agg_float(
+    col: "ColumnOrName",
+    k: Optional[Union[int, Column]] = None,
+) -> Column:
+    fn = "kll_merge_agg_float"
+    if k is None:
+        return _invoke_function_over_columns(fn, col)
+    else:
+        return _invoke_function_over_columns(fn, col, lit(k))
+
+
+kll_merge_agg_float.__doc__ = pysparkfuncs.kll_merge_agg_float.__doc__
+
+
+def kll_merge_agg_double(
+    col: "ColumnOrName",
+    k: Optional[Union[int, Column]] = None,
+) -> Column:
+    fn = "kll_merge_agg_double"
+    if k is None:
+        return _invoke_function_over_columns(fn, col)
+    else:
+        return _invoke_function_over_columns(fn, col, lit(k))
+
+
+kll_merge_agg_double.__doc__ = pysparkfuncs.kll_merge_agg_double.__doc__
+
+
 def kll_sketch_to_string_bigint(col: "ColumnOrName") -> Column:
     fn = "kll_sketch_to_string_bigint"
     return _invoke_function_over_columns(fn, col)
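The three Connect wrappers in this hunk share one dispatch rule: when `k` is `None` the SQL function is invoked with the sketch column only, otherwise `k` is wrapped in a literal and passed as a second argument. Below is a minimal sketch of that rule in isolation, assuming stub versions of `lit` and `_invoke_function_over_columns` that only record their arguments. The stubs and the generic `kll_merge_agg` helper are hypothetical and are not part of PySpark.

```python
# Hypothetical stubs: the real PySpark helpers build Column expressions,
# these only record what they were called with.
def lit(value):
    return ("lit", value)

def _invoke_function_over_columns(name, *cols):
    return (name, *cols)

def kll_merge_agg(fn, col, k=None):
    """Generic form of the optional-k dispatch used by the three wrappers."""
    if k is None:
        # No k supplied: call the one-argument form of the SQL function.
        return _invoke_function_over_columns(fn, col)
    # k supplied: wrap it as a literal and call the two-argument form.
    return _invoke_function_over_columns(fn, col, lit(k))

without_k = kll_merge_agg("kll_merge_agg_bigint", "sketch")
with_k = kll_merge_agg("kll_merge_agg_bigint", "sketch", 400)
print(without_k)
print(with_k)
```

Omitting the argument entirely, rather than passing a null literal, leaves the choice of default `k` to the SQL function itself.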
diff --git a/python/pyspark/sql/functions/__init__.py b/python/pyspark/sql/functions/__init__.py
index 6bbc69dc9bf6..64446a835d84 100644
--- a/python/pyspark/sql/functions/__init__.py
+++ b/python/pyspark/sql/functions/__init__.py
@@ -375,6 +375,9 @@ __all__ = [  # noqa: F405
     "kll_sketch_agg_bigint",
     "kll_sketch_agg_double",
     "kll_sketch_agg_float",
+    "kll_merge_agg_bigint",
+    "kll_merge_agg_double",
+    "kll_merge_agg_float",
     "kurtosis",
     "last",
     "last_value",
diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py
index 5bb1b2d8b5ef..63b4ad64b579 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -26694,6 +26694,138 @@ def kll_sketch_agg_double(
         return _invoke_function_over_columns(fn, col, lit(k))
 
 
+@_try_remote_functions
+def kll_merge_agg_bigint(
+    col: "ColumnOrName",
+    k: Optional[Union[int, Column]] = None,
+) -> Column:
+    """
+    Aggregate function: merges binary KllLongsSketch representations and returns the
+    merged sketch. The optional k parameter controls the size and accuracy of the merged
+    sketch (range 8-65535). If k is not specified, the merged sketch adopts the k value
+    from the first input sketch.
+
+    .. versionadded:: 4.1.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or column name
+        The column containing binary KllLongsSketch representations
+    k : :class:`~pyspark.sql.Column` or int, optional
+        The k parameter that controls size and accuracy (range 8-65535)
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        The merged binary representation of the KllLongsSketch.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> df1 = spark.createDataFrame([1,2,3], "INT")
+    >>> df2 = spark.createDataFrame([4,5,6], "INT")
+    >>> sketch1 = df1.agg(sf.kll_sketch_agg_bigint("value").alias("sketch"))
+    >>> sketch2 = df2.agg(sf.kll_sketch_agg_bigint("value").alias("sketch"))
+    >>> merged = sketch1.union(sketch2).agg(sf.kll_merge_agg_bigint("sketch").alias("merged"))
+    >>> n = merged.select(sf.kll_sketch_get_n_bigint("merged")).first()[0]
+    >>> n
+    6
+    """
+    fn = "kll_merge_agg_bigint"
+    if k is None:
+        return _invoke_function_over_columns(fn, col)
+    else:
+        return _invoke_function_over_columns(fn, col, lit(k))
+
+
+@_try_remote_functions
+def kll_merge_agg_float(
+    col: "ColumnOrName",
+    k: Optional[Union[int, Column]] = None,
+) -> Column:
+    """
+    Aggregate function: merges binary KllFloatsSketch representations and returns the
+    merged sketch. The optional k parameter controls the size and accuracy of the merged
+    sketch (range 8-65535). If k is not specified, the merged sketch adopts the k value
+    from the first input sketch.
+
+    .. versionadded:: 4.1.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or column name
+        The column containing binary KllFloatsSketch representations
+    k : :class:`~pyspark.sql.Column` or int, optional
+        The k parameter that controls size and accuracy (range 8-65535)
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        The merged binary representation of the KllFloatsSketch.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> df1 = spark.createDataFrame([1.0,2.0,3.0], "FLOAT")
+    >>> df2 = spark.createDataFrame([4.0,5.0,6.0], "FLOAT")
+    >>> sketch1 = df1.agg(sf.kll_sketch_agg_float("value").alias("sketch"))
+    >>> sketch2 = df2.agg(sf.kll_sketch_agg_float("value").alias("sketch"))
+    >>> merged = sketch1.union(sketch2).agg(sf.kll_merge_agg_float("sketch").alias("merged"))
+    >>> n = merged.select(sf.kll_sketch_get_n_float("merged")).first()[0]
+    >>> n
+    6
+    """
+    fn = "kll_merge_agg_float"
+    if k is None:
+        return _invoke_function_over_columns(fn, col)
+    else:
+        return _invoke_function_over_columns(fn, col, lit(k))
+
+
+@_try_remote_functions
+def kll_merge_agg_double(
+    col: "ColumnOrName",
+    k: Optional[Union[int, Column]] = None,
+) -> Column:
+    """
+    Aggregate function: merges binary KllDoublesSketch representations and returns the
+    merged sketch. The optional k parameter controls the size and accuracy of the merged
+    sketch (range 8-65535). If k is not specified, the merged sketch adopts the k value
+    from the first input sketch.
+
+    .. versionadded:: 4.1.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or column name
+        The column containing binary KllDoublesSketch representations
+    k : :class:`~pyspark.sql.Column` or int, optional
+        The k parameter that controls size and accuracy (range 8-65535)
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        The merged binary representation of the KllDoublesSketch.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> df1 = spark.createDataFrame([1.0,2.0,3.0], "DOUBLE")
+    >>> df2 = spark.createDataFrame([4.0,5.0,6.0], "DOUBLE")
+    >>> sketch1 = df1.agg(sf.kll_sketch_agg_double("value").alias("sketch"))
+    >>> sketch2 = df2.agg(sf.kll_sketch_agg_double("value").alias("sketch"))
+    >>> merged = sketch1.union(sketch2).agg(sf.kll_merge_agg_double("sketch").alias("merged"))
+    >>> n = merged.select(sf.kll_sketch_get_n_double("merged")).first()[0]
+    >>> n
+    6
+    """
+    fn = "kll_merge_agg_double"
+    if k is None:
+        return _invoke_function_over_columns(fn, col)
+    else:
+        return _invoke_function_over_columns(fn, col, lit(k))
+
+
 @_try_remote_functions
 def kll_sketch_to_string_bigint(col: "ColumnOrName") -> Column:
     """
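The docstrings in this hunk state two rules for `k`: an explicit value must lie in the range 8-65535, and when `k` is omitted the merged sketch adopts the `k` of the first input sketch. The toy model below restates those two rules in plain Python. The function `resolve_merged_k` is hypothetical, raising `ValueError` is an assumption about how a violation might surface, and "first" here simply means list order, which the docstrings do not define for a distributed aggregation.

```python
from typing import Optional, Sequence

# Range quoted in the docstrings above.
MIN_K, MAX_K = 8, 65535

def resolve_merged_k(input_ks: Sequence[int], k: Optional[int] = None) -> int:
    """Toy model (hypothetical) of how the merged sketch's k is chosen."""
    if k is None:
        if not input_ks:
            # Assumption for this toy only: nothing to adopt a k from.
            raise ValueError("no input sketches to take k from")
        # No explicit k: adopt the k of the first input sketch.
        return input_ks[0]
    if not MIN_K <= k <= MAX_K:
        raise ValueError(f"k must be in [{MIN_K}, {MAX_K}], got {k}")
    # Explicit k wins over whatever the inputs were built with.
    return k

adopted = resolve_merged_k([200, 400])
explicit = resolve_merged_k([200, 400], k=300)
print(adopted, explicit)
```

Passing `k` explicitly makes the size of the merged sketch independent of which input happens to come first.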
diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py
index b6d4e3a0547b..a776fa6e80b7 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -2270,6 +2270,97 @@ class FunctionsTestsMixin:
         # Should only count non-null values
         self.assertEqual(n, 3)
 
+    def test_kll_merge_agg_bigint(self):
+        """Test kll_merge_agg_bigint function"""
+        df1 = self.spark.createDataFrame([1, 2, 3], "INT")
+        df2 = self.spark.createDataFrame([4, 5, 6], "INT")
+
+        sketch1 = df1.agg(F.kll_sketch_agg_bigint("value").alias("sketch"))
+        sketch2 = df2.agg(F.kll_sketch_agg_bigint("value").alias("sketch"))
+
+        # Union and merge sketches
+        merged = sketch1.union(sketch2).agg(F.kll_merge_agg_bigint("sketch").alias("merged"))
+
+        # Verify the merged sketch contains all values
+        n = merged.select(F.kll_sketch_get_n_bigint("merged")).first()[0]
+        self.assertEqual(n, 6)
+
+        # Test with explicit k parameter
+        merged_with_k = sketch1.union(sketch2).agg(
+            F.kll_merge_agg_bigint("sketch", 400).alias("merged")
+        )
+        self.assertIsNotNone(merged_with_k.first()[0])
+
+    def test_kll_merge_agg_float(self):
+        """Test kll_merge_agg_float function"""
+        df1 = self.spark.createDataFrame([1.0, 2.0, 3.0], "FLOAT")
+        df2 = self.spark.createDataFrame([4.0, 5.0, 6.0], "FLOAT")
+
+        sketch1 = df1.agg(F.kll_sketch_agg_float("value").alias("sketch"))
+        sketch2 = df2.agg(F.kll_sketch_agg_float("value").alias("sketch"))
+
+        # Union and merge sketches
+        merged = sketch1.union(sketch2).agg(F.kll_merge_agg_float("sketch").alias("merged"))
+
+        # Verify the merged sketch contains all values
+        n = merged.select(F.kll_sketch_get_n_float("merged")).first()[0]
+        self.assertEqual(n, 6)
+
+        # Test with explicit k parameter
+        merged_with_k = sketch1.union(sketch2).agg(
+            F.kll_merge_agg_float("sketch", 300).alias("merged")
+        )
+        self.assertIsNotNone(merged_with_k.first()[0])
+
+    def test_kll_merge_agg_double(self):
+        """Test kll_merge_agg_double function"""
+        df1 = self.spark.createDataFrame([1.0, 2.0, 3.0], "DOUBLE")
+        df2 = self.spark.createDataFrame([4.0, 5.0, 6.0], "DOUBLE")
+
+        sketch1 = df1.agg(F.kll_sketch_agg_double("value").alias("sketch"))
+        sketch2 = df2.agg(F.kll_sketch_agg_double("value").alias("sketch"))
+
+        # Union and merge sketches
+        merged = sketch1.union(sketch2).agg(F.kll_merge_agg_double("sketch").alias("merged"))
+
+        # Verify the merged sketch contains all values
+        n = merged.select(F.kll_sketch_get_n_double("merged")).first()[0]
+        self.assertEqual(n, 6)
+
+        # Test quantile on merged sketch
+        quantile = merged.select(F.kll_sketch_get_quantile_double("merged", F.lit(0.5))).first()[0]
+        self.assertIsNotNone(quantile)
+
+    def test_kll_merge_agg_with_different_k(self):
+        """Test kll_merge_agg with different k values"""
+        df1 = self.spark.createDataFrame([1, 2, 3], "INT")
+        df2 = self.spark.createDataFrame([4, 5, 6], "INT")
+
+        # Create sketches with different k values
+        sketch1 = df1.agg(F.kll_sketch_agg_bigint("value", 200).alias("sketch"))
+        sketch2 = df2.agg(F.kll_sketch_agg_bigint("value", 400).alias("sketch"))
+
+        # Merge sketches with different k values (should adopt from first sketch)
+        merged = sketch1.union(sketch2).agg(F.kll_merge_agg_bigint("sketch").alias("merged"))
+
+        n = merged.select(F.kll_sketch_get_n_bigint("merged")).first()[0]
+        self.assertEqual(n, 6)
+
+    def test_kll_merge_agg_with_nulls(self):
+        """Test kll_merge_agg with null values"""
+        df1 = self.spark.createDataFrame([1, 2, 3], "INT")
+        df2 = self.spark.createDataFrame([4, None, 6], "INT")
+
+        sketch1 = df1.agg(F.kll_sketch_agg_bigint("value").alias("sketch"))
+        sketch2 = df2.agg(F.kll_sketch_agg_bigint("value").alias("sketch"))
+
+        # Merge sketches - null values should be ignored
+        merged = sketch1.union(sketch2).agg(F.kll_merge_agg_bigint("sketch").alias("merged"))
+
+        n = merged.select(F.kll_sketch_get_n_bigint("merged")).first()[0]
+        # Should have 5 values (1,2,3,4,6 - null is ignored)
+        self.assertEqual(n, 5)
+
     def test_datetime_functions(self):
         df = self.spark.range(1).selectExpr("'2017-01-22' as dateCol")
         parse_result = df.select(F.to_date(F.col("dateCol"))).first()
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
index 0a14491de223..d5d648107390 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1463,145 +1463,168 @@ object functions {
     kll_sketch_agg_double(Column(columnName))
 
   /**
-   * Returns a string with human readable summary information about the KLL bigint sketch.
+   * Aggregate function: merges binary KllLongsSketch representations and returns the merged
+   * sketch. The optional k parameter controls the size and accuracy of the merged sketch (range
+   * 8-65535). If k is not specified, the merged sketch adopts the k value from the first input
+   * sketch.
    *
-   * @group misc_funcs
+   * @group agg_funcs
    * @since 4.1.0
    */
-  def kll_sketch_to_string_bigint(e: Column): Column =
-    Column.fn("kll_sketch_to_string_bigint", e)
+  def kll_merge_agg_bigint(e: Column, k: Column): Column =
+    Column.fn("kll_merge_agg_bigint", e, k)
 
   /**
-   * Returns a string with human readable summary information about the KLL float sketch.
+   * Aggregate function: merges binary KllLongsSketch representations and returns the merged
+   * sketch. The optional k parameter controls the size and accuracy of the merged sketch (range
+   * 8-65535). If k is not specified, the merged sketch adopts the k value from the first input
+   * sketch.
    *
-   * @group misc_funcs
+   * @group agg_funcs
    * @since 4.1.0
    */
-  def kll_sketch_to_string_float(e: Column): Column =
-    Column.fn("kll_sketch_to_string_float", e)
+  def kll_merge_agg_bigint(e: Column, k: Int): Column =
+    Column.fn("kll_merge_agg_bigint", e, lit(k))
 
   /**
-   * Returns a string with human readable summary information about the KLL double sketch.
+   * Aggregate function: merges binary KllLongsSketch representations and returns the merged
+   * sketch. The optional k parameter controls the size and accuracy of the merged sketch (range
+   * 8-65535). If k is not specified, the merged sketch adopts the k value from the first input
+   * sketch.
    *
-   * @group misc_funcs
+   * @group agg_funcs
    * @since 4.1.0
    */
-  def kll_sketch_to_string_double(e: Column): Column =
-    Column.fn("kll_sketch_to_string_double", e)
+  def kll_merge_agg_bigint(columnName: String, k: Int): Column =
+    kll_merge_agg_bigint(Column(columnName), k)
 
   /**
-   * Returns the number of items collected in the KLL bigint sketch.
+   * Aggregate function: merges binary KllLongsSketch representations and returns the merged
+   * sketch. If k is not specified, the merged sketch adopts the k value from the first input
+   * sketch.
    *
-   * @group misc_funcs
+   * @group agg_funcs
    * @since 4.1.0
    */
-  def kll_sketch_get_n_bigint(e: Column): Column =
-    Column.fn("kll_sketch_get_n_bigint", e)
+  def kll_merge_agg_bigint(e: Column): Column =
+    Column.fn("kll_merge_agg_bigint", e)
 
   /**
-   * Returns the number of items collected in the KLL float sketch.
+   * Aggregate function: merges binary KllLongsSketch representations and returns the merged
+   * sketch. If k is not specified, the merged sketch adopts the k value from the first input
+   * sketch.
    *
-   * @group misc_funcs
+   * @group agg_funcs
    * @since 4.1.0
    */
-  def kll_sketch_get_n_float(e: Column): Column =
-    Column.fn("kll_sketch_get_n_float", e)
+  def kll_merge_agg_bigint(columnName: String): Column =
+    kll_merge_agg_bigint(Column(columnName))
 
   /**
-   * Returns the number of items collected in the KLL double sketch.
+   * Aggregate function: merges binary KllFloatsSketch representations and returns merged sketch.
+   * The optional k parameter controls the size and accuracy of the merged sketch (range 8-65535).
+   * If k is not specified, the merged sketch adopts the k value from the first input sketch.
    *
-   * @group misc_funcs
+   * @group agg_funcs
    * @since 4.1.0
    */
-  def kll_sketch_get_n_double(e: Column): Column =
-    Column.fn("kll_sketch_get_n_double", e)
+  def kll_merge_agg_float(e: Column, k: Column): Column =
+    Column.fn("kll_merge_agg_float", e, k)
 
   /**
-   * Merges two KLL bigint sketch buffers together into one.
+   * Aggregate function: merges binary KllFloatsSketch representations and returns merged sketch.
+   * The optional k parameter controls the size and accuracy of the merged sketch (range 8-65535).
+   * If k is not specified, the merged sketch adopts the k value from the first input sketch.
    *
-   * @group misc_funcs
+   * @group agg_funcs
    * @since 4.1.0
    */
-  def kll_sketch_merge_bigint(left: Column, right: Column): Column =
-    Column.fn("kll_sketch_merge_bigint", left, right)
+  def kll_merge_agg_float(e: Column, k: Int): Column =
+    Column.fn("kll_merge_agg_float", e, lit(k))
 
   /**
-   * Merges two KLL float sketch buffers together into one.
+   * Aggregate function: merges binary KllFloatsSketch representations and returns merged sketch.
+   * The optional k parameter controls the size and accuracy of the merged sketch (range 8-65535).
+   * If k is not specified, the merged sketch adopts the k value from the first input sketch.
    *
-   * @group misc_funcs
+   * @group agg_funcs
    * @since 4.1.0
    */
-  def kll_sketch_merge_float(left: Column, right: Column): Column =
-    Column.fn("kll_sketch_merge_float", left, right)
+  def kll_merge_agg_float(columnName: String, k: Int): Column =
+    kll_merge_agg_float(Column(columnName), k)
 
   /**
-   * Merges two KLL double sketch buffers together into one.
+   * Aggregate function: merges binary KllFloatsSketch representations and returns merged sketch.
+   * If k is not specified, the merged sketch adopts the k value from the first input sketch.
    *
-   * @group misc_funcs
+   * @group agg_funcs
    * @since 4.1.0
    */
-  def kll_sketch_merge_double(left: Column, right: Column): Column =
-    Column.fn("kll_sketch_merge_double", left, right)
+  def kll_merge_agg_float(e: Column): Column =
+    Column.fn("kll_merge_agg_float", e)
 
   /**
-   * Extracts a quantile value from a KLL bigint sketch given an input rank value. The rank can be
-   * a single value or an array.
+   * Aggregate function: merges binary KllFloatsSketch representations and returns merged sketch.
+   * If k is not specified, the merged sketch adopts the k value from the first input sketch.
    *
-   * @group misc_funcs
+   * @group agg_funcs
    * @since 4.1.0
    */
-  def kll_sketch_get_quantile_bigint(sketch: Column, rank: Column): Column =
-    Column.fn("kll_sketch_get_quantile_bigint", sketch, rank)
+  def kll_merge_agg_float(columnName: String): Column =
+    kll_merge_agg_float(Column(columnName))
 
   /**
-   * Extracts a quantile value from a KLL float sketch given an input rank value. The rank can be
-   * a single value or an array.
+   * Aggregate function: merges binary KllDoublesSketch representations and returns merged sketch.
+   * The optional k parameter controls the size and accuracy of the merged sketch (range 8-65535).
+   * If k is not specified, the merged sketch adopts the k value from the first input sketch.
    *
-   * @group misc_funcs
+   * @group agg_funcs
    * @since 4.1.0
    */
-  def kll_sketch_get_quantile_float(sketch: Column, rank: Column): Column =
-    Column.fn("kll_sketch_get_quantile_float", sketch, rank)
+  def kll_merge_agg_double(e: Column, k: Column): Column =
+    Column.fn("kll_merge_agg_double", e, k)
 
   /**
-   * Extracts a quantile value from a KLL double sketch given an input rank value. The rank can be
-   * a single value or an array.
+   * Aggregate function: merges binary KllDoublesSketch representations and returns merged sketch.
+   * The optional k parameter controls the size and accuracy of the merged sketch (range 8-65535).
+   * If k is not specified, the merged sketch adopts the k value from the first input sketch.
    *
-   * @group misc_funcs
+   * @group agg_funcs
    * @since 4.1.0
    */
-  def kll_sketch_get_quantile_double(sketch: Column, rank: Column): Column =
-    Column.fn("kll_sketch_get_quantile_double", sketch, rank)
+  def kll_merge_agg_double(e: Column, k: Int): Column =
+    Column.fn("kll_merge_agg_double", e, lit(k))
 
   /**
-   * Extracts a rank value from a KLL bigint sketch given an input quantile value. The quantile
-   * can be a single value or an array.
+   * Aggregate function: merges binary KllDoublesSketch representations and returns merged sketch.
+   * The optional k parameter controls the size and accuracy of the merged sketch (range 8-65535).
+   * If k is not specified, the merged sketch adopts the k value from the first input sketch.
    *
-   * @group misc_funcs
+   * @group agg_funcs
    * @since 4.1.0
    */
-  def kll_sketch_get_rank_bigint(sketch: Column, quantile: Column): Column =
-    Column.fn("kll_sketch_get_rank_bigint", sketch, quantile)
+  def kll_merge_agg_double(columnName: String, k: Int): Column =
+    kll_merge_agg_double(Column(columnName), k)
 
   /**
-   * Extracts a rank value from a KLL float sketch given an input quantile value. The quantile can
-   * be a single value or an array.
+   * Aggregate function: merges binary KllDoublesSketch representations and returns merged sketch.
+   * If k is not specified, the merged sketch adopts the k value from the first input sketch.
    *
-   * @group misc_funcs
+   * @group agg_funcs
    * @since 4.1.0
    */
-  def kll_sketch_get_rank_float(sketch: Column, quantile: Column): Column =
-    Column.fn("kll_sketch_get_rank_float", sketch, quantile)
+  def kll_merge_agg_double(e: Column): Column =
+    Column.fn("kll_merge_agg_double", e)
 
   /**
-   * Extracts a rank value from a KLL double sketch given an input quantile value. The quantile
-   * can be a single value or an array.
+   * Aggregate function: merges binary KllDoublesSketch representations and returns merged sketch.
+   * If k is not specified, the merged sketch adopts the k value from the first input sketch.
    *
-   * @group misc_funcs
+   * @group agg_funcs
    * @since 4.1.0
    */
-  def kll_sketch_get_rank_double(sketch: Column, quantile: Column): Column =
-    Column.fn("kll_sketch_get_rank_double", sketch, quantile)
+  def kll_merge_agg_double(columnName: String): Column =
+    kll_merge_agg_double(Column(columnName))
 
   /**
    * Aggregate function: returns the concatenation of non-null input values.
@@ -4109,6 +4132,147 @@ object functions {
   def theta_union(c1: Column, c2: Column, lgNomEntries: Column): Column =
     Column.fn("theta_union", c1, c2, lgNomEntries)
 
+  /**
+   * Returns a string with human readable summary information about the KLL bigint sketch.
+   *
+   * @group misc_funcs
+   * @since 4.1.0
+   */
+  def kll_sketch_to_string_bigint(e: Column): Column =
+    Column.fn("kll_sketch_to_string_bigint", e)
+
+  /**
+   * Returns a string with human readable summary information about the KLL float sketch.
+   *
+   * @group misc_funcs
+   * @since 4.1.0
+   */
+  def kll_sketch_to_string_float(e: Column): Column =
+    Column.fn("kll_sketch_to_string_float", e)
+
+  /**
+   * Returns a string with human readable summary information about the KLL double sketch.
+   *
+   * @group misc_funcs
+   * @since 4.1.0
+   */
+  def kll_sketch_to_string_double(e: Column): Column =
+    Column.fn("kll_sketch_to_string_double", e)
+
+  /**
+   * Returns the number of items collected in the KLL bigint sketch.
+   *
+   * @group misc_funcs
+   * @since 4.1.0
+   */
+  def kll_sketch_get_n_bigint(e: Column): Column =
+    Column.fn("kll_sketch_get_n_bigint", e)
+
+  /**
+   * Returns the number of items collected in the KLL float sketch.
+   *
+   * @group misc_funcs
+   * @since 4.1.0
+   */
+  def kll_sketch_get_n_float(e: Column): Column =
+    Column.fn("kll_sketch_get_n_float", e)
+
+  /**
+   * Returns the number of items collected in the KLL double sketch.
+   *
+   * @group misc_funcs
+   * @since 4.1.0
+   */
+  def kll_sketch_get_n_double(e: Column): Column =
+    Column.fn("kll_sketch_get_n_double", e)
+
+  /**
+   * Merges two KLL bigint sketch buffers together into one.
+   *
+   * @group misc_funcs
+   * @since 4.1.0
+   */
+  def kll_sketch_merge_bigint(left: Column, right: Column): Column =
+    Column.fn("kll_sketch_merge_bigint", left, right)
+
+  /**
+   * Merges two KLL float sketch buffers together into one.
+   *
+   * @group misc_funcs
+   * @since 4.1.0
+   */
+  def kll_sketch_merge_float(left: Column, right: Column): Column =
+    Column.fn("kll_sketch_merge_float", left, right)
+
+  /**
+   * Merges two KLL double sketch buffers together into one.
+   *
+   * @group misc_funcs
+   * @since 4.1.0
+   */
+  def kll_sketch_merge_double(left: Column, right: Column): Column =
+    Column.fn("kll_sketch_merge_double", left, right)
+
+  /**
+   * Extracts a quantile value from a KLL bigint sketch given an input rank value. The rank can be
+   * a single value or an array.
+   *
+   * @group misc_funcs
+   * @since 4.1.0
+   */
+  def kll_sketch_get_quantile_bigint(sketch: Column, rank: Column): Column =
+    Column.fn("kll_sketch_get_quantile_bigint", sketch, rank)
+
+  /**
+   * Extracts a quantile value from a KLL float sketch given an input rank value. The rank can be
+   * a single value or an array.
+   *
+   * @group misc_funcs
+   * @since 4.1.0
+   */
+  def kll_sketch_get_quantile_float(sketch: Column, rank: Column): Column =
+    Column.fn("kll_sketch_get_quantile_float", sketch, rank)
+
+  /**
+   * Extracts a quantile value from a KLL double sketch given an input rank value. The rank can be
+   * a single value or an array.
+   *
+   * @group misc_funcs
+   * @since 4.1.0
+   */
+  def kll_sketch_get_quantile_double(sketch: Column, rank: Column): Column =
+    Column.fn("kll_sketch_get_quantile_double", sketch, rank)
+
+  /**
+   * Extracts a rank value from a KLL bigint sketch given an input quantile value. The quantile
+   * can be a single value or an array.
+   *
+   * @group misc_funcs
+   * @since 4.1.0
+   */
+  def kll_sketch_get_rank_bigint(sketch: Column, quantile: Column): Column =
+    Column.fn("kll_sketch_get_rank_bigint", sketch, quantile)
+
+  /**
+   * Extracts a rank value from a KLL float sketch given an input quantile value. The quantile can
+   * be a single value or an array.
+   *
+   * @group misc_funcs
+   * @since 4.1.0
+   */
+  def kll_sketch_get_rank_float(sketch: Column, quantile: Column): Column =
+    Column.fn("kll_sketch_get_rank_float", sketch, quantile)
+
+  /**
+   * Extracts a rank value from a KLL double sketch given an input quantile value. The quantile
+   * can be a single value or an array.
+   *
+   * @group misc_funcs
+   * @since 4.1.0
+   */
+  def kll_sketch_get_rank_double(sketch: Column, quantile: Column): Column =
+    Column.fn("kll_sketch_get_rank_double", sketch, quantile)
+
   /**
    * Returns the user name of current execution context.
    *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 36e40306be7d..e1ca5ad91847 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -539,21 +539,9 @@ object FunctionRegistry {
     expression[KllSketchAggBigint]("kll_sketch_agg_bigint"),
     expression[KllSketchAggFloat]("kll_sketch_agg_float"),
     expression[KllSketchAggDouble]("kll_sketch_agg_double"),
-    expression[KllSketchToStringBigint]("kll_sketch_to_string_bigint"),
-    expression[KllSketchToStringFloat]("kll_sketch_to_string_float"),
-    expression[KllSketchToStringDouble]("kll_sketch_to_string_double"),
-    expression[KllSketchGetNBigint]("kll_sketch_get_n_bigint"),
-    expression[KllSketchGetNFloat]("kll_sketch_get_n_float"),
-    expression[KllSketchGetNDouble]("kll_sketch_get_n_double"),
-    expression[KllSketchMergeBigint]("kll_sketch_merge_bigint"),
-    expression[KllSketchMergeFloat]("kll_sketch_merge_float"),
-    expression[KllSketchMergeDouble]("kll_sketch_merge_double"),
-    expression[KllSketchGetQuantileBigint]("kll_sketch_get_quantile_bigint"),
-    expression[KllSketchGetQuantileFloat]("kll_sketch_get_quantile_float"),
-    expression[KllSketchGetQuantileDouble]("kll_sketch_get_quantile_double"),
-    expression[KllSketchGetRankBigint]("kll_sketch_get_rank_bigint"),
-    expression[KllSketchGetRankFloat]("kll_sketch_get_rank_float"),
-    expression[KllSketchGetRankDouble]("kll_sketch_get_rank_double"),
+    expression[KllMergeAggBigint]("kll_merge_agg_bigint"),
+    expression[KllMergeAggFloat]("kll_merge_agg_float"),
+    expression[KllMergeAggDouble]("kll_merge_agg_double"),
 
     // string functions
     expression[Ascii]("ascii"),
@@ -819,6 +807,21 @@ object FunctionRegistry {
     expression[ThetaDifference]("theta_difference"),
     expression[ThetaIntersection]("theta_intersection"),
     expression[ApproxTopKEstimate]("approx_top_k_estimate"),
+    expression[KllSketchToStringBigint]("kll_sketch_to_string_bigint"),
+    expression[KllSketchToStringFloat]("kll_sketch_to_string_float"),
+    expression[KllSketchToStringDouble]("kll_sketch_to_string_double"),
+    expression[KllSketchGetNBigint]("kll_sketch_get_n_bigint"),
+    expression[KllSketchGetNFloat]("kll_sketch_get_n_float"),
+    expression[KllSketchGetNDouble]("kll_sketch_get_n_double"),
+    expression[KllSketchMergeBigint]("kll_sketch_merge_bigint"),
+    expression[KllSketchMergeFloat]("kll_sketch_merge_float"),
+    expression[KllSketchMergeDouble]("kll_sketch_merge_double"),
+    expression[KllSketchGetQuantileBigint]("kll_sketch_get_quantile_bigint"),
+    expression[KllSketchGetQuantileFloat]("kll_sketch_get_quantile_float"),
+    expression[KllSketchGetQuantileDouble]("kll_sketch_get_quantile_double"),
+    expression[KllSketchGetRankBigint]("kll_sketch_get_rank_bigint"),
+    expression[KllSketchGetRankFloat]("kll_sketch_get_rank_float"),
+    expression[KllSketchGetRankDouble]("kll_sketch_get_rank_double"),
 
     // grouping sets
     expression[Grouping]("grouping"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/kllAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/kllAggregates.scala
index e74b22219cf6..6e3ea19425d9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/kllAggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/kllAggregates.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions.aggregate
 
-import org.apache.datasketches.kll.{KllDoublesSketch, KllFloatsSketch, KllLongsSketch}
+import org.apache.datasketches.kll.{KllDoublesSketch, KllFloatsSketch, KllLongsSketch, KllSketch}
 import org.apache.datasketches.memory.Memory
 
 import org.apache.spark.SparkUnsupportedOperationException
@@ -108,9 +108,10 @@ case class KllSketchAggBigint(
   override def checkInputDataTypes(): TypeCheckResult = {
     val defaultCheck = super.checkInputDataTypes()
     if (defaultCheck.isFailure) {
-      return defaultCheck
+      defaultCheck
+    } else {
+      checkKInputDataTypes()
     }
-    checkKInputDataTypes()
   }
 
   override def createAggregationBuffer(): KllLongsSketch =
@@ -123,26 +124,25 @@ case class KllSketchAggBigint(
    * Note, null values are ignored.
    */
   override def update(sketch: KllLongsSketch, input: InternalRow): KllLongsSketch = {
-    // Return early for null values.
     val v = child.eval(input)
     if (v == null) {
-      return sketch
-    }
-    // Handle the different data types for sketch updates.
-    child.dataType match {
-      case ByteType =>
-        sketch.update(v.asInstanceOf[Byte].toLong)
-      case IntegerType =>
-        sketch.update(v.asInstanceOf[Int].toLong)
-      case LongType =>
-        sketch.update(v.asInstanceOf[Long])
-      case ShortType =>
-        sketch.update(v.asInstanceOf[Short].toLong)
-      case _ =>
-        throw unexpectedInputDataTypeError(child)
+      sketch
+    } else {
+      // Handle the different data types for sketch updates.
+      child.dataType match {
+        case ByteType =>
+          sketch.update(v.asInstanceOf[Byte].toLong)
+        case IntegerType =>
+          sketch.update(v.asInstanceOf[Int].toLong)
+        case LongType =>
+          sketch.update(v.asInstanceOf[Long])
+        case ShortType =>
+          sketch.update(v.asInstanceOf[Short].toLong)
+        case _ =>
+          throw unexpectedInputDataTypeError(child)
+      }
+      sketch
     }
-
-    sketch
   }
 
   /** Merges an input sketch into the current aggregation buffer. */
@@ -250,9 +250,10 @@ case class KllSketchAggFloat(
   override def checkInputDataTypes(): TypeCheckResult = {
     val defaultCheck = super.checkInputDataTypes()
     if (defaultCheck.isFailure) {
-      return defaultCheck
+      defaultCheck
+    } else {
+      checkKInputDataTypes()
     }
-    checkKInputDataTypes()
   }
 
   override def createAggregationBuffer(): KllFloatsSketch =
@@ -265,20 +266,19 @@ case class KllSketchAggFloat(
    * Note, Null values are ignored.
    */
   override def update(sketch: KllFloatsSketch, input: InternalRow): KllFloatsSketch = {
-    // Return early for null values.
     val v = child.eval(input)
     if (v == null) {
-      return sketch
-    }
-    // Handle the different data types for sketch updates.
-    child.dataType match {
-      case FloatType =>
-        sketch.update(v.asInstanceOf[Float])
-      case _ =>
-        throw unexpectedInputDataTypeError(child)
+      sketch
+    } else {
+      // Handle the different data types for sketch updates.
+      child.dataType match {
+        case FloatType =>
+          sketch.update(v.asInstanceOf[Float])
+        case _ =>
+          throw unexpectedInputDataTypeError(child)
+      }
+      sketch
     }
-
-    sketch
   }
 
   /** Merges an input sketch into the current aggregation buffer. */
@@ -386,9 +386,10 @@ case class KllSketchAggDouble(
   override def checkInputDataTypes(): TypeCheckResult = {
     val defaultCheck = super.checkInputDataTypes()
     if (defaultCheck.isFailure) {
-      return defaultCheck
+      defaultCheck
+    } else {
+      checkKInputDataTypes()
     }
-    checkKInputDataTypes()
   }
 
   override def createAggregationBuffer(): KllDoublesSketch =
@@ -401,22 +402,21 @@ case class KllSketchAggDouble(
    * Note, Null values are ignored.
    */
   override def update(sketch: KllDoublesSketch, input: InternalRow): KllDoublesSketch = {
-    // Return early for null values.
     val v = child.eval(input)
     if (v == null) {
-      return sketch
-    }
-    // Handle the different data types for sketch updates.
-    child.dataType match {
-      case DoubleType =>
-        sketch.update(v.asInstanceOf[Double])
-      case FloatType =>
-        sketch.update(v.asInstanceOf[Float].toDouble)
-      case _ =>
-        throw unexpectedInputDataTypeError(child)
+      sketch
+    } else {
+      // Handle the different data types for sketch updates.
+      child.dataType match {
+        case DoubleType =>
+          sketch.update(v.asInstanceOf[Double])
+        case FloatType =>
+          sketch.update(v.asInstanceOf[Float].toDouble)
+        case _ =>
+          throw unexpectedInputDataTypeError(child)
+      }
+      sketch
     }
-
-    sketch
   }
 
   /** Merges an input sketch into the current aggregation buffer. */
@@ -449,6 +449,350 @@ case class KllSketchAggDouble(
   }
 }
 
+/**
+ * The KllMergeAggBigint function merges multiple Apache DataSketches KllLongsSketch instances
+ * that have been serialized to binary format. This is useful for combining sketches created
+ * in separate aggregations (e.g., from different partitions or time windows).
+ * It outputs the merged binary representation of the KllLongsSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/KLL/KLLSketch.html]] for more information.
+ *
+ * @param child
+ *   child expression containing binary KllLongsSketch representations to merge
+ * @param kExpr
+ *   optional expression for the k parameter from the Apache DataSketches library that controls
+ *   the size and accuracy of the sketch. Must be a constant integer between 8 and 65535.
+ *   If not specified, the merged sketch adopts the k value from the first input sketch.
+ *   If specified, the value is used to initialize the aggregation buffer. The merge operation
+ *   can handle input sketches with different k values. Larger k values provide more accurate
+ *   estimates but result in larger, slower sketches.
+ * @param mutableAggBufferOffset
+ *   offset for mutable aggregation buffer
+ * @param inputAggBufferOffset
+ *   offset for input aggregation buffer
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr[, k]) - Merges binary KllLongsSketch representations and returns the merged sketch.
+      The input expression should contain binary sketch representations (e.g., from kll_sketch_agg_bigint).
+      The optional k parameter controls the size and accuracy of the merged sketch (range 8-65535).
+      If k is not specified, the merged sketch adopts the k value from the first input sketch.
+  """,
+  examples = """
+    Examples:
+      > SELECT kll_sketch_get_n_bigint(_FUNC_(sketch)) FROM (SELECT kll_sketch_agg_bigint(col) as sketch FROM VALUES (1), (2), (3) tab(col) UNION ALL SELECT kll_sketch_agg_bigint(col) as sketch FROM VALUES (4), (5), (6) tab(col)) t;
+       6
+  """,
+  group = "agg_funcs",
+  since = "4.1.0")
+// scalastyle:on line.size.limit
+case class KllMergeAggBigint(
+    child: Expression,
+    kExpr: Option[Expression] = None,
+    override val mutableAggBufferOffset: Int = 0,
+    override val inputAggBufferOffset: Int = 0)
+    extends KllMergeAggBase[KllLongsSketch] {
+  def this(child: Expression) = this(child, None, 0, 0)
+  def this(child: Expression, kExpr: Expression) = this(child, Some(kExpr), 0, 0)
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): KllMergeAggBigint =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+  override def withNewInputAggBufferOffset(
+      newInputAggBufferOffset: Int): KllMergeAggBigint =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): KllMergeAggBigint = {
+    if (newChildren.length == 1) {
+      copy(child = newChildren(0), kExpr = None)
+    } else {
+      copy(child = newChildren(0), kExpr = Some(newChildren(1)))
+    }
+  }
+
+  override def prettyName: String = "kll_merge_agg_bigint"
+
+  // Factory method implementations
+  protected def newHeapInstance(k: Int): KllLongsSketch = KllLongsSketch.newHeapInstance(k)
+  protected def wrapSketch(bytes: Array[Byte]): KllLongsSketch =
+    KllLongsSketch.wrap(Memory.wrap(bytes))
+  protected def heapifySketch(bytes: Array[Byte]): KllLongsSketch =
+    KllLongsSketch.heapify(Memory.wrap(bytes))
+  protected def toByteArray(sketch: KllLongsSketch): Array[Byte] = sketch.toByteArray
+}
+
+/**
+ * The KllMergeAggFloat function merges multiple Apache DataSketches KllFloatsSketch instances
+ * that have been serialized to binary format. This is useful for combining sketches created
+ * in separate aggregations (e.g., from different partitions or time windows).
+ * It outputs the merged binary representation of the KllFloatsSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/KLL/KLLSketch.html]] for more information.
+ *
+ * @param child
+ *   child expression containing binary KllFloatsSketch representations to merge
+ * @param kExpr
+ *   optional expression for the k parameter from the Apache DataSketches library that controls
+ *   the size and accuracy of the sketch. Must be a constant integer between 8 and 65535.
+ *   If not specified, the merged sketch adopts the k value from the first input sketch.
+ *   If specified, the value is used to initialize the aggregation buffer. The merge operation
+ *   can handle input sketches with different k values. Larger k values provide more accurate
+ *   estimates but result in larger, slower sketches.
+ * @param mutableAggBufferOffset
+ *   offset for mutable aggregation buffer
+ * @param inputAggBufferOffset
+ *   offset for input aggregation buffer
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr[, k]) - Merges binary KllFloatsSketch representations and returns the merged sketch.
+      The input expression should contain binary sketch representations (e.g., from kll_sketch_agg_float).
+      The optional k parameter controls the size and accuracy of the merged sketch (range 8-65535).
+      If k is not specified, the merged sketch adopts the k value from the first input sketch.
+  """,
+  examples = """
+    Examples:
+      > SELECT kll_sketch_get_n_float(_FUNC_(sketch)) FROM (SELECT kll_sketch_agg_float(col) as sketch FROM VALUES (CAST(1.0 AS FLOAT)), (CAST(2.0 AS FLOAT)), (CAST(3.0 AS FLOAT)) tab(col) UNION ALL SELECT kll_sketch_agg_float(col) as sketch FROM VALUES (CAST(4.0 AS FLOAT)), (CAST(5.0 AS FLOAT)), (CAST(6.0 AS FLOAT)) tab(col)) t;
+       6
+  """,
+  group = "agg_funcs",
+  since = "4.1.0")
+// scalastyle:on line.size.limit
+case class KllMergeAggFloat(
+    child: Expression,
+    kExpr: Option[Expression] = None,
+    override val mutableAggBufferOffset: Int = 0,
+    override val inputAggBufferOffset: Int = 0)
+    extends KllMergeAggBase[KllFloatsSketch] {
+  def this(child: Expression) = this(child, None, 0, 0)
+  def this(child: Expression, kExpr: Expression) = this(child, Some(kExpr), 0, 0)
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): KllMergeAggFloat =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+  override def withNewInputAggBufferOffset(
+      newInputAggBufferOffset: Int): KllMergeAggFloat =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): KllMergeAggFloat = {
+    if (newChildren.length == 1) {
+      copy(child = newChildren(0), kExpr = None)
+    } else {
+      copy(child = newChildren(0), kExpr = Some(newChildren(1)))
+    }
+  }
+
+  override def prettyName: String = "kll_merge_agg_float"
+
+  // Factory method implementations
+  protected def newHeapInstance(k: Int): KllFloatsSketch = KllFloatsSketch.newHeapInstance(k)
+  protected def wrapSketch(bytes: Array[Byte]): KllFloatsSketch =
+    KllFloatsSketch.wrap(Memory.wrap(bytes))
+  protected def heapifySketch(bytes: Array[Byte]): KllFloatsSketch =
+    KllFloatsSketch.heapify(Memory.wrap(bytes))
+  protected def toByteArray(sketch: KllFloatsSketch): Array[Byte] = sketch.toByteArray
+}
+
+/**
+ * The KllMergeAggDouble function merges multiple Apache DataSketches KllDoublesSketch instances
+ * that have been serialized to binary format. This is useful for combining sketches created
+ * in separate aggregations (e.g., from different partitions or time windows).
+ * It outputs the merged binary representation of the KllDoublesSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/KLL/KLLSketch.html]] for more information.
+ *
+ * @param child
+ *   child expression containing binary KllDoublesSketch representations to merge
+ * @param kExpr
+ *   optional expression for the k parameter from the Apache DataSketches library that controls
+ *   the size and accuracy of the sketch. Must be a constant integer between 8 and 65535.
+ *   If not specified, the merged sketch adopts the k value from the first input sketch.
+ *   If specified, the value is used to initialize the aggregation buffer. The merge operation
+ *   can handle input sketches with different k values. Larger k values provide more accurate
+ *   estimates but result in larger, slower sketches.
+ * @param mutableAggBufferOffset
+ *   offset for mutable aggregation buffer
+ * @param inputAggBufferOffset
+ *   offset for input aggregation buffer
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr[, k]) - Merges binary KllDoublesSketch representations and returns the merged sketch.
+      The input expression should contain binary sketch representations (e.g., from kll_sketch_agg_double).
+      The optional k parameter controls the size and accuracy of the merged sketch (range 8-65535).
+      If k is not specified, the merged sketch adopts the k value from the first input sketch.
+  """,
+  examples = """
+    Examples:
+      > SELECT kll_sketch_get_n_double(_FUNC_(sketch)) FROM (SELECT kll_sketch_agg_double(col) as sketch FROM VALUES (CAST(1.0 AS DOUBLE)), (CAST(2.0 AS DOUBLE)), (CAST(3.0 AS DOUBLE)) tab(col) UNION ALL SELECT kll_sketch_agg_double(col) as sketch FROM VALUES (CAST(4.0 AS DOUBLE)), (CAST(5.0 AS DOUBLE)), (CAST(6.0 AS DOUBLE)) tab(col)) t;
+       6
+  """,
+  group = "agg_funcs",
+  since = "4.1.0")
+// scalastyle:on line.size.limit
+case class KllMergeAggDouble(
+    child: Expression,
+    kExpr: Option[Expression] = None,
+    override val mutableAggBufferOffset: Int = 0,
+    override val inputAggBufferOffset: Int = 0)
+    extends KllMergeAggBase[KllDoublesSketch] {
+  def this(child: Expression) = this(child, None, 0, 0)
+  def this(child: Expression, kExpr: Expression) = this(child, Some(kExpr), 0, 0)
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): KllMergeAggDouble =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+  override def withNewInputAggBufferOffset(
+      newInputAggBufferOffset: Int): KllMergeAggDouble =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): KllMergeAggDouble = {
+    if (newChildren.length == 1) {
+      copy(child = newChildren(0), kExpr = None)
+    } else {
+      copy(child = newChildren(0), kExpr = Some(newChildren(1)))
+    }
+  }
+
+  override def prettyName: String = "kll_merge_agg_double"
+
+  // Factory method implementations
+  protected def newHeapInstance(k: Int): KllDoublesSketch = KllDoublesSketch.newHeapInstance(k)
+  protected def wrapSketch(bytes: Array[Byte]): KllDoublesSketch =
+    KllDoublesSketch.wrap(Memory.wrap(bytes))
+  protected def heapifySketch(bytes: Array[Byte]): KllDoublesSketch =
+    KllDoublesSketch.heapify(Memory.wrap(bytes))
+  protected def toByteArray(sketch: KllDoublesSketch): Array[Byte] = sketch.toByteArray
+}
+
+/**
+ * Base abstract class for KLL merge aggregate functions that provides common implementation
+ * for merging serialized KLL sketches with optional k parameter.
+ *
+ * @tparam T The KLL sketch type (KllLongsSketch, KllFloatsSketch, or KllDoublesSketch)
+ */
+abstract class KllMergeAggBase[T <: KllSketch]
+    extends TypedImperativeAggregate[Option[T]]
+    with KllSketchAggBase
+    with ExpectsInputTypes {
+
+  def child: Expression
+
+  // Abstract factory methods for sketch-specific instantiation
+  protected def newHeapInstance(k: Int): T
+  protected def wrapSketch(bytes: Array[Byte]): T
+  protected def heapifySketch(bytes: Array[Byte]): T
+  protected def toByteArray(sketch: T): Array[Byte]
+
+  // Common implementations for all merge aggregates
+  override def children: Seq[Expression] = child +: kExpr.toSeq
+
+  override def dataType: DataType = BinaryType
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    val baseTypes = Seq(BinaryType)
+    if (kExpr.isDefined) baseTypes :+ IntegerType else baseTypes
+  }
+
+  override def nullable: Boolean = false
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      defaultCheck
+    } else {
+      checkKInputDataTypes()
+    }
+  }
+
+  /**
+   * Defer instantiation of the sketch instance until we've deserialized
+   * our first sketch (if kExpr was not provided), and use that sketch's k value.
+   *
+   * @return None if kExpr was not provided, otherwise Some(sketch with specified k)
+   */
+  override def createAggregationBuffer(): Option[T] = {
+    if (kExpr.isDefined) {
+      Some(newHeapInstance(kValue))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Evaluate the input row and wrap the binary sketch, then merge it into
+   * the current aggregation buffer.
+   * Note, null values are ignored.
+   */
+  override def update(sketchOption: Option[T], input: InternalRow): Option[T] = {
+    val v = child.eval(input)
+    if (v == null) {
+      sketchOption
+    } else {
+      try {
+        val sketchBytes = v.asInstanceOf[Array[Byte]]
+        val inputSketch = wrapSketch(sketchBytes)
+        val sketch = sketchOption.getOrElse(newHeapInstance(inputSketch.getK()))
+        sketch.merge(inputSketch)
+        Some(sketch)
+      } catch {
+        case _: Exception =>
+          throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
+      }
+    }
+  }
+
+  /** Merges an input sketch into the current aggregation buffer. */
+  override def merge(updateBufferOption: Option[T], inputOption: Option[T]): Option[T] = {
+    (updateBufferOption, inputOption) match {
+      case (Some(updateBuffer), Some(input)) =>
+        try {
+          updateBuffer.merge(input)
+          Some(updateBuffer)
+        } catch {
+          case _: Exception =>
+            throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
+        }
+      case (Some(_), None) => updateBufferOption
+      case (None, Some(_)) => inputOption
+      case (None, None) => None
+    }
+  }
+
+  /** Returns a sketch derived from the input column or expression. */
+  override def eval(sketchOption: Option[T]): Any = {
+    sketchOption match {
+      case Some(sketch) => toByteArray(sketch)
+      case None => toByteArray(newHeapInstance(kValue))
+    }
+  }
+
+  /** Converts the underlying sketch state into a byte array. */
+  override def serialize(sketchOption: Option[T]): Array[Byte] = {
+    sketchOption match {
+      case Some(sketch) => toByteArray(sketch)
+      case None => toByteArray(newHeapInstance(kValue))
+    }
+  }
+
+  /** Wraps the byte array into a sketch instance. */
+  override def deserialize(buffer: Array[Byte]): Option[T] = {
+    if (buffer.nonEmpty) {
+      try {
+        Some(heapifySketch(buffer))
+      } catch {
+        case _: Exception =>
+          throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
+      }
+    } else {
+      createAggregationBuffer()
+    }
+  }
+}
+
 /**
  * Common trait for KLL sketch aggregate functions that support an optional k parameter.
  */
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index 7b6cbabeb129..a5c98675c977 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -455,6 +455,9 @@
 | org.apache.spark.sql.catalyst.expressions.aggregate.HllSketchAgg | hll_sketch_agg | SELECT hll_sketch_estimate(hll_sketch_agg(col, 12)) FROM VALUES (1), (1), (2), (2), (3) tab(col) | struct<hll_sketch_estimate(hll_sketch_agg(col, 12)):bigint> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.HllUnionAgg | hll_union_agg | SELECT hll_sketch_estimate(hll_union_agg(sketch, true)) FROM (SELECT hll_sketch_agg(col) as sketch FROM VALUES (1) tab(col) UNION ALL SELECT hll_sketch_agg(col, 20) as sketch FROM VALUES (1) tab(col)) | struct<hll_sketch_estimate(hll_union_agg(sketch, true)):bigint> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.HyperLogLogPlusPlus | approx_count_distinct | SELECT approx_count_distinct(col1) FROM VALUES (1), (1), (2), (2), (3) tab(col1) | struct<approx_count_distinct(col1):bigint> |
+| org.apache.spark.sql.catalyst.expressions.aggregate.KllMergeAggBigint | kll_merge_agg_bigint | SELECT kll_sketch_get_n_bigint(kll_merge_agg_bigint(sketch)) FROM (SELECT kll_sketch_agg_bigint(col) as sketch FROM VALUES (1), (2), (3) tab(col) UNION ALL SELECT kll_sketch_agg_bigint(col) as sketch FROM VALUES (4), (5), (6) tab(col)) t | struct<kll_sketch_get_n_bigint(kll_merge_agg_bigint(sketch)):bigint> |
+| org.apache.spark.sql.catalyst.expressions.aggregate.KllMergeAggDouble | kll_merge_agg_double | SELECT kll_sketch_get_n_double(kll_merge_agg_double(sketch)) FROM (SELECT kll_sketch_agg_double(col) as sketch FROM VALUES (CAST(1.0 AS DOUBLE)), (CAST(2.0 AS DOUBLE)), (CAST(3.0 AS DOUBLE)) tab(col) UNION ALL SELECT kll_sketch_agg_double(col) as sketch FROM VALUES (CAST(4.0 AS DOUBLE)), (CAST(5.0 AS DOUBLE)), (CAST(6.0 AS DOUBLE)) tab(col)) t | struct<kll_sketch_get_n_double(kll_merge_agg_do [...]
+| org.apache.spark.sql.catalyst.expressions.aggregate.KllMergeAggFloat | kll_merge_agg_float | SELECT kll_sketch_get_n_float(kll_merge_agg_float(sketch)) FROM (SELECT kll_sketch_agg_float(col) as sketch FROM VALUES (CAST(1.0 AS FLOAT)), (CAST(2.0 AS FLOAT)), (CAST(3.0 AS FLOAT)) tab(col) UNION ALL SELECT kll_sketch_agg_float(col) as sketch FROM VALUES (CAST(4.0 AS FLOAT)), (CAST(5.0 AS FLOAT)), (CAST(6.0 AS FLOAT)) tab(col)) t | struct<kll_sketch_get_n_float(kll_merge_agg_float(sketch)): [...]
 | org.apache.spark.sql.catalyst.expressions.aggregate.KllSketchAggBigint | kll_sketch_agg_bigint | SELECT LENGTH(kll_sketch_to_string_bigint(kll_sketch_agg_bigint(col))) > 0 FROM VALUES (1), (2), (3), (4), (5) tab(col) | struct<(length(kll_sketch_to_string_bigint(kll_sketch_agg_bigint(col))) > 0):boolean> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.KllSketchAggDouble | kll_sketch_agg_double | SELECT LENGTH(kll_sketch_to_string_double(kll_sketch_agg_double(col))) > 0 FROM VALUES (CAST(1.0 AS DOUBLE)), (CAST(2.0 AS DOUBLE)), (CAST(3.0 AS DOUBLE)), (CAST(4.0 AS DOUBLE)), (CAST(5.0 AS DOUBLE)) tab(col) | struct<(length(kll_sketch_to_string_double(kll_sketch_agg_double(col))) > 0):boolean> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.KllSketchAggFloat | kll_sketch_agg_float | SELECT LENGTH(kll_sketch_to_string_float(kll_sketch_agg_float(col))) > 0 FROM VALUES (CAST(1.0 AS FLOAT)), (CAST(2.0 AS FLOAT)), (CAST(3.0 AS FLOAT)), (CAST(4.0 AS FLOAT)), (CAST(5.0 AS FLOAT)) tab(col) | struct<(length(kll_sketch_to_string_float(kll_sketch_agg_float(col))) > 0):boolean> |
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/kllquantiles.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/kllquantiles.sql.out
index 3eea568420c0..8a2b50131627 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/kllquantiles.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/kllquantiles.sql.out
@@ -326,6 +326,262 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
 }
 
 
+-- !query
+SELECT
+  parity,
+  kll_sketch_get_n_bigint(kll_merge_agg_bigint(sketch_col)) AS total_count
+FROM (
+  SELECT
+    col1 % 2 AS parity,
+    kll_sketch_agg_bigint(col1) AS sketch_col
+  FROM t_int_1_5_through_7_11
+  GROUP BY col1 % 2
+) grouped_sketches
+GROUP BY parity
+HAVING kll_sketch_get_n_bigint(kll_merge_agg_bigint(sketch_col)) > 3
+-- !query analysis
+Filter (total_count#xL > cast(3 as bigint))
++- Aggregate [parity#x], [parity#x, kll_sketch_get_n_bigint(kll_merge_agg_bigint(sketch_col#x, None, 0, 0)) AS total_count#xL]
+   +- SubqueryAlias grouped_sketches
+      +- Aggregate [(col1#x % 2)], [(col1#x % 2) AS parity#x, kll_sketch_agg_bigint(col1#x, None, 0, 0) AS sketch_col#x]
+         +- SubqueryAlias spark_catalog.default.t_int_1_5_through_7_11
+            +- Relation spark_catalog.default.t_int_1_5_through_7_11[col1#x,col2#x] parquet
+
+
+-- !query
+SELECT kll_sketch_get_n_bigint(kll_merge_agg_bigint(sketch_col)) AS empty_merge_n
+FROM (
+  SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+  FROM t_int_1_5_through_7_11
+  WHERE col1 > 1000
+) empty_sketches
+-- !query analysis
+Aggregate [kll_sketch_get_n_bigint(kll_merge_agg_bigint(sketch_col#x, None, 0, 0)) AS empty_merge_n#xL]
++- SubqueryAlias empty_sketches
+   +- Aggregate [kll_sketch_agg_bigint(col1#x, None, 0, 0) AS sketch_col#x]
+      +- Filter (col1#x > 1000)
+         +- SubqueryAlias spark_catalog.default.t_int_1_5_through_7_11
+            +- Relation spark_catalog.default.t_int_1_5_through_7_11[col1#x,col2#x] parquet
+
+
+-- !query
+SELECT kll_sketch_get_n_float(kll_merge_agg_float(sketch_col)) AS empty_merge_n
+FROM (
+  SELECT kll_sketch_agg_float(col1) AS sketch_col
+  FROM t_float_1_5_through_7_11
+  WHERE col1 > 1000.0
+) empty_sketches
+-- !query analysis
+Aggregate [kll_sketch_get_n_float(kll_merge_agg_float(sketch_col#x, None, 0, 0)) AS empty_merge_n#xL]
++- SubqueryAlias empty_sketches
+   +- Aggregate [kll_sketch_agg_float(col1#x, None, 0, 0) AS sketch_col#x]
+      +- Filter (cast(col1#x as double) > cast(1000.0 as double))
+         +- SubqueryAlias spark_catalog.default.t_float_1_5_through_7_11
+            +- Relation spark_catalog.default.t_float_1_5_through_7_11[col1#x,col2#x] parquet
+
+
+-- !query
+SELECT kll_sketch_get_n_double(kll_merge_agg_double(sketch_col)) AS empty_merge_n
+FROM (
+  SELECT kll_sketch_agg_double(col1) AS sketch_col
+  FROM t_double_1_5_through_7_11
+  WHERE col1 > 1000.0
+) empty_sketches
+-- !query analysis
+Aggregate [kll_sketch_get_n_double(kll_merge_agg_double(sketch_col#x, None, 0, 0)) AS empty_merge_n#xL]
++- SubqueryAlias empty_sketches
+   +- Aggregate [kll_sketch_agg_double(col1#x, None, 0, 0) AS sketch_col#x]
+      +- Filter (col1#x > cast(1000.0 as double))
+         +- SubqueryAlias spark_catalog.default.t_double_1_5_through_7_11
+            +- Relation spark_catalog.default.t_double_1_5_through_7_11[col1#x,col2#x] parquet
+
+
+-- !query
+SELECT lower(kll_sketch_to_string_bigint(agg)) LIKE '%kll%' AS str_contains_kll,
+       abs(kll_sketch_get_quantile_bigint(agg, 0.5) - 4) < 1 AS median_close_to_4,
+       abs(kll_sketch_get_rank_bigint(agg, 3) - 0.4) < 0.1 AS rank3_close_to_0_4
+FROM (
+    SELECT kll_merge_agg_bigint(sketch_col) AS agg
+    FROM (
+        SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+        FROM t_int_1_5_through_7_11
+        UNION ALL
+        SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+        FROM t_short_1_5_through_7_11
+    ) sketches
+)
+-- !query analysis
+Project [lower(kll_sketch_to_string_bigint(agg#x)) LIKE %kll% AS str_contains_kll#x, (abs((kll_sketch_get_quantile_bigint(agg#x, cast(0.5 as double)) - cast(4 as bigint))) < cast(1 as bigint)) AS median_close_to_4#x, (abs((kll_sketch_get_rank_bigint(agg#x, cast(3 as bigint)) - cast(0.4 as double))) < cast(0.1 as double)) AS rank3_close_to_0_4#x]
++- SubqueryAlias __auto_generated_subquery_name
+   +- Aggregate [kll_merge_agg_bigint(sketch_col#x, None, 0, 0) AS agg#x]
+      +- SubqueryAlias sketches
+         +- Union false, false
+            :- Aggregate [kll_sketch_agg_bigint(col1#x, None, 0, 0) AS sketch_col#x]
+            :  +- SubqueryAlias spark_catalog.default.t_int_1_5_through_7_11
+            :     +- Relation spark_catalog.default.t_int_1_5_through_7_11[col1#x,col2#x] parquet
+            +- Aggregate [kll_sketch_agg_bigint(col1#x, None, 0, 0) AS sketch_col#x]
+               +- SubqueryAlias spark_catalog.default.t_short_1_5_through_7_11
+                  +- Relation spark_catalog.default.t_short_1_5_through_7_11[col1#x,col2#x] parquet
+
+
+-- !query
+SELECT lower(kll_sketch_to_string_float(agg)) LIKE '%kll%' AS str_contains_kll,
+       abs(kll_sketch_get_quantile_float(agg, 0.5) - 5.5) < 1.0 AS median_close_to_5_5,
+       abs(kll_sketch_get_rank_float(agg, 5.0) - 0.35) < 0.15 AS rank5_close_to_0_35
+FROM (
+    SELECT kll_merge_agg_float(sketch_col) AS agg
+    FROM (
+        SELECT kll_sketch_agg_float(col1) AS sketch_col
+        FROM t_float_1_5_through_7_11
+        UNION ALL
+        SELECT kll_sketch_agg_float(col2) AS sketch_col
+        FROM t_float_1_5_through_7_11
+    ) sketches
+)
+-- !query analysis
+Project [lower(kll_sketch_to_string_float(agg#x)) LIKE %kll% AS str_contains_kll#x, (abs((cast(kll_sketch_get_quantile_float(agg#x, cast(0.5 as double)) as double) - cast(5.5 as double))) < cast(1.0 as double)) AS median_close_to_5_5#x, (abs((kll_sketch_get_rank_float(agg#x, cast(5.0 as float)) - cast(0.35 as double))) < cast(0.15 as double)) AS rank5_close_to_0_35#x]
++- SubqueryAlias __auto_generated_subquery_name
+   +- Aggregate [kll_merge_agg_float(sketch_col#x, None, 0, 0) AS agg#x]
+      +- SubqueryAlias sketches
+         +- Union false, false
+            :- Aggregate [kll_sketch_agg_float(col1#x, None, 0, 0) AS sketch_col#x]
+            :  +- SubqueryAlias spark_catalog.default.t_float_1_5_through_7_11
+            :     +- Relation spark_catalog.default.t_float_1_5_through_7_11[col1#x,col2#x] parquet
+            +- Aggregate [kll_sketch_agg_float(col2#x, None, 0, 0) AS sketch_col#x]
+               +- SubqueryAlias spark_catalog.default.t_float_1_5_through_7_11
+                  +- Relation spark_catalog.default.t_float_1_5_through_7_11[col1#x,col2#x] parquet
+
+
+-- !query
+SELECT lower(kll_sketch_to_string_double(agg)) LIKE '%kll%' AS str_contains_kll,
+       abs(kll_sketch_get_quantile_double(agg, 0.5) - 6.0) < 1.0 AS median_close_to_6,
+       abs(kll_sketch_get_rank_double(agg, 5.0) - 0.35) < 0.15 AS rank5_close_to_0_35
+FROM (
+    SELECT kll_merge_agg_double(sketch_col) AS agg
+    FROM (
+        SELECT kll_sketch_agg_double(col1) AS sketch_col
+        FROM t_double_1_5_through_7_11
+        UNION ALL
+        SELECT kll_sketch_agg_double(col2) AS sketch_col
+        FROM t_float_1_5_through_7_11
+    ) sketches
+)
+-- !query analysis
+Project [lower(kll_sketch_to_string_double(agg#x)) LIKE %kll% AS str_contains_kll#x, (abs((kll_sketch_get_quantile_double(agg#x, cast(0.5 as double)) - cast(6.0 as double))) < cast(1.0 as double)) AS median_close_to_6#x, (abs((kll_sketch_get_rank_double(agg#x, cast(5.0 as double)) - cast(0.35 as double))) < cast(0.15 as double)) AS rank5_close_to_0_35#x]
++- SubqueryAlias __auto_generated_subquery_name
+   +- Aggregate [kll_merge_agg_double(sketch_col#x, None, 0, 0) AS agg#x]
+      +- SubqueryAlias sketches
+         +- Union false, false
+            :- Aggregate [kll_sketch_agg_double(col1#x, None, 0, 0) AS sketch_col#x]
+            :  +- SubqueryAlias spark_catalog.default.t_double_1_5_through_7_11
+            :     +- Relation spark_catalog.default.t_double_1_5_through_7_11[col1#x,col2#x] parquet
+            +- Aggregate [kll_sketch_agg_double(col2#x, None, 0, 0) AS sketch_col#x]
+               +- SubqueryAlias spark_catalog.default.t_float_1_5_through_7_11
+                  +- Relation spark_catalog.default.t_float_1_5_through_7_11[col1#x,col2#x] parquet
+
+
+-- !query
+SELECT LENGTH(kll_sketch_to_string_bigint(kll_merge_agg_bigint(sketch_col, 400))) > 0 AS merged_with_k
+FROM (
+    SELECT kll_sketch_agg_bigint(col1, 400) AS sketch_col
+    FROM t_long_1_5_through_7_11
+    UNION ALL
+    SELECT kll_sketch_agg_bigint(col2, 400) AS sketch_col
+    FROM t_byte_1_5_through_7_11
+) sketches
+-- !query analysis
+Aggregate [(length(kll_sketch_to_string_bigint(kll_merge_agg_bigint(sketch_col#x, Some(400), 0, 0))) > 0) AS merged_with_k#x]
++- SubqueryAlias sketches
+   +- Union false, false
+      :- Aggregate [kll_sketch_agg_bigint(col1#xL, Some(400), 0, 0) AS sketch_col#x]
+      :  +- SubqueryAlias spark_catalog.default.t_long_1_5_through_7_11
+      :     +- Relation spark_catalog.default.t_long_1_5_through_7_11[col1#xL,col2#xL] parquet
+      +- Aggregate [kll_sketch_agg_bigint(col2#x, Some(400), 0, 0) AS sketch_col#x]
+         +- SubqueryAlias spark_catalog.default.t_byte_1_5_through_7_11
+            +- Relation spark_catalog.default.t_byte_1_5_through_7_11[col1#x,col2#x] parquet
+
+
+-- !query
+SELECT LENGTH(kll_sketch_to_string_float(kll_merge_agg_float(sketch_col, 300))) > 0 AS merged_with_k
+FROM (
+    SELECT kll_sketch_agg_float(col1, 300) AS sketch_col
+    FROM t_float_1_5_through_7_11
+) sketches
+-- !query analysis
+Aggregate [(length(kll_sketch_to_string_float(kll_merge_agg_float(sketch_col#x, Some(300), 0, 0))) > 0) AS merged_with_k#x]
++- SubqueryAlias sketches
+   +- Aggregate [kll_sketch_agg_float(col1#x, Some(300), 0, 0) AS sketch_col#x]
+      +- SubqueryAlias spark_catalog.default.t_float_1_5_through_7_11
+         +- Relation spark_catalog.default.t_float_1_5_through_7_11[col1#x,col2#x] parquet
+
+
+-- !query
+SELECT LENGTH(kll_sketch_to_string_double(kll_merge_agg_double(sketch_col, 500))) > 0 AS merged_with_k
+FROM (
+    SELECT kll_sketch_agg_double(col1, 500) AS sketch_col
+    FROM t_double_1_5_through_7_11
+) sketches
+-- !query analysis
+Aggregate [(length(kll_sketch_to_string_double(kll_merge_agg_double(sketch_col#x, Some(500), 0, 0))) > 0) AS merged_with_k#x]
++- SubqueryAlias sketches
+   +- Aggregate [kll_sketch_agg_double(col1#x, Some(500), 0, 0) AS sketch_col#x]
+      +- SubqueryAlias spark_catalog.default.t_double_1_5_through_7_11
+         +- Relation spark_catalog.default.t_double_1_5_through_7_11[col1#x,col2#x] parquet
+
+
+-- !query
+SELECT abs(kll_sketch_get_quantile_bigint(agg_with_nulls, 0.5) -
+           kll_sketch_get_quantile_bigint(agg_without_nulls, 0.5)) < 1 AS medians_match
+FROM (
+    SELECT kll_merge_agg_bigint(sketch_col) AS agg_with_nulls
+    FROM (
+        SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+        FROM t_long_1_5_through_7_11
+        UNION ALL
+        SELECT CAST(NULL AS BINARY) AS sketch_col
+        UNION ALL
+        SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+        FROM t_byte_1_5_through_7_11
+    ) sketches_with_nulls
+) WITH_NULLS,
+(
+    SELECT kll_merge_agg_bigint(sketch_col) AS agg_without_nulls
+    FROM (
+        SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+        FROM t_long_1_5_through_7_11
+        UNION ALL
+        SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+        FROM t_byte_1_5_through_7_11
+    ) sketches_without_nulls
+) WITHOUT_NULLS
+-- !query analysis
+Project [(abs((kll_sketch_get_quantile_bigint(agg_with_nulls#x, cast(0.5 as double)) - kll_sketch_get_quantile_bigint(agg_without_nulls#x, cast(0.5 as double)))) < cast(1 as bigint)) AS medians_match#x]
++- Join Inner
+   :- SubqueryAlias WITH_NULLS
+   :  +- Aggregate [kll_merge_agg_bigint(sketch_col#x, None, 0, 0) AS agg_with_nulls#x]
+   :     +- SubqueryAlias sketches_with_nulls
+   :        +- Union false, false
+   :           :- Union false, false
+   :           :  :- Aggregate [kll_sketch_agg_bigint(col1#xL, None, 0, 0) AS sketch_col#x]
+   :           :  :  +- SubqueryAlias spark_catalog.default.t_long_1_5_through_7_11
+   :           :  :     +- Relation spark_catalog.default.t_long_1_5_through_7_11[col1#xL,col2#xL] parquet
+   :           :  +- Project [cast(null as binary) AS sketch_col#x]
+   :           :     +- OneRowRelation
+   :           +- Aggregate [kll_sketch_agg_bigint(col1#x, None, 0, 0) AS sketch_col#x]
+   :              +- SubqueryAlias spark_catalog.default.t_byte_1_5_through_7_11
+   :                 +- Relation spark_catalog.default.t_byte_1_5_through_7_11[col1#x,col2#x] parquet
+   +- SubqueryAlias WITHOUT_NULLS
+      +- Aggregate [kll_merge_agg_bigint(sketch_col#x, None, 0, 0) AS agg_without_nulls#x]
+         +- SubqueryAlias sketches_without_nulls
+            +- Union false, false
+               :- Aggregate [kll_sketch_agg_bigint(col1#xL, None, 0, 0) AS sketch_col#x]
+               :  +- SubqueryAlias spark_catalog.default.t_long_1_5_through_7_11
+               :     +- Relation spark_catalog.default.t_long_1_5_through_7_11[col1#xL,col2#xL] parquet
+               +- Aggregate [kll_sketch_agg_bigint(col1#x, None, 0, 0) AS sketch_col#x]
+                  +- SubqueryAlias spark_catalog.default.t_byte_1_5_through_7_11
+                     +- Relation spark_catalog.default.t_byte_1_5_through_7_11[col1#x,col2#x] parquet
+
+
 -- !query
 SELECT abs(kll_sketch_get_quantile_bigint(agg_with_nulls, 0.5) - 
            kll_sketch_get_quantile_bigint(agg_without_nulls, 0.5)) < 1 AS medians_match,
@@ -1054,6 +1310,235 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
 }
 
 
+-- !query
+SELECT kll_merge_agg_bigint(sketch_col) AS wrong_type_merge
+FROM (
+  SELECT kll_sketch_agg_float(col1) AS sketch_col
+  FROM t_float_1_5_through_7_11
+) float_sketches
+-- !query analysis
+Aggregate [kll_merge_agg_bigint(sketch_col#x, None, 0, 0) AS wrong_type_merge#x]
++- SubqueryAlias float_sketches
+   +- Aggregate [kll_sketch_agg_float(col1#x, None, 0, 0) AS sketch_col#x]
+      +- SubqueryAlias spark_catalog.default.t_float_1_5_through_7_11
+         +- Relation spark_catalog.default.t_float_1_5_through_7_11[col1#x,col2#x] parquet
+
+
+-- !query
+SELECT kll_merge_agg_bigint(col1) AS merge_wrong_type
+FROM t_long_1_5_through_7_11
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"col1\"",
+    "inputType" : "\"BIGINT\"",
+    "paramIndex" : "first",
+    "requiredType" : "\"BINARY\"",
+    "sqlExpr" : "\"kll_merge_agg_bigint(col1)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 33,
+    "fragment" : "kll_merge_agg_bigint(col1)"
+  } ]
+}
+
+
+-- !query
+SELECT kll_merge_agg_float(col1) AS merge_wrong_type
+FROM t_float_1_5_through_7_11
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"col1\"",
+    "inputType" : "\"FLOAT\"",
+    "paramIndex" : "first",
+    "requiredType" : "\"BINARY\"",
+    "sqlExpr" : "\"kll_merge_agg_float(col1)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 32,
+    "fragment" : "kll_merge_agg_float(col1)"
+  } ]
+}
+
+
+-- !query
+SELECT kll_merge_agg_double(col1) AS merge_wrong_type
+FROM t_double_1_5_through_7_11
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"col1\"",
+    "inputType" : "\"DOUBLE\"",
+    "paramIndex" : "first",
+    "requiredType" : "\"BINARY\"",
+    "sqlExpr" : "\"kll_merge_agg_double(col1)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 33,
+    "fragment" : "kll_merge_agg_double(col1)"
+  } ]
+}
+
+
+-- !query
+SELECT kll_merge_agg_bigint(sketch_col) AS invalid_merge
+FROM (
+    SELECT CAST('not_a_sketch' AS BINARY) AS sketch_col
+) invalid_data
+-- !query analysis
+Aggregate [kll_merge_agg_bigint(sketch_col#x, None, 0, 0) AS invalid_merge#x]
++- SubqueryAlias invalid_data
+   +- Project [cast(not_a_sketch as binary) AS sketch_col#x]
+      +- OneRowRelation
+
+
+-- !query
+SELECT kll_merge_agg_float(sketch_col) AS invalid_merge
+FROM (
+    SELECT X'deadbeef' AS sketch_col
+) invalid_data
+-- !query analysis
+Aggregate [kll_merge_agg_float(sketch_col#x, None, 0, 0) AS invalid_merge#x]
++- SubqueryAlias invalid_data
+   +- Project [0xDEADBEEF AS sketch_col#x]
+      +- OneRowRelation
+
+
+-- !query
+SELECT kll_merge_agg_double(sketch_col) AS invalid_merge
+FROM (
+    SELECT X'cafebabe' AS sketch_col
+) invalid_data
+-- !query analysis
+Aggregate [kll_merge_agg_double(sketch_col#x, None, 0, 0) AS invalid_merge#x]
++- SubqueryAlias invalid_data
+   +- Project [0xCAFEBABE AS sketch_col#x]
+      +- OneRowRelation
+
+
+-- !query
+SELECT kll_merge_agg_bigint(sketch_col, 7) AS k_too_small
+FROM (
+    SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+    FROM t_long_1_5_through_7_11
+) sketches
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "hint" : "",
+    "msg" : "[KLL_SKETCH_K_OUT_OF_RANGE] For function `kll_merge_agg_bigint`, the k parameter must be between 8 and 65535 (inclusive), but got 7. SQLSTATE: 22003",
+    "sqlExpr" : "\"kll_merge_agg_bigint(sketch_col, 7)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 42,
+    "fragment" : "kll_merge_agg_bigint(sketch_col, 7)"
+  } ]
+}
+
+
+-- !query
+SELECT kll_merge_agg_float(sketch_col, 65536) AS k_too_large
+FROM (
+    SELECT kll_sketch_agg_float(col1) AS sketch_col
+    FROM t_float_1_5_through_7_11
+) sketches
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "hint" : "",
+    "msg" : "[KLL_SKETCH_K_OUT_OF_RANGE] For function `kll_merge_agg_float`, the k parameter must be between 8 and 65535 (inclusive), but got 65536. SQLSTATE: 22003",
+    "sqlExpr" : "\"kll_merge_agg_float(sketch_col, 65536)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 45,
+    "fragment" : "kll_merge_agg_float(sketch_col, 65536)"
+  } ]
+}
+
+
+-- !query
+SELECT kll_merge_agg_double(sketch_col, CAST(NULL AS INT)) AS k_is_null
+FROM (
+    SELECT kll_sketch_agg_double(col1) AS sketch_col
+    FROM t_double_1_5_through_7_11
+) sketches
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_NULL",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "exprName" : "k",
+    "sqlExpr" : "\"kll_merge_agg_double(sketch_col, CAST(NULL AS INT))\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 58,
+    "fragment" : "kll_merge_agg_double(sketch_col, CAST(NULL AS INT))"
+  } ]
+}
+
+
+-- !query
+SELECT kll_merge_agg_bigint(sketch_col, CAST(RAND() * 100 AS INT) + 200) AS k_non_constant
+FROM (
+    SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+    FROM t_long_1_5_through_7_11
+) sketches
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputExpr" : "(CAST((rand() * CAST(100 AS DOUBLE)) AS INT) + 200)",
+    "inputName" : "k",
+    "inputType" : "int",
+    "sqlExpr" : "\"kll_merge_agg_bigint(sketch_col, (CAST((rand() * 100) AS INT) + 200))\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 72,
+    "fragment" : "kll_merge_agg_bigint(sketch_col, CAST(RAND() * 100 AS INT) + 200)"
+  } ]
+}
+
+
 -- !query
 SELECT kll_sketch_get_n_bigint(X'deadbeef') AS invalid_binary_bigint
 -- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/kllquantiles.sql b/sql/core/src/test/resources/sql-tests/inputs/kllquantiles.sql
index fe1b61de037d..9300754d204a 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/kllquantiles.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/kllquantiles.sql
@@ -123,7 +123,7 @@ FROM (
     FROM t_float_1_5_through_7_11
 );
 
--- Merging sketches and converting them to strings
+-- Merging sketches and converting them to strings (scalar merge functions)
 SELECT
   split(
     kll_sketch_to_string_bigint(
@@ -160,6 +160,143 @@ SELECT
   )[1] AS result
 FROM t_byte_1_5_through_7_11;
 
+-- Tests for KllMergeAgg* aggregate functions
+-- These functions merge multiple binary sketch representations
+
+-- Test GROUP BY with kll_merge_agg_bigint and HAVING clause
+SELECT
+  parity,
+  kll_sketch_get_n_bigint(kll_merge_agg_bigint(sketch_col)) AS total_count
+FROM (
+  SELECT
+    col1 % 2 AS parity,
+    kll_sketch_agg_bigint(col1) AS sketch_col
+  FROM t_int_1_5_through_7_11
+  GROUP BY col1 % 2
+) grouped_sketches
+GROUP BY parity
+HAVING kll_sketch_get_n_bigint(kll_merge_agg_bigint(sketch_col)) > 3;
+
+-- Test empty aggregation: zero rows input for kll_merge_agg_bigint
+SELECT kll_sketch_get_n_bigint(kll_merge_agg_bigint(sketch_col)) AS empty_merge_n
+FROM (
+  SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+  FROM t_int_1_5_through_7_11
+  WHERE col1 > 1000
+) empty_sketches;
+
+-- Test empty aggregation: zero rows input for kll_merge_agg_float
+SELECT kll_sketch_get_n_float(kll_merge_agg_float(sketch_col)) AS empty_merge_n
+FROM (
+  SELECT kll_sketch_agg_float(col1) AS sketch_col
+  FROM t_float_1_5_through_7_11
+  WHERE col1 > 1000.0
+) empty_sketches;
+
+-- Test empty aggregation: zero rows input for kll_merge_agg_double
+SELECT kll_sketch_get_n_double(kll_merge_agg_double(sketch_col)) AS empty_merge_n
+FROM (
+  SELECT kll_sketch_agg_double(col1) AS sketch_col
+  FROM t_double_1_5_through_7_11
+  WHERE col1 > 1000.0
+) empty_sketches;
+
+-- Test kll_merge_agg_bigint: merge bigint sketches from multiple rows
+SELECT lower(kll_sketch_to_string_bigint(agg)) LIKE '%kll%' AS str_contains_kll,
+       abs(kll_sketch_get_quantile_bigint(agg, 0.5) - 4) < 1 AS median_close_to_4,
+       abs(kll_sketch_get_rank_bigint(agg, 3) - 0.4) < 0.1 AS rank3_close_to_0_4
+FROM (
+    SELECT kll_merge_agg_bigint(sketch_col) AS agg
+    FROM (
+        SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+        FROM t_int_1_5_through_7_11
+        UNION ALL
+        SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+        FROM t_short_1_5_through_7_11
+    ) sketches
+);
+
+-- Test kll_merge_agg_float: merge float sketches from multiple rows
+-- Merging col1 (1-7) and col2 (5-11) gives combined data with median ~6, within the 1.0 tolerance around 5.5 checked below
+SELECT lower(kll_sketch_to_string_float(agg)) LIKE '%kll%' AS str_contains_kll,
+       abs(kll_sketch_get_quantile_float(agg, 0.5) - 5.5) < 1.0 AS median_close_to_5_5,
+       abs(kll_sketch_get_rank_float(agg, 5.0) - 0.35) < 0.15 AS rank5_close_to_0_35
+FROM (
+    SELECT kll_merge_agg_float(sketch_col) AS agg
+    FROM (
+        SELECT kll_sketch_agg_float(col1) AS sketch_col
+        FROM t_float_1_5_through_7_11
+        UNION ALL
+        SELECT kll_sketch_agg_float(col2) AS sketch_col
+        FROM t_float_1_5_through_7_11
+    ) sketches
+);
+
+-- Test kll_merge_agg_double: merge double sketches from multiple rows
+SELECT lower(kll_sketch_to_string_double(agg)) LIKE '%kll%' AS str_contains_kll,
+       abs(kll_sketch_get_quantile_double(agg, 0.5) - 6.0) < 1.0 AS median_close_to_6,
+       abs(kll_sketch_get_rank_double(agg, 5.0) - 0.35) < 0.15 AS rank5_close_to_0_35
+FROM (
+    SELECT kll_merge_agg_double(sketch_col) AS agg
+    FROM (
+        SELECT kll_sketch_agg_double(col1) AS sketch_col
+        FROM t_double_1_5_through_7_11
+        UNION ALL
+        SELECT kll_sketch_agg_double(col2) AS sketch_col
+        FROM t_float_1_5_through_7_11
+    ) sketches
+);
+
+-- Test kll_merge_agg_bigint with custom k parameter
+SELECT LENGTH(kll_sketch_to_string_bigint(kll_merge_agg_bigint(sketch_col, 400))) > 0 AS merged_with_k
+FROM (
+    SELECT kll_sketch_agg_bigint(col1, 400) AS sketch_col
+    FROM t_long_1_5_through_7_11
+    UNION ALL
+    SELECT kll_sketch_agg_bigint(col2, 400) AS sketch_col
+    FROM t_byte_1_5_through_7_11
+) sketches;
+
+-- Test kll_merge_agg_float with custom k parameter
+SELECT LENGTH(kll_sketch_to_string_float(kll_merge_agg_float(sketch_col, 300))) > 0 AS merged_with_k
+FROM (
+    SELECT kll_sketch_agg_float(col1, 300) AS sketch_col
+    FROM t_float_1_5_through_7_11
+) sketches;
+
+-- Test kll_merge_agg_double with custom k parameter
+SELECT LENGTH(kll_sketch_to_string_double(kll_merge_agg_double(sketch_col, 500))) > 0 AS merged_with_k
+FROM (
+    SELECT kll_sketch_agg_double(col1, 500) AS sketch_col
+    FROM t_double_1_5_through_7_11
+) sketches;
+
+-- Test that kll_merge_agg functions ignore NULL sketch values
+SELECT abs(kll_sketch_get_quantile_bigint(agg_with_nulls, 0.5) -
+           kll_sketch_get_quantile_bigint(agg_without_nulls, 0.5)) < 1 AS medians_match
+FROM (
+    SELECT kll_merge_agg_bigint(sketch_col) AS agg_with_nulls
+    FROM (
+        SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+        FROM t_long_1_5_through_7_11
+        UNION ALL
+        SELECT CAST(NULL AS BINARY) AS sketch_col
+        UNION ALL
+        SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+        FROM t_byte_1_5_through_7_11
+    ) sketches_with_nulls
+) WITH_NULLS,
+(
+    SELECT kll_merge_agg_bigint(sketch_col) AS agg_without_nulls
+    FROM (
+        SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+        FROM t_long_1_5_through_7_11
+        UNION ALL
+        SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+        FROM t_byte_1_5_through_7_11
+    ) sketches_without_nulls
+) WITHOUT_NULLS;
+
 -- Tests verifying that NULL input values are ignored by aggregate functions
 
 -- Test BIGINT aggregate ignores NULL values
@@ -405,6 +542,73 @@ FROM t_double_1_5_through_7_11;
 SELECT kll_sketch_agg_bigint(col1, '100') AS k_wrong_type
 FROM t_long_1_5_through_7_11;
 
+-- Negative tests for kll_merge_agg functions
+
+-- Test wrong sketch type: float sketch passed to kll_merge_agg_bigint (should fail)
+SELECT kll_merge_agg_bigint(sketch_col) AS wrong_type_merge
+FROM (
+  SELECT kll_sketch_agg_float(col1) AS sketch_col
+  FROM t_float_1_5_through_7_11
+) float_sketches;
+
+-- Type mismatch: kll_merge_agg_bigint does not accept integer columns (needs binary)
+SELECT kll_merge_agg_bigint(col1) AS merge_wrong_type
+FROM t_long_1_5_through_7_11;
+
+-- Type mismatch: kll_merge_agg_float does not accept float columns (needs binary)
+SELECT kll_merge_agg_float(col1) AS merge_wrong_type
+FROM t_float_1_5_through_7_11;
+
+-- Type mismatch: kll_merge_agg_double does not accept double columns (needs binary)
+SELECT kll_merge_agg_double(col1) AS merge_wrong_type
+FROM t_double_1_5_through_7_11;
+
+-- Invalid binary data for kll_merge_agg_bigint
+SELECT kll_merge_agg_bigint(sketch_col) AS invalid_merge
+FROM (
+    SELECT CAST('not_a_sketch' AS BINARY) AS sketch_col
+) invalid_data;
+
+-- Invalid binary data for kll_merge_agg_float
+SELECT kll_merge_agg_float(sketch_col) AS invalid_merge
+FROM (
+    SELECT X'deadbeef' AS sketch_col
+) invalid_data;
+
+-- Invalid binary data for kll_merge_agg_double
+SELECT kll_merge_agg_double(sketch_col) AS invalid_merge
+FROM (
+    SELECT X'cafebabe' AS sketch_col
+) invalid_data;
+
+-- k parameter too small for kll_merge_agg_bigint
+SELECT kll_merge_agg_bigint(sketch_col, 7) AS k_too_small
+FROM (
+    SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+    FROM t_long_1_5_through_7_11
+) sketches;
+
+-- k parameter too large for kll_merge_agg_float
+SELECT kll_merge_agg_float(sketch_col, 65536) AS k_too_large
+FROM (
+    SELECT kll_sketch_agg_float(col1) AS sketch_col
+    FROM t_float_1_5_through_7_11
+) sketches;
+
+-- k parameter is NULL for kll_merge_agg_double
+SELECT kll_merge_agg_double(sketch_col, CAST(NULL AS INT)) AS k_is_null
+FROM (
+    SELECT kll_sketch_agg_double(col1) AS sketch_col
+    FROM t_double_1_5_through_7_11
+) sketches;
+
+-- k parameter is not foldable for kll_merge_agg_bigint (using a non-constant expression)
+SELECT kll_merge_agg_bigint(sketch_col, CAST(RAND() * 100 AS INT) + 200) AS k_non_constant
+FROM (
+    SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+    FROM t_long_1_5_through_7_11
+) sketches;
+
 -- Negative tests for kll_sketch_get_n functions
 -- Invalid binary data
 SELECT kll_sketch_get_n_bigint(X'deadbeef') AS invalid_binary_bigint;
diff --git a/sql/core/src/test/resources/sql-tests/results/kllquantiles.sql.out b/sql/core/src/test/resources/sql-tests/results/kllquantiles.sql.out
index fef44781e17c..0b852ad24199 100644
--- a/sql/core/src/test/resources/sql-tests/results/kllquantiles.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/kllquantiles.sql.out
@@ -321,6 +321,194 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
 }
 
 
+-- !query
+SELECT
+  parity,
+  kll_sketch_get_n_bigint(kll_merge_agg_bigint(sketch_col)) AS total_count
+FROM (
+  SELECT
+    col1 % 2 AS parity,
+    kll_sketch_agg_bigint(col1) AS sketch_col
+  FROM t_int_1_5_through_7_11
+  GROUP BY col1 % 2
+) grouped_sketches
+GROUP BY parity
+HAVING kll_sketch_get_n_bigint(kll_merge_agg_bigint(sketch_col)) > 3
+-- !query schema
+struct<parity:int,total_count:bigint>
+-- !query output
+1      4
+
+
+-- !query
+SELECT kll_sketch_get_n_bigint(kll_merge_agg_bigint(sketch_col)) AS empty_merge_n
+FROM (
+  SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+  FROM t_int_1_5_through_7_11
+  WHERE col1 > 1000
+) empty_sketches
+-- !query schema
+struct<empty_merge_n:bigint>
+-- !query output
+0
+
+
+-- !query
+SELECT kll_sketch_get_n_float(kll_merge_agg_float(sketch_col)) AS empty_merge_n
+FROM (
+  SELECT kll_sketch_agg_float(col1) AS sketch_col
+  FROM t_float_1_5_through_7_11
+  WHERE col1 > 1000.0
+) empty_sketches
+-- !query schema
+struct<empty_merge_n:bigint>
+-- !query output
+0
+
+
+-- !query
+SELECT kll_sketch_get_n_double(kll_merge_agg_double(sketch_col)) AS empty_merge_n
+FROM (
+  SELECT kll_sketch_agg_double(col1) AS sketch_col
+  FROM t_double_1_5_through_7_11
+  WHERE col1 > 1000.0
+) empty_sketches
+-- !query schema
+struct<empty_merge_n:bigint>
+-- !query output
+0
+
+
+-- !query
+SELECT lower(kll_sketch_to_string_bigint(agg)) LIKE '%kll%' AS str_contains_kll,
+       abs(kll_sketch_get_quantile_bigint(agg, 0.5) - 4) < 1 AS median_close_to_4,
+       abs(kll_sketch_get_rank_bigint(agg, 3) - 0.4) < 0.1 AS rank3_close_to_0_4
+FROM (
+    SELECT kll_merge_agg_bigint(sketch_col) AS agg
+    FROM (
+        SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+        FROM t_int_1_5_through_7_11
+        UNION ALL
+        SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+        FROM t_short_1_5_through_7_11
+    ) sketches
+)
+-- !query schema
+struct<str_contains_kll:boolean,median_close_to_4:boolean,rank3_close_to_0_4:boolean>
+-- !query output
+true   true    true
+
+
+-- !query
+SELECT lower(kll_sketch_to_string_float(agg)) LIKE '%kll%' AS str_contains_kll,
+       abs(kll_sketch_get_quantile_float(agg, 0.5) - 5.5) < 1.0 AS median_close_to_5_5,
+       abs(kll_sketch_get_rank_float(agg, 5.0) - 0.35) < 0.15 AS rank5_close_to_0_35
+FROM (
+    SELECT kll_merge_agg_float(sketch_col) AS agg
+    FROM (
+        SELECT kll_sketch_agg_float(col1) AS sketch_col
+        FROM t_float_1_5_through_7_11
+        UNION ALL
+        SELECT kll_sketch_agg_float(col2) AS sketch_col
+        FROM t_float_1_5_through_7_11
+    ) sketches
+)
+-- !query schema
+struct<str_contains_kll:boolean,median_close_to_5_5:boolean,rank5_close_to_0_35:boolean>
+-- !query output
+true   true    true
+
+
+-- !query
+SELECT lower(kll_sketch_to_string_double(agg)) LIKE '%kll%' AS str_contains_kll,
+       abs(kll_sketch_get_quantile_double(agg, 0.5) - 6.0) < 1.0 AS median_close_to_6,
+       abs(kll_sketch_get_rank_double(agg, 5.0) - 0.35) < 0.15 AS rank5_close_to_0_35
+FROM (
+    SELECT kll_merge_agg_double(sketch_col) AS agg
+    FROM (
+        SELECT kll_sketch_agg_double(col1) AS sketch_col
+        FROM t_double_1_5_through_7_11
+        UNION ALL
+        SELECT kll_sketch_agg_double(col2) AS sketch_col
+        FROM t_float_1_5_through_7_11
+    ) sketches
+)
+-- !query schema
+struct<str_contains_kll:boolean,median_close_to_6:boolean,rank5_close_to_0_35:boolean>
+-- !query output
+true   true    true
+
+
+-- !query
+SELECT LENGTH(kll_sketch_to_string_bigint(kll_merge_agg_bigint(sketch_col, 400))) > 0 AS merged_with_k
+FROM (
+    SELECT kll_sketch_agg_bigint(col1, 400) AS sketch_col
+    FROM t_long_1_5_through_7_11
+    UNION ALL
+    SELECT kll_sketch_agg_bigint(col2, 400) AS sketch_col
+    FROM t_byte_1_5_through_7_11
+) sketches
+-- !query schema
+struct<merged_with_k:boolean>
+-- !query output
+true
+
+
+-- !query
+SELECT LENGTH(kll_sketch_to_string_float(kll_merge_agg_float(sketch_col, 300))) > 0 AS merged_with_k
+FROM (
+    SELECT kll_sketch_agg_float(col1, 300) AS sketch_col
+    FROM t_float_1_5_through_7_11
+) sketches
+-- !query schema
+struct<merged_with_k:boolean>
+-- !query output
+true
+
+
+-- !query
+SELECT LENGTH(kll_sketch_to_string_double(kll_merge_agg_double(sketch_col, 500))) > 0 AS merged_with_k
+FROM (
+    SELECT kll_sketch_agg_double(col1, 500) AS sketch_col
+    FROM t_double_1_5_through_7_11
+) sketches
+-- !query schema
+struct<merged_with_k:boolean>
+-- !query output
+true
+
+
+-- !query
+SELECT abs(kll_sketch_get_quantile_bigint(agg_with_nulls, 0.5) -
+           kll_sketch_get_quantile_bigint(agg_without_nulls, 0.5)) < 1 AS medians_match
+FROM (
+    SELECT kll_merge_agg_bigint(sketch_col) AS agg_with_nulls
+    FROM (
+        SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+        FROM t_long_1_5_through_7_11
+        UNION ALL
+        SELECT CAST(NULL AS BINARY) AS sketch_col
+        UNION ALL
+        SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+        FROM t_byte_1_5_through_7_11
+    ) sketches_with_nulls
+) WITH_NULLS,
+(
+    SELECT kll_merge_agg_bigint(sketch_col) AS agg_without_nulls
+    FROM (
+        SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+        FROM t_long_1_5_through_7_11
+        UNION ALL
+        SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+        FROM t_byte_1_5_through_7_11
+    ) sketches_without_nulls
+) WITHOUT_NULLS
+-- !query schema
+struct<medians_match:boolean>
+-- !query output
+true
+
+
 -- !query
 SELECT abs(kll_sketch_get_quantile_bigint(agg_with_nulls, 0.5) - 
            kll_sketch_get_quantile_bigint(agg_without_nulls, 0.5)) < 1 AS medians_match,
@@ -1085,6 +1273,272 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
 }
 
 
+-- !query
+SELECT kll_merge_agg_bigint(sketch_col) AS wrong_type_merge
+FROM (
+  SELECT kll_sketch_agg_float(col1) AS sketch_col
+  FROM t_float_1_5_through_7_11
+) float_sketches
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "KLL_INVALID_INPUT_SKETCH_BUFFER",
+  "sqlState" : "22000",
+  "messageParameters" : {
+    "function" : "`kll_merge_agg_bigint`"
+  }
+}
+
+
+-- !query
+SELECT kll_merge_agg_bigint(col1) AS merge_wrong_type
+FROM t_long_1_5_through_7_11
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"col1\"",
+    "inputType" : "\"BIGINT\"",
+    "paramIndex" : "first",
+    "requiredType" : "\"BINARY\"",
+    "sqlExpr" : "\"kll_merge_agg_bigint(col1)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 33,
+    "fragment" : "kll_merge_agg_bigint(col1)"
+  } ]
+}
+
+
+-- !query
+SELECT kll_merge_agg_float(col1) AS merge_wrong_type
+FROM t_float_1_5_through_7_11
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"col1\"",
+    "inputType" : "\"FLOAT\"",
+    "paramIndex" : "first",
+    "requiredType" : "\"BINARY\"",
+    "sqlExpr" : "\"kll_merge_agg_float(col1)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 32,
+    "fragment" : "kll_merge_agg_float(col1)"
+  } ]
+}
+
+
+-- !query
+SELECT kll_merge_agg_double(col1) AS merge_wrong_type
+FROM t_double_1_5_through_7_11
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"col1\"",
+    "inputType" : "\"DOUBLE\"",
+    "paramIndex" : "first",
+    "requiredType" : "\"BINARY\"",
+    "sqlExpr" : "\"kll_merge_agg_double(col1)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 33,
+    "fragment" : "kll_merge_agg_double(col1)"
+  } ]
+}
+
+
+-- !query
+SELECT kll_merge_agg_bigint(sketch_col) AS invalid_merge
+FROM (
+    SELECT CAST('not_a_sketch' AS BINARY) AS sketch_col
+) invalid_data
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "KLL_INVALID_INPUT_SKETCH_BUFFER",
+  "sqlState" : "22000",
+  "messageParameters" : {
+    "function" : "`kll_merge_agg_bigint`"
+  }
+}
+
+
+-- !query
+SELECT kll_merge_agg_float(sketch_col) AS invalid_merge
+FROM (
+    SELECT X'deadbeef' AS sketch_col
+) invalid_data
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "KLL_INVALID_INPUT_SKETCH_BUFFER",
+  "sqlState" : "22000",
+  "messageParameters" : {
+    "function" : "`kll_merge_agg_float`"
+  }
+}
+
+
+-- !query
+SELECT kll_merge_agg_double(sketch_col) AS invalid_merge
+FROM (
+    SELECT X'cafebabe' AS sketch_col
+) invalid_data
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "KLL_INVALID_INPUT_SKETCH_BUFFER",
+  "sqlState" : "22000",
+  "messageParameters" : {
+    "function" : "`kll_merge_agg_double`"
+  }
+}
+
+
+-- !query
+SELECT kll_merge_agg_bigint(sketch_col, 7) AS k_too_small
+FROM (
+    SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+    FROM t_long_1_5_through_7_11
+) sketches
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "hint" : "",
+    "msg" : "[KLL_SKETCH_K_OUT_OF_RANGE] For function `kll_merge_agg_bigint`, the k parameter must be between 8 and 65535 (inclusive), but got 7. SQLSTATE: 22003",
+    "sqlExpr" : "\"kll_merge_agg_bigint(sketch_col, 7)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 42,
+    "fragment" : "kll_merge_agg_bigint(sketch_col, 7)"
+  } ]
+}
+
+
+-- !query
+SELECT kll_merge_agg_float(sketch_col, 65536) AS k_too_large
+FROM (
+    SELECT kll_sketch_agg_float(col1) AS sketch_col
+    FROM t_float_1_5_through_7_11
+) sketches
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "hint" : "",
+    "msg" : "[KLL_SKETCH_K_OUT_OF_RANGE] For function `kll_merge_agg_float`, the k parameter must be between 8 and 65535 (inclusive), but got 65536. SQLSTATE: 22003",
+    "sqlExpr" : "\"kll_merge_agg_float(sketch_col, 65536)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 45,
+    "fragment" : "kll_merge_agg_float(sketch_col, 65536)"
+  } ]
+}
+
+
+-- !query
+SELECT kll_merge_agg_double(sketch_col, CAST(NULL AS INT)) AS k_is_null
+FROM (
+    SELECT kll_sketch_agg_double(col1) AS sketch_col
+    FROM t_double_1_5_through_7_11
+) sketches
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_NULL",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "exprName" : "k",
+    "sqlExpr" : "\"kll_merge_agg_double(sketch_col, CAST(NULL AS INT))\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 58,
+    "fragment" : "kll_merge_agg_double(sketch_col, CAST(NULL AS INT))"
+  } ]
+}
+
+
+-- !query
+SELECT kll_merge_agg_bigint(sketch_col, CAST(RAND() * 100 AS INT) + 200) AS k_non_constant
+FROM (
+    SELECT kll_sketch_agg_bigint(col1) AS sketch_col
+    FROM t_long_1_5_through_7_11
+) sketches
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputExpr" : "(CAST((rand() * CAST(100 AS DOUBLE)) AS INT) + 200)",
+    "inputName" : "k",
+    "inputType" : "int",
+    "sqlExpr" : "\"kll_merge_agg_bigint(sketch_col, (CAST((rand() * 100) AS INT) + 200))\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 72,
+    "fragment" : "kll_merge_agg_bigint(sketch_col, CAST(RAND() * 100 AS INT) + 200)"
+  } ]
+}
+
+
 -- !query
 SELECT kll_sketch_get_n_bigint(X'deadbeef') AS invalid_binary_bigint
 -- !query schema
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 0dfd37ebeae0..bfe15b33768b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -3467,6 +3467,110 @@ class DataFrameAggregateSuite extends QueryTest
     // Should only count non-null values
     assert(n == 3L)
   }
+
+  test("kll_merge_agg_bigint basic functionality") {
+    // Create two separate sketches
+    val df1 = Seq(1, 2, 3).toDF("value")
+    val df2 = Seq(4, 5, 6).toDF("value")
+
+    val sketch1 = df1.agg(kll_sketch_agg_bigint($"value").alias("sketch"))
+    val sketch2 = df2.agg(kll_sketch_agg_bigint($"value").alias("sketch"))
+
+    // Union the sketches and merge them
+    val merged = sketch1.union(sketch2)
+      .agg(kll_merge_agg_bigint($"sketch").alias("merged_sketch"))
+
+    // Verify the merged sketch contains all values
+    val n = merged.select(kll_sketch_get_n_bigint($"merged_sketch")).collect()(0)(0)
+    assert(n == 6L)
+
+    // Test with explicit k parameter
+    val mergedWithK = sketch1.union(sketch2)
+      .agg(kll_merge_agg_bigint($"sketch", 400).alias("merged_sketch"))
+    assert(mergedWithK.collect()(0)(0) != null)
+
+    // Test with column name
+    val mergedWithName = sketch1.union(sketch2)
+      .agg(kll_merge_agg_bigint("sketch").alias("merged_sketch"))
+    val n2 = mergedWithName.select(kll_sketch_get_n_bigint($"merged_sketch")).collect()(0)(0)
+    assert(n2 == 6L)
+  }
+
+  test("kll_merge_agg_float basic functionality") {
+    // Create two separate sketches
+    val df1 = Seq(1.0f, 2.0f, 3.0f).toDF("value")
+    val df2 = Seq(4.0f, 5.0f, 6.0f).toDF("value")
+
+    val sketch1 = df1.agg(kll_sketch_agg_float($"value").alias("sketch"))
+    val sketch2 = df2.agg(kll_sketch_agg_float($"value").alias("sketch"))
+
+    // Union the sketches and merge them
+    val merged = sketch1.union(sketch2)
+      .agg(kll_merge_agg_float($"sketch").alias("merged_sketch"))
+
+    // Verify the merged sketch contains all values
+    val n = merged.select(kll_sketch_get_n_float($"merged_sketch")).collect()(0)(0)
+    assert(n == 6L)
+
+    // Test with explicit k parameter
+    val mergedWithK = sketch1.union(sketch2)
+      .agg(kll_merge_agg_float($"sketch", 300).alias("merged_sketch"))
+    assert(mergedWithK.collect()(0)(0) != null)
+  }
+
+  test("kll_merge_agg_double basic functionality") {
+    // Create two separate sketches
+    val df1 = Seq(1.0, 2.0, 3.0).toDF("value")
+    val df2 = Seq(4.0, 5.0, 6.0).toDF("value")
+
+    val sketch1 = df1.agg(kll_sketch_agg_double($"value").alias("sketch"))
+    val sketch2 = df2.agg(kll_sketch_agg_double($"value").alias("sketch"))
+
+    // Union the sketches and merge them
+    val merged = sketch1.union(sketch2)
+      .agg(kll_merge_agg_double($"sketch").alias("merged_sketch"))
+
+    // Verify the merged sketch contains all values
+    val n = merged.select(kll_sketch_get_n_double($"merged_sketch")).collect()(0)(0)
+    assert(n == 6L)
+
+    // Test quantile on merged sketch
+    val quantile = merged.select(
+      kll_sketch_get_quantile_double($"merged_sketch", lit(0.5))
+    ).collect()(0)(0)
+    assert(quantile != null)
+  }
+
+  test("kll_merge_agg with different k values") {
+    // Create sketches with different k values
+    val df1 = Seq(1, 2, 3).toDF("value")
+    val df2 = Seq(4, 5, 6).toDF("value")
+
+    val sketch1 = df1.agg(kll_sketch_agg_bigint($"value", 200).alias("sketch"))
+    val sketch2 = df2.agg(kll_sketch_agg_bigint($"value", 400).alias("sketch"))
+
+    // Merge sketches with different k values (should adopt from first sketch)
+    val merged = sketch1.union(sketch2)
+      .agg(kll_merge_agg_bigint($"sketch").alias("merged_sketch"))
+
+    val n = merged.select(kll_sketch_get_n_bigint($"merged_sketch")).collect()(0)(0)
+    assert(n == 6L)
+  }
+
+  test("kll_merge_agg with null values") {
+    val df1 = Seq(1, 2, 3).toDF("value")
+    val dfNull = Seq(Some(4), None, Some(6)).toDF("value")
+
+    val sketch1 = df1.agg(kll_sketch_agg_bigint($"value").alias("sketch"))
+    val sketchNull = dfNull.agg(kll_sketch_agg_bigint($"value").alias("sketch"))
+
+    // sketchNull was built from input containing a null, which is skipped: n = 3 + 2 = 5
+    val merged = sketch1.union(sketchNull)
+      .agg(kll_merge_agg_bigint($"sketch").alias("merged_sketch"))
+
+    val n = merged.select(kll_sketch_get_n_bigint($"merged_sketch")).collect()(0)(0)
+    assert(n == 5L)
+  }
 }
 
 case class B(c: Option[Double])



