This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4b9b35b4e02f [SPARK-56820][SQL] Add counter_diff window function for converting cumulative counters to delta format
4b9b35b4e02f is described below
commit 4b9b35b4e02fe98dcbdb93ca490bf969ba88af99
Author: Petar Nikić <[email protected]>
AuthorDate: Fri May 15 20:56:57 2026 +0800
[SPARK-56820][SQL] Add counter_diff window function for converting cumulative counters to delta format
### What changes were proposed in this pull request?
This pull request adds a new window function, `counter_diff`, which converts
cumulative counters to delta format by computing the differences between
consecutive values. The function includes special handling for counter resets,
i.e. when the cumulative value is reset to zero.
#### Syntax
```sql
counter_diff(value [, start_time]) OVER (PARTITION BY partition_exprs ORDER BY order_exprs)
```
#### Arguments
* `value`: A cumulative counter. Must be numeric and non-negative.
* `start_time`: An optional timestamp parameter which indicates when the
counter was last set to zero. It is used to better detect counter resets.
* `partition_exprs`: Used to separate independent counters. Good
partitioning columns would be the metric name, as well as any attributes tied
to the metric.
* `order_exprs`: Used to order the rows. Should be the observation
timestamp in ascending order.
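To illustrate why `start_time` helps, here is a minimal pure-Python sketch (not the Spark implementation) of the per-row checks for two consecutive rows with non-NULL counters. The helper name `step_diff` is hypothetical. The values mirror the Spark Connect test data in this patch: the counter restarts and climbs from zero past its previous value before the next observation, so the value-decrease check alone misses the reset, while an advancing start time catches it.

```python
from datetime import datetime
from typing import Optional


def step_diff(
    prev_value: Optional[float],
    curr_value: float,
    prev_start: Optional[datetime] = None,
    curr_start: Optional[datetime] = None,
) -> Optional[float]:
    """Hypothetical helper: delta for the current row, or None for no result."""
    if curr_value < 0:
        raise ValueError(f"negative counter value {curr_value}")
    # The start time check is skipped if either start time is missing.
    if prev_start is not None and curr_start is not None:
        if curr_start < prev_start:
            raise ValueError("start time decreased")
        if curr_start > prev_start:
            return None  # start time advanced: the counter was reset
    if prev_value is None or curr_value < prev_value:
        return None  # first row, or the value decreased (counter reset)
    return curr_value - prev_value


old_start = datetime(2025, 12, 31)
new_start = datetime(2026, 1, 1, 0, 2, 15)

# Without start_time, the reset from 300 to 1200 looks like normal growth.
without_start = step_diff(300, 1200)                      # 900
# With start_time, the advanced start time flags the reset.
with_start = step_diff(300, 1200, old_start, new_start)  # None
```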
#### Example
```sql
SELECT m, t, c, counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff
FROM VALUES
('http_requests', TIMESTAMP '2026-01-01T00:00:00', 100),
('http_requests', TIMESTAMP '2026-01-01T00:01:00', 200),
('http_requests', TIMESTAMP '2026-01-01T00:02:00', 400)
AS tab (m, t, c)
```
| m | t | c | diff |
|---|---|---|---|
| http_requests | 2026-01-01 00:00:00 | 100 | NULL |
| http_requests | 2026-01-01 00:01:00 | 200 | 100 |
| http_requests | 2026-01-01 00:02:00 | 400 | 200 |
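The semantics of the single-argument form can be modeled in plain Python as a sketch, assuming one partition whose rows are already sorted by the ORDER BY timestamp. The function `counter_diff_series` is hypothetical and only mirrors the rules described in this patch (NULL counters yield NULL and are skipped, negative values raise an error, the first row and any decrease yield NULL); it is not how Spark evaluates the window.

```python
from typing import List, Optional


def counter_diff_series(values: List[Optional[int]]) -> List[Optional[int]]:
    """Hypothetical model of counter_diff(value) over one sorted partition."""
    result: List[Optional[int]] = []
    prev: Optional[int] = None  # last non-NULL counter value seen so far
    for curr in values:
        if curr is None:
            # NULL counter: NULL result, and prev is left unchanged.
            result.append(None)
            continue
        if curr < 0:
            raise ValueError(f"negative counter value {curr}")
        if prev is None or curr < prev:
            result.append(None)  # first row, or a decrease (counter reset)
        else:
            result.append(curr - prev)
        prev = curr
    return result


print(counter_diff_series([100, 200, 400]))           # [None, 100, 200]
print(counter_diff_series([100, 200, 400, 50, 100]))  # [None, 100, 200, None, 50]
print(counter_diff_series([100, None, 250]))          # [None, None, 150]
```

The second call reproduces the PySpark docstring example in this patch, where the drop from 400 to 50 is treated as a reset.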
### Why are the changes needed?
Counters are metrics with monotonically increasing values. One example is
the number of HTTP requests processed on a server. With each request, the
counter increases. Counters can be represented in two temporalities:
_cumulative_ or _delta_:
* With _delta_ temporality, each observation represents the increase of the
counter since the last observation.
* With _cumulative_ temporality, each observation represents the total
accumulated value of the counter.
* With cumulative counters, it is possible for the counter to reset to
zero, for example when a restart occurs.
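The relationship between the two temporalities can be sketched in Python, assuming a counter that starts at zero and does not reset: cumulative values are the running sum of the deltas, and consecutive differences of the cumulative values recover the deltas. The variable names are illustrative only.

```python
from itertools import accumulate

# Delta temporality: increase since the previous observation.
deltas = [100, 100, 200]

# Cumulative temporality: running total, assuming the counter starts at 0.
cumulative = list(accumulate(deltas))  # [100, 200, 400]

# Recover the deltas from the cumulative values. The first delta is only
# known because we assume a starting value of 0; counter_diff makes no such
# assumption and returns NULL for the first row instead.
recovered = [cumulative[0]] + [
    curr - prev for prev, curr in zip(cumulative, cumulative[1:])
]
```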
The cumulative representation is typically better for storage and
transmission, as it handles missed observations better:
* For delta counters, if a single observation is lost, the increase it
carried is permanently missing from the total counter value.
* For cumulative counters, the observation is lost, but the next observation
still reports the full accumulated value, so the total is preserved.
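A small Python sketch of the arithmetic behind these two bullets, with made-up values and assuming no counter reset: dropping one observation loses its increase under delta temporality, while the last received cumulative value still equals the true total.

```python
from itertools import accumulate

deltas = [100, 100, 200, 50]           # true increases per interval
cumulative = list(accumulate(deltas))  # [100, 200, 400, 450]
true_total = sum(deltas)               # 450

lost = 1  # index of the observation lost in transmission

# Delta temporality: the lost increase is missing from the total.
delta_total = sum(d for i, d in enumerate(deltas) if i != lost)  # 350

# Cumulative temporality: the latest received value still carries the total.
received = [c for i, c in enumerate(cumulative) if i != lost]
cumulative_total = received[-1]  # 450
```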
However, the delta representation is required for performing analytics on
the metric, as delta values can be aggregated and bucketized (for example,
summed per time window).
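As an illustration of why deltas are easier to analyze, the sketch below sums delta values into fixed time buckets in plain Python. The rows reuse the deltas from the PySpark docstring example in this patch (NULL for the first row and for the reset), the two-minute bucket width is arbitrary, and `bucket_start` is a hypothetical helper. Summing the raw cumulative values in the same way would not produce a meaningful number.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional, Tuple

# (observation time, delta) pairs; None marks rows without a delta.
rows: List[Tuple[datetime, Optional[int]]] = [
    (datetime(2026, 1, 1, 0, 0), None),
    (datetime(2026, 1, 1, 0, 1), 100),
    (datetime(2026, 1, 1, 0, 2), 200),
    (datetime(2026, 1, 1, 0, 3), None),
    (datetime(2026, 1, 1, 0, 4), 50),
]


def bucket_start(ts: datetime, minutes: int) -> datetime:
    """Round ts down to its bucket start (assumes minutes divides 60)."""
    return ts.replace(minute=ts.minute - ts.minute % minutes, second=0, microsecond=0)


totals: Dict[datetime, int] = defaultdict(int)
for ts, delta in rows:
    if delta is not None:  # skip NULL deltas, as SQL SUM does
        totals[bucket_start(ts, 2)] += delta
```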
`counter_diff` reduces the gap between these two representations. Given a
cumulative counter, it computes the differences between consecutive values,
resulting in the equivalent delta representation for the counter, which can be
used in further analysis.
### Does this PR introduce _any_ user-facing change?
Yes. This PR adds a new window function, `counter_diff`, to SQL and to the
Scala and PySpark (including Spark Connect) APIs.
### How was this patch tested?
A new test, `counter-diff.sql`, has been added with various SQL queries
involving `counter_diff` and their expected outputs.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code with Claude Opus 4.7
Closes #55828 from pnikic-db/counter-diff-window-function.
Authored-by: Petar Nikić <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 12 +
.../source/reference/pyspark.sql/functions.rst | 1 +
python/pyspark/sql/connect/functions/builtin.py | 9 +
python/pyspark/sql/functions/__init__.py | 1 +
python/pyspark/sql/functions/builtin.py | 78 ++
.../sql/tests/connect/test_connect_function.py | 31 +
python/pyspark/sql/tests/test_functions.py | 30 +
.../scala/org/apache/spark/sql/functions.scala | 57 ++
.../sql/catalyst/analysis/FunctionRegistry.scala | 1 +
.../catalyst/analysis/resolver/ResolverGuard.scala | 2 +-
.../sql/catalyst/expressions/CounterDiff.scala | 355 +++++++++
.../catalyst/expressions/decimalExpressions.scala | 31 +
.../sql-functions/sql-expression-schema.md | 1 +
.../analyzer-results/counter-diff.sql.out | 827 ++++++++++++++++++++
.../resources/sql-tests/inputs/counter-diff.sql | 391 ++++++++++
.../sql-tests/results/counter-diff.sql.out | 864 +++++++++++++++++++++
.../spark/sql/DataFrameWindowFunctionsSuite.scala | 22 +
17 files changed, 2712 insertions(+), 1 deletion(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 1d4f4317461f..35fabe59f0d9 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1386,6 +1386,18 @@
],
"sqlState" : "0A000"
},
+ "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE" : {
+ "message" : [
+ "A negative counter value <value> was provided to function <function>. Negative counter values are not allowed."
+ ],
+ "sqlState" : "22003"
+ },
+ "COUNTER_DIFF_START_TIME_DECREASED" : {
+ "message" : [
+ "Start time provided to function <function> decreased from <previousStartTime> to <currentStartTime>. Start time is required to be non-decreasing."
+ ],
+ "sqlState" : "22023"
+ },
"CREATE_PERMANENT_VIEW_WITHOUT_ALIAS" : {
"message" : [
"Not allowed to create the permanent view <name> without explicitly assigning an alias for the expression <attr>."
diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst
index d6810722e79a..0303308dd348 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -520,6 +520,7 @@ Window Functions
.. autosummary::
:toctree: api/
+ counter_diff
cume_dist
dense_rank
lag
diff --git a/python/pyspark/sql/connect/functions/builtin.py b/python/pyspark/sql/connect/functions/builtin.py
index 22e52d91232c..0ea0fe65a0ff 100644
--- a/python/pyspark/sql/connect/functions/builtin.py
+++ b/python/pyspark/sql/connect/functions/builtin.py
@@ -1539,6 +1539,15 @@ bit_xor.__doc__ = pysparkfuncs.bit_xor.__doc__
# Window Functions
+def counter_diff(value: "ColumnOrName", startTime: Optional["ColumnOrName"] = None) -> Column:
+ if startTime is None:
+ return _invoke_function_over_columns("counter_diff", value)
+ return _invoke_function_over_columns("counter_diff", value, startTime)
+
+
+counter_diff.__doc__ = pysparkfuncs.counter_diff.__doc__
+
+
def cume_dist() -> Column:
return _invoke_function("cume_dist")
diff --git a/python/pyspark/sql/functions/__init__.py b/python/pyspark/sql/functions/__init__.py
index 27db280be86d..b90b5a26ecb0 100644
--- a/python/pyspark/sql/functions/__init__.py
+++ b/python/pyspark/sql/functions/__init__.py
@@ -434,6 +434,7 @@ __all__ = [ # noqa: F405
"var_samp",
"variance",
# Window Functions
+ "counter_diff",
"cume_dist",
"dense_rank",
"lag",
diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py
index be948a22a30f..eccb7d768e88 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -6464,6 +6464,84 @@ def rank() -> Column:
return _invoke_function("rank")
+@_try_remote_functions
+def counter_diff(value: "ColumnOrName", startTime: Optional["ColumnOrName"] = None) -> Column:
+ """
+ Window function: computes the differences between consecutive cumulative counter values in a
+ time series, thereby converting the counter from the cumulative to the delta format.
+
+ Gracefully handles counter resets by returning NULL. Counter resets are detected when the
+ counter value decreases, or when the start time advances between rows.
+
+ Use the PARTITION BY clause of the window to separate independent counters. This is done by
+ specifying all columns which uniquely identify a time series. These are typically the counter
+ name and any attributes tied to the counter.
+
+ Use the ORDER BY clause of the window to order the observations by the associated timestamp
+ in ascending order.
+
+ .. versionadded:: 4.2.0
+
+ Parameters
+ ----------
+ value : :class:`~pyspark.sql.Column` or column name
+ A cumulative counter. Must be a numeric data type. Must be non-negative.
+ startTime : :class:`~pyspark.sql.Column` or column name, optional
+ An optional timestamp parameter which indicates when the counter was last set to zero.
+ Used to signal counter resets.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ The difference between the current and previous counter value within the window partition.
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> from pyspark.sql import Window
+ >>> from datetime import datetime
+ >>> df = spark.createDataFrame(
+ ... [('http_requests', datetime(2026, 1, 1, 0, 0), 100),
+ ... ('http_requests', datetime(2026, 1, 1, 0, 1), 200),
+ ... ('http_requests', datetime(2026, 1, 1, 0, 2), 400),
+ ... ('http_requests', datetime(2026, 1, 1, 0, 3), 50),
+ ... ('http_requests', datetime(2026, 1, 1, 0, 4), 100)],
+ ... "m STRING, t TIMESTAMP_NTZ, c INT")
+ >>> w = Window.partitionBy("m").orderBy("t")
+ >>> df.select("m", "t", "c", sf.counter_diff("c").over(w).alias("diff")).show()
+ +-------------+-------------------+---+----+
+ | m| t| c|diff|
+ +-------------+-------------------+---+----+
+ |http_requests|2026-01-01 00:00:00|100|NULL|
+ |http_requests|2026-01-01 00:01:00|200| 100|
+ |http_requests|2026-01-01 00:02:00|400| 200|
+ |http_requests|2026-01-01 00:03:00| 50|NULL|
+ |http_requests|2026-01-01 00:04:00|100| 50|
+ +-------------+-------------------+---+----+
+
+ >>> df2 = spark.createDataFrame(
+ ... [('http_requests', datetime(2026, 1, 1, 0, 0), 100, datetime(2026, 1, 1, 0, 0)),
+ ... ('http_requests', datetime(2026, 1, 1, 0, 1), 200, datetime(2026, 1, 1, 0, 0)),
+ ... ('http_requests', datetime(2026, 1, 1, 0, 2), 400, datetime(2026, 1, 1, 0, 0)),
+ ... ('http_requests', datetime(2026, 1, 1, 0, 3), 500, datetime(2026, 1, 1, 0, 2, 15)),
+ ... ('http_requests', datetime(2026, 1, 1, 0, 4), 600, datetime(2026, 1, 1, 0, 2, 15))],
+ ... "m STRING, t TIMESTAMP_NTZ, c INT, s TIMESTAMP_NTZ")
+ >>> df2.select("m", "t", "s", "c", sf.counter_diff("c", "s").over(w).alias("diff")).show()
+ +-------------+-------------------+-------------------+---+----+
+ | m| t| s| c|diff|
+ +-------------+-------------------+-------------------+---+----+
+ |http_requests|2026-01-01 00:00:00|2026-01-01 00:00:00|100|NULL|
+ |http_requests|2026-01-01 00:01:00|2026-01-01 00:00:00|200| 100|
+ |http_requests|2026-01-01 00:02:00|2026-01-01 00:00:00|400| 200|
+ |http_requests|2026-01-01 00:03:00|2026-01-01 00:02:15|500|NULL|
+ |http_requests|2026-01-01 00:04:00|2026-01-01 00:02:15|600| 100|
+ +-------------+-------------------+-------------------+---+----+
+ """
+ if startTime is None:
+ return _invoke_function_over_columns("counter_diff", value)
+ return _invoke_function_over_columns("counter_diff", value, startTime)
+
+
@_try_remote_functions
def cume_dist() -> Column:
"""
diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py
index e287ea9326ae..ed653333b6c5 100644
--- a/python/pyspark/sql/tests/connect/test_connect_function.py
+++ b/python/pyspark/sql/tests/connect/test_connect_function.py
@@ -766,6 +766,37 @@ class SparkConnectFunctionTests(ReusedMixedTestCase, PandasOnSparkTestUtils):
sdf.select(scol.over(swin)).toPandas(),
)
+ # test counter_diff: requires non-negative values and has a two-argument form that
+ # accepts a start_time parameter, so a separate dataset is used.
+ counter_diff_query = """
+ SELECT * FROM VALUES
+ (0, TIMESTAMP_NTZ '2026-01-01 00:00:00', 100, TIMESTAMP_NTZ '2025-12-31 00:00:00'),
+ (0, TIMESTAMP_NTZ '2026-01-01 00:01:00', 200, TIMESTAMP_NTZ '2025-12-31 00:00:00'),
+ (0, TIMESTAMP_NTZ '2026-01-01 00:02:00', 300, TIMESTAMP_NTZ '2025-12-31 00:00:00'),
+ (0, TIMESTAMP_NTZ '2026-01-01 00:03:00', 1200, TIMESTAMP_NTZ '2026-01-01 00:02:15'),
+ (0, TIMESTAMP_NTZ '2026-01-01 00:04:00', 1300, TIMESTAMP_NTZ '2026-01-01 00:02:15'),
+ (0, TIMESTAMP_NTZ '2026-01-01 00:05:00', 50, TIMESTAMP_NTZ '2026-01-01 00:02:15'),
+ (0, TIMESTAMP_NTZ '2026-01-01 00:06:00', 100, TIMESTAMP_NTZ '2026-01-01 00:02:15'),
+ (0, TIMESTAMP_NTZ '2026-01-01 00:07:00', 20, TIMESTAMP_NTZ '2026-01-01 00:06:45'),
+ (0, TIMESTAMP_NTZ '2026-01-01 00:08:00', 60, TIMESTAMP_NTZ '2026-01-01 00:06:45'),
+ (1, TIMESTAMP_NTZ '2026-01-01 00:00:00', 100, TIMESTAMP_NTZ '2025-12-31 00:00:00'),
+ (1, TIMESTAMP_NTZ '2026-01-01 00:01:00', 200, TIMESTAMP_NTZ '2025-12-31 00:00:00'),
+ (1, TIMESTAMP_NTZ '2026-01-01 00:02:00', 300, TIMESTAMP_NTZ '2025-12-31 00:00:00'),
+ (1, TIMESTAMP_NTZ '2026-01-01 00:03:00', 1000, TIMESTAMP_NTZ '2025-12-31 00:00:00'),
+ (1, TIMESTAMP_NTZ '2026-01-01 00:04:00', 75, TIMESTAMP_NTZ '2026-01-01 00:03:15')
+ AS tab(p, t, c, s)
+ """
+ cdf_cd = self.connect.sql(counter_diff_query)
+ sdf_cd = self.spark.sql(counter_diff_query)
+ for ccol, scol in [
+ (CF.counter_diff("c"), SF.counter_diff("c")),
+ (CF.counter_diff("c", "s"), SF.counter_diff("c", "s")),
+ ]:
+ self.assert_eq(
+ cdf_cd.select(ccol.over(CW.partitionBy("p").orderBy("t"))).toPandas(),
+ sdf_cd.select(scol.over(SW.partitionBy("p").orderBy("t"))).toPandas(),
+ )
+
# test aggregation functions
for ccol, scol in [
(CF.count("c"), SF.count("c")),
diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py
index 75824d3ebe49..978476d8e0b2 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -1954,6 +1954,36 @@ class FunctionsTestsMixin:
for r, ex in zip(rs, expected):
self.assertEqual(tuple(r), ex[: len(r)])
+ def test_counter_diff_window_function(self):
+ df = self.spark.createDataFrame(
+ [
+ (1, datetime.datetime(2026, 1, 1, 0, 0, 0), 100),
+ (2, datetime.datetime(2026, 1, 1, 0, 0, 0), 200),
+ (3, datetime.datetime(2026, 1, 1, 0, 0, 0), 50),
+ (4, datetime.datetime(2026, 1, 1, 0, 0, 0), 100),
+ (5, datetime.datetime(2026, 1, 1, 0, 1, 0), 200),
+ (6, datetime.datetime(2026, 1, 1, 0, 1, 0), 300),
+ ],
+ ["t", "st", "c"],
+ )
+ w = Window.orderBy("t")
+
+ rows = df.select("t", F.counter_diff("c").over(w).alias("d")).orderBy("t").collect()
+ self.assertEqual(
+ [(r.t, r.d) for r in rows],
+ [(1, None), (2, 100), (3, None), (4, 50), (5, 100), (6, 100)],
+ )
+
+ rows = (
+ df.select("t", F.counter_diff("c", startTime="st").over(w).alias("d"))
+ .orderBy("t")
+ .collect()
+ )
+ self.assertEqual(
+ [(r.t, r.d) for r in rows],
+ [(1, None), (2, 100), (3, None), (4, 50), (5, None), (6, 100)],
+ )
+
def test_window_functions_without_partitionBy(self):
df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
w = Window.orderBy("key", df.value)
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
index 4ad731f6d8b0..e07a35ffecd7 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2531,6 +2531,63 @@ object functions {
// Window functions
//////////////////////////////////////////////////////////////////////////////////////////////
+ /**
+ * Window function: computes the differences between consecutive cumulative counter values in a
+ * time series, thereby converting the counter from the cumulative to the delta format.
+ *
+ * Gracefully handles counter resets by returning NULL. Counter resets are detected when the
+ * counter value decreases.
+ *
+ * Use the PARTITION BY clause of the window to separate independent counters. This is done by
+ * specifying all columns which uniquely identify a time series. These are typically the counter
+ * name and any attributes tied to the counter.
+ *
+ * Use the ORDER BY clause of the window to order the observations by the associated timestamp
+ * in ascending order.
+ *
+ * @param value
+ * A cumulative counter. Must be a numeric data type. Must be non-negative.
+ *
+ * @return
+ * The difference between the current and previous counter value within the window partition,
+ * according to the order defined by the window's ORDER BY clause.
+ *
+ * @group window_funcs
+ * @since 4.2.0
+ */
+ def counter_diff(value: Column): Column = Column.fn("counter_diff", value)
+
+ /**
+ * Window function: computes the differences between consecutive cumulative counter values in a
+ * time series, thereby converting the counter from the cumulative to the delta format.
+ *
+ * Gracefully handles counter resets by returning NULL. Counter resets are detected when the
+ * counter value decreases, or when the start time advances between rows.
+ *
+ * Use the PARTITION BY clause of the window to separate independent counters. This is done by
+ * specifying all columns which uniquely identify a time series. These are typically the counter
+ * name and any attributes tied to the counter.
+ *
+ * Use the ORDER BY clause of the window to order the observations by the associated timestamp
+ * in ascending order.
+ *
+ * @param value
+ * A cumulative counter. Must be a numeric data type. Must be non-negative.
+ *
+ * @param startTime
+ * A timestamp indicating when the counter was last set to zero. Used to signal counter
+ * resets.
+ *
+ * @return
+ * The difference between the current and previous counter value within the window partition,
+ * according to the order defined by the window's ORDER BY clause.
+ *
+ * @group window_funcs
+ * @since 4.2.0
+ */
+ def counter_diff(value: Column, startTime: Column): Column =
+ Column.fn("counter_diff", value, startTime)
+
/**
* Window function: returns the cumulative distribution of values within a window partition,
* i.e. the fraction of rows that are below the current row.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 34b9559c5851..ac1fbb9f9bef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -922,6 +922,7 @@ object FunctionRegistry {
expression[Rank]("rank"),
expression[DenseRank]("dense_rank"),
expression[PercentRank]("percent_rank"),
+ expressionBuilder("counter_diff", CounterDiffExpressionBuilder),
// predicates
expression[Between]("between"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala
index ed41c320f8c9..8db3fe94f3b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala
@@ -569,7 +569,7 @@ class ResolverGuard(
true
// Decimal
case _: UnscaledValue | _: MakeDecimal | _: CheckOverflow | _: CheckOverflowInSum |
- _: DecimalAddNoOverflowCheck |
+ _: DecimalAddNoOverflowCheck | _: DecimalSubtractNoOverflowCheck |
_: DecimalDivideWithOverflowCheck =>
true
// Interval
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CounterDiff.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CounterDiff.scala
new file mode 100644
index 000000000000..453145baf02a
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CounterDiff.scala
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.analysis.ExpressionBuilder
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{FunctionSignature, InputParameter}
+import org.apache.spark.sql.errors.QueryErrorsBase
+import org.apache.spark.sql.types._
+
+/**
+ * The counter_diff window function computes the differences between consecutive cumulative counter
+ * values in a time series, thereby converting the counter from the cumulative to the delta format.
+ *
+ * This class serves as the base class for the two versions of the counter_diff function:
+ * - counter_diff(counter) -> CounterDiff(counter)
+ * - counter_diff(counter, start_time) -> CounterDiffWithStartTime(counter, startTime)
+ */
+abstract class CounterDiffBase(val counter: Expression)
+ extends AggregateWindowFunction
+ with QueryErrorsBase {
+
+ override def prettyName: String = "counter_diff"
+
+ override def dataType: DataType = counter.dataType
+
+ /**
+ * Last non-NULL counter value from a previous row.
+ */
+ protected lazy val prevCounter: AttributeReference =
+ AttributeReference("prevCounter", counter.dataType, nullable = true)()
+
+ /**
+ * Counter value from the current row.
+ */
+ protected lazy val currCounter: AttributeReference =
+ AttributeReference("currCounter", counter.dataType, nullable = true)()
+
+ /**
+ * Null literal used as a counter_diff result, when appropriate.
+ */
+ protected lazy val nullResult: Expression = Literal.create(null, counter.dataType)
+
+ /**
+ * Difference between the current and previous counter values.
+ */
+ protected lazy val diff: Expression = {
+ counter.dataType match {
+ // For DECIMAL, subtraction typically widens the result type to handle possible overflow.
+ // For counter_diff, since counters cannot be negative, there is no risk of overflow, and no
+ // need to widen the result type, so we subtract directly in the input type.
+ case dt: DecimalType => DecimalSubtractNoOverflowCheck(currCounter, prevCounter, dt)
+ case _ => currCounter - prevCounter
+ }
+ }
+
+ /**
+ * Returns the difference, unless the counter has decreased, which is treated as a counter reset.
+ * In this case, NULL is returned.
+ */
+ protected lazy val diffWithCounterDecreaseCheck: Expression =
+ If(currCounter < prevCounter, nullResult, diff)
+
+ /**
+ * Error raised when the counter is negative.
+ */
+ protected lazy val negativeCounterError: Expression = RaiseError(
+ Literal("COUNTER_DIFF_NEGATIVE_COUNTER_VALUE"),
+ CreateMap(
+ Seq(
+ Literal("value"),
+ Cast(currCounter, StringType),
+ Literal("function"),
+ Literal(toSQLId("counter_diff"))
+ )
+ ),
+ counter.dataType
+ )
+
+ /**
+ * Wraps `inner` with the "skip row on NULL counter" and "raise error on negative counter" checks.
+ */
+ protected def withCounterNullAndNegativeChecks(inner: Expression): Expression = {
+ If(IsNull(currCounter),
+ nullResult,
+ If(currCounter < Literal.default(counter.dataType),
+ negativeCounterError,
+ inner
+ )
+ )
+ }
+}
+
+/**
+ * The single-parameter form of `counter_diff`: `counter_diff(value)`.
+ * Detects counter resets only when the counter value decreases.
+ */
+case class CounterDiff(override val counter: Expression)
+ extends CounterDiffBase(counter)
+ with ExpectsInputTypes {
+
+ override def children: Seq[Expression] = Seq(counter)
+
+ override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
+
+ /**
+ * The aggregation state attributes for the counter_diff function.
+ * In the single-parameter form, there are two attributes:
+ * - prevCounter: The last non-NULL counter value from a previous row.
+ * - currCounter: The counter value from the current row.
+ */
+ override lazy val aggBufferAttributes: Seq[AttributeReference] =
+ Seq(prevCounter, currCounter)
+
+ /**
+ * The initial aggregation state for the counter_diff function. Initial values are NULL.
+ */
+ override lazy val initialValues: Seq[Expression] = Seq(
+ Literal.create(null, counter.dataType),
+ Literal.create(null, counter.dataType)
+ )
+
+ /**
+ * The update expressions for the counter_diff function's aggregation state.
+ *
+ * Fundamentally, the current value becomes the previous value, and the new value becomes the
+ * current value.
+ *
+ * Rows with NULL counter values should be skipped. As a result, the previous counter value
+ * should not be updated in the aggregation state.
+ */
+ override lazy val updateExpressions: Seq[Expression] = Seq(
+ If(IsNotNull(currCounter), currCounter, prevCounter),
+ counter
+ )
+
+ /**
+ * The evaluation expression for the counter_diff function.
+ *
+ * Checks for edge cases first: NULL counter value, negative counter value and counter reset.
+ * Otherwise, returns the difference between the current and previous counter values.
+ */
+ override lazy val evaluateExpression: Expression =
+ withCounterNullAndNegativeChecks(diffWithCounterDecreaseCheck)
+
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[Expression]): CounterDiff =
+ copy(counter = newChildren.head)
+}
+
+/**
+ * The two-parameter form of `counter_diff`: `counter_diff(value, start_time)`.
+ * Additionally checks for counter resets when `start_time` increases, which signals a new start.
+ * Requires that the start time doesn't decrease, which would indicate moving backwards in time.
+ */
+case class CounterDiffWithStartTime(
+ override val counter: Expression,
+ startTime: Expression,
+ timeZoneId: Option[String] = None)
+ extends CounterDiffBase(counter)
+ with ExpectsInputTypes
+ with TimeZoneAwareExpression {
+
+ override def withTimeZone(timeZoneId: String): CounterDiffWithStartTime =
+ copy(timeZoneId = Some(timeZoneId))
+
+ override def children: Seq[Expression] = Seq(counter, startTime)
+
+ override def inputTypes: Seq[AbstractDataType] =
+ Seq(NumericType, TypeCollection(TimestampType, TimestampNTZType))
+
+ /**
+ * The start time from a previous row.
+ */
+ protected lazy val prevStartTime: AttributeReference =
+ AttributeReference("prevStartTime", startTime.dataType, nullable = true)()
+
+ /**
+ * The start time from the current row.
+ */
+ protected lazy val currStartTime: AttributeReference =
+ AttributeReference("currStartTime", startTime.dataType, nullable = true)()
+
+ /**
+ * The aggregation state attributes for the counter_diff function.
+ * In the two-parameter form, there are four attributes:
+ * - prevCounter: The last non-NULL counter value from a previous row.
+ * - currCounter: The counter value from the current row.
+ * - prevStartTime: The start time from a previous row.
+ * - currStartTime: The start time from the current row.
+ */
+ override lazy val aggBufferAttributes: Seq[AttributeReference] =
+ Seq(prevCounter, currCounter, prevStartTime, currStartTime)
+
+ /**
+ * The initial aggregation state for the counter_diff function. Initial values are NULL.
+ */
+ override lazy val initialValues: Seq[Expression] = Seq(
+ Literal.create(null, counter.dataType),
+ Literal.create(null, counter.dataType),
+ Literal.create(null, startTime.dataType),
+ Literal.create(null, startTime.dataType)
+ )
+
+ /**
+ * The update expressions for the counter_diff function's aggregation state.
+ *
+ * Fundamentally, the current value becomes the previous value, and the new value becomes the
+ * current value. The same applies to the start time.
+ *
+ * Rows with NULL counter values should be skipped. As a result, the previous values for both
+ * the counter and start time should not be updated in the aggregation state.
+ */
+ override lazy val updateExpressions: Seq[Expression] = Seq(
+ If(IsNotNull(currCounter), currCounter, prevCounter),
+ counter,
+ If(IsNotNull(currCounter), currStartTime, prevStartTime),
+ startTime
+ )
+
+ /**
+ * Error raised when the start time decreases.
+ */
+ protected lazy val decreasedStartTimeError: Expression = RaiseError(
+ Literal("COUNTER_DIFF_START_TIME_DECREASED"),
+ CreateMap(
+ Seq(
+ Literal("function"),
+ Literal(toSQLId("counter_diff")),
+ Literal("previousStartTime"),
+ Cast(prevStartTime, StringType, timeZoneId),
+ Literal("currentStartTime"),
+ Cast(currStartTime, StringType, timeZoneId)
+ )
+ ),
+ counter.dataType
+ )
+
+ /**
+ * The evaluation expression for the counter_diff function.
+ *
+ * Checks for edge cases first: NULL counter value, negative counter value, start time decrease
+ * and counter resets.
+ *
+ * Otherwise, returns the difference between the current and previous counter values.
+ */
+ override lazy val evaluateExpression: Expression = withCounterNullAndNegativeChecks {
+ If(currStartTime < prevStartTime,
+ decreasedStartTimeError,
+ If(prevStartTime < currStartTime,
+ nullResult,
+ diffWithCounterDecreaseCheck
+ )
+ )
+ }
+
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[Expression]): CounterDiffWithStartTime =
+ copy(counter = newChildren(0), startTime = newChildren(1))
+}
+
+// scalastyle:off line.size.limit line.contains.tab
+@ExpressionDescription(
+ usage = """
+ _FUNC_(value [, start_time]) - Computes the differences between consecutive cumulative counter
+ values in a time series, thereby converting the counter from the cumulative to the delta
+ format.
+ """,
+ arguments = """
+ Arguments:
+ * value - A cumulative counter. Must be a numeric data type. Must be non-negative.
+ * start_time - An optional timestamp parameter which indicates when the counter was last set
+ to zero. Used to signal counter resets.
+ """,
+ examples = """
+ Examples:
+ > SELECT m, t, c, _FUNC_(c) OVER (PARTITION BY m ORDER BY t) AS diff FROM VALUES ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:00:00', 100), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:01:00', 200), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:02:00', 400), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:03:00', 50), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:04:00', 100) AS tab(m, t, c) ORDER BY t;
+ http_requests 2026-01-01 00:00:00 100 NULL
+ http_requests 2026-01-01 00:01:00 200 100
+ http_requests 2026-01-01 00:02:00 400 200
+ http_requests 2026-01-01 00:03:00 50 NULL
+ http_requests 2026-01-01 00:04:00 100 50
+ > SELECT m, t, s, c, _FUNC_(c, s) OVER (PARTITION BY m ORDER BY t) AS diff FROM VALUES ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:00:00', 100, TIMESTAMP_NTZ '2026-01-01 00:00:00'), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:01:00', 200, TIMESTAMP_NTZ '2026-01-01 00:00:00'), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:02:00', 400, TIMESTAMP_NTZ '2026-01-01 00:00:00'), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:03:00', 500, TIMESTAMP_NTZ '2026-01-01 00:02:15'), ('http_ [...]
+ http_requests 2026-01-01 00:00:00 2026-01-01 00:00:00 100 NULL
+ http_requests 2026-01-01 00:01:00 2026-01-01 00:00:00 200 100
+ http_requests 2026-01-01 00:02:00 2026-01-01 00:00:00 400 200
+ http_requests 2026-01-01 00:03:00 2026-01-01 00:02:15 500 NULL
+ http_requests 2026-01-01 00:04:00 2026-01-01 00:02:15 600 100
+ """,
+ note = """
+ _FUNC_ calculates the difference between the current and the previous counter value within the
+ partition, according to the order defined by the ORDER BY clause.
+
+ Use the PARTITION BY clause of the window to separate independent counters by specifying all
+ columns that uniquely identify a time series, typically the counter name and any attributes
+ tied to the counter.
+
+ Use the ORDER BY clause of the window to order the observations by the associated timestamp
+ in ascending order.
+
+ The following special cases are handled:
+ 1. If the counter value is NULL, NULL is returned for that row, and the row is excluded from
+ difference calculations and comparisons for subsequent rows.
+ 2. If the counter value is negative, or the start time decreases, an error is raised.
+ 3. In the case of a counter reset, NULL is returned.
+ 4. NULL is returned for the first row.
+ 5. If either the current or the previous start time is NULL, the start time reset check is
+ skipped.
+
+ Counter resets are detected when:
+ 1. The current counter value is less than the previous counter value.
+ 2. The current start time is greater than the previous start time, if start_time was provided.
+ """,
+ group = "window_funcs",
+ since = "4.2.0"
+)
+// scalastyle:on line.size.limit line.contains.tab
+object CounterDiffExpressionBuilder extends ExpressionBuilder {
+ // Placeholder start time which serves as a default value.
+ private val DefaultStartTime: Literal = Literal.create(null, NullType)
+
+ override def functionSignature: Option[FunctionSignature] = {
+ val valueArg = InputParameter("value")
+ val startTimeArg = InputParameter("start_time", Some(DefaultStartTime))
+ Some(FunctionSignature(Seq(valueArg, startTimeArg)))
+ }
+
+ override def build(funcName: String, expressions: Seq[Expression]): Expression = {
+ // The function signature defines two parameters, so two expressions are always provided.
+ // In the single-parameter form, the start_time argument takes its default value, which is the
+ // `DefaultStartTime` instance itself, so reference equality (`eq`) detects this case. This also
+ // distinguishes the single-parameter form from the two-parameter form with an explicit NULL
+ // start time, whose literal is a different instance.
+ expressions match {
+ case Seq(value, startTime) if startTime eq DefaultStartTime =>
+ CounterDiff(value)
+ case Seq(value, startTime) =>
+ CounterDiffWithStartTime(value, startTime)
+ }
+ }
+}
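The numbered special cases in the `note` above can be cross-checked against a small in-memory model. This is a hypothetical sketch, not Spark's `CounterDiff` implementation: it assumes a single partition that is already sorted by the ORDER BY key, `Long` counters, start times encoded as epoch seconds, and `None` standing in for SQL NULL. The names `CounterDiffModel` and `Obs` are illustrative only.

```scala
// Hypothetical in-memory model of the counter_diff rules listed in the note.
// Assumptions (not from Spark): one partition already sorted by the ORDER BY
// key, Long counters, start times as epoch seconds, None as SQL NULL.
object CounterDiffModel {
  final case class Obs(value: Option[Long], startTime: Option[Long] = None)

  def counterDiff(series: Seq[Obs]): Seq[Option[Long]] = {
    // State taken from the most recent row whose counter value was non-NULL.
    var prevValue: Option[Long] = None
    var prevStart: Option[Long] = None

    series.toVector.map { obs =>
      obs.value match {
        // Rule 1: a NULL counter yields NULL and leaves the state untouched,
        // so later rows are compared against the last non-NULL row instead.
        case None => None
        case Some(v) =>
          // Rule 2 (first half): negative counters are an error.
          require(v >= 0, s"counter value must be non-negative, got $v")
          val result: Option[Long] = prevValue match {
            // Rule 4: the first non-NULL row has nothing to diff against.
            case None => None
            case Some(pv) =>
              // Rule 5: compare start times only if both are non-NULL.
              // Assumption: "previous start time" is the start time of the previous
              // non-NULL-value row, even when that start time is itself NULL.
              val startTimeReset = (obs.startTime, prevStart) match {
                case (Some(cur), Some(prev)) =>
                  // Rule 2 (second half): a decreasing start time is an error.
                  require(cur >= prev, s"start time decreased from $prev to $cur")
                  cur > prev
                case _ => false
              }
              // Rule 3: a reset (value drop or start-time increase) yields NULL.
              if (startTimeReset || v < pv) None else Some(v - pv)
          }
          prevValue = Some(v)
          prevStart = obs.startTime
          result
      }
    }
  }
}
```

Fed the counters from the first documented example (100, 200, 400, 50, 100), the model returns `None, Some(100), Some(200), None, Some(50)`, matching the expected `diff` column. The note leaves open whether a NULL start time on the previous row disables the check or whether the last non-NULL start time is used instead; the model picks the former, which is an assumption.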
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
index 46ab43074409..3e463595ba67 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
@@ -247,6 +247,37 @@ case class DecimalAddNoOverflowCheck(
copy(left = newLeft, right = newRight)
}
+/**
+ * A subtract expression for decimal values which is only used internally.
+ *
+ * Note that, unlike `Subtract`, this expression does not check for overflow.
+ * It is the caller's responsibility to ensure that the result fits in the declared precision and
+ * scale. For example, `counter_diff` only invokes this on operands that have already been validated
+ * to satisfy `left >= right >= 0`, so the result is non-negative and bounded above by `left`.
+ */
+case class DecimalSubtractNoOverflowCheck(
+ left: Expression,
+ right: Expression,
+ override val dataType: DataType) extends BinaryOperator {
+ require(dataType.isInstanceOf[DecimalType])
+
+ override def inputType: AbstractDataType = DecimalType
+ override def symbol: String = "-"
+ private def decimalMethod: String = "$minus"
+
+ private lazy val numeric = TypeUtils.getNumeric(dataType)
+
+ override protected def nullSafeEval(input1: Any, input2: Any): Any =
+ numeric.minus(input1, input2)
+
+ override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+ defineCodeGen(ctx, ev, (eval1, eval2) => s"$eval1.$decimalMethod($eval2)")
+
+ override protected def withNewChildrenInternal(
+ newLeft: Expression, newRight: Expression): DecimalSubtractNoOverflowCheck =
+ copy(left = newLeft, right = newRight)
+}
+
/**
* A divide expression for decimal values which is only used internally by Avg.
*
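The safety argument in the `DecimalSubtractNoOverflowCheck` comment can be checked outside Spark: if both operands fit `DECIMAL(p, s)` and `0 <= right <= left`, the difference lies in `[0, left]` and needs no more fractional digits than the operands, so it also fits `DECIMAL(p, s)`. The sketch below uses `java.math.BigDecimal`; `DecimalSubtractInvariant`, `fitsDecimal`, and `subtractUnchecked` are hypothetical helpers, not Spark APIs, and `fitsDecimal` only approximates how a value maps onto a `DECIMAL(precision, scale)` type.

```scala
import java.math.{BigDecimal => JBigDecimal}

// Hypothetical helpers (not Spark APIs) illustrating why subtracting without an
// overflow check is safe when the caller guarantees left >= right >= 0.
object DecimalSubtractInvariant {
  // True if x is representable as DECIMAL(precision, scale) without rounding:
  // at most `scale` significant fractional digits, and at most `precision`
  // digits in the unscaled value once x is rescaled to `scale`.
  def fitsDecimal(x: JBigDecimal, precision: Int, scale: Int): Boolean =
    x.stripTrailingZeros.scale <= scale && x.setScale(scale).precision <= precision

  // Plain subtraction with no overflow check. The require mirrors the caller-side
  // validation described in the comment, not a check Spark's expression performs.
  def subtractUnchecked(left: JBigDecimal, right: JBigDecimal): JBigDecimal = {
    require(right.signum >= 0 && left.compareTo(right) >= 0,
      s"caller must ensure left >= right >= 0, got left=$left, right=$right")
    // The result lies in [0, left] and its scale is max(left.scale, right.scale),
    // so it never needs more digits than the wider operand.
    left.subtract(right)
  }
}
```

For instance, `99999999.99 - 99999999.98` yields `0.01`, which still fits `DECIMAL(10, 2)`.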
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index c56db6c9b7e3..9ac6e49b0343 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -94,6 +94,7 @@
| org.apache.spark.sql.catalyst.expressions.Cos | cos | SELECT cos(0) | struct<COS(0):double> |
| org.apache.spark.sql.catalyst.expressions.Cosh | cosh | SELECT cosh(0) | struct<COSH(0):double> |
| org.apache.spark.sql.catalyst.expressions.Cot | cot | SELECT cot(1) | struct<COT(1):double> |
+| org.apache.spark.sql.catalyst.expressions.CounterDiffExpressionBuilder | counter_diff | SELECT m, t, c, counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff FROM VALUES ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:00:00', 100), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:01:00', 200), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:02:00', 400), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:03:00', 50), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:04:00', 100) AS tab(m, t, c [...]
| org.apache.spark.sql.catalyst.expressions.Crc32 | crc32 | SELECT crc32('Spark') | struct<crc32(Spark):bigint> |
| org.apache.spark.sql.catalyst.expressions.CreateArray | array | SELECT array(1, 2, 3) | struct<array(1, 2, 3):array<int>> |
| org.apache.spark.sql.catalyst.expressions.CreateMap | map | SELECT map(1.0, '2', 3.0, '4') | struct<map(1.0, 2, 3.0, 4):map<decimal(2,1),string>> |
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/counter-diff.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/counter-diff.sql.out
new file mode 100644
index 000000000000..c8c0764b2d79
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/counter-diff.sql.out
@@ -0,0 +1,827 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+SET TIME ZONE 'UTC'
+-- !query analysis
+SetCommand (spark.sql.session.timeZone,Some(UTC))
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 200), (3, 400) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x]
+ +- Project [t#x, c#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 50) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x]
+ +- Project [t#x, c#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 200), (3, 400), (4, 50), (5, 100) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x]
+ +- Project [t#x, c#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 100), (3, 200) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x]
+ +- Project [t#x, c#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, 200) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x]
+ +- Project [t#x, c#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT m, t, c, counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff
+FROM VALUES
+ ('a', 1, 100), ('a', 2, 200),
+ ('b', 1, 10), ('b', 2, 30)
+AS tab(m, t, c)
+ORDER BY m, t
+-- !query analysis
+Sort [m#x ASC NULLS FIRST, t#x ASC NULLS FIRST], true
++- Project [m#x, t#x, c#x, diff#x]
+ +- Project [m#x, t#x, c#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(m#x, t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [m#x], [t#x ASC NULLS FIRST]
+ +- Project [m#x, t#x, c#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [m#x, t#x, c#x]
+
+
+-- !query
+SELECT t,
+ counter_diff(d) OVER (ORDER BY t) AS d_diff,
+ counter_diff(f) OVER (ORDER BY t) AS f_diff,
+ counter_diff(b) OVER (ORDER BY t) AS b_diff,
+ counter_diff(l) OVER (ORDER BY t) AS l_diff,
+ counter_diff(i) OVER (ORDER BY t) AS i_diff,
+ counter_diff(si) OVER (ORDER BY t) AS si_diff,
+ counter_diff(ti) OVER (ORDER BY t) AS ti_diff,
+ counter_diff(dec) OVER (ORDER BY t) AS dec_diff
+FROM VALUES
+ (1, 1.5D, CAST(1.5 AS FLOAT), CAST(100 AS BIGINT), CAST(100 AS LONG),
+ CAST(100 AS INT), CAST(100 AS SMALLINT), CAST(10 AS TINYINT),
+ CAST(10.5 AS DECIMAL(10,2))),
+ (2, 3.5D, CAST(3.5 AS FLOAT), CAST(300 AS BIGINT), CAST(300 AS LONG),
+ CAST(300 AS INT), CAST(300 AS SMALLINT), CAST(30 AS TINYINT),
+ CAST(20.5 AS DECIMAL(10,2)))
+AS tab(t, d, f, b, l, i, si, ti, dec)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, d_diff#x, f_diff#x, b_diff#xL, l_diff#xL, i_diff#x, si_diff#x, ti_diff#x, dec_diff#x]
+ +- Project [t#x, d#x, f#x, b#xL, l#xL, i#x, si#x, ti#x, dec#x, d_diff#x, f_diff#x, b_diff#xL, l_diff#xL, i_diff#x, si_diff#x, ti_diff#x, dec_diff#x, d_diff#x, f_diff#x, b_diff#xL, l_diff#xL, i_diff#x, si_diff#x, ti_diff#x, dec_diff#x]
+ +- Window [counter_diff(d#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d_diff#x, counter_diff(f#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS f_diff#x, counter_diff(b#xL) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS b_diff#xL, counter_diff(l#xL) windowspecdefinition(t#x [...]
+ +- Project [t#x, d#x, f#x, b#xL, l#xL, i#x, si#x, ti#x, dec#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, d#x, f#x, b#xL, l#xL, i#x, si#x, ti#x, dec#x]
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY t) AS diff_subtract
+FROM VALUES
+ (1, CAST(0.10000000000000000000000000000000000001 AS DECIMAL(38, 38))),
+ (2, CAST(0.10000000000000000000000000000000000002 AS DECIMAL(38, 38)))
+AS tab(t, c) ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x, diff_subtract#x]
+ +- Project [t#x, c#x, diff#x, _we1#x, diff#x, (c#x - _we1#x) AS diff_subtract#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x, lag(c#x, -1, null) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we1#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY t) AS diff_subtract
+FROM VALUES
+ (1, CAST(12345678901234567890123456789012.123456 AS DECIMAL(38, 6))),
+ (2, CAST(12345678901234567890123456789012.123457 AS DECIMAL(38, 6)))
+AS tab(t, c) ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x, diff_subtract#x]
+ +- Project [t#x, c#x, diff#x, _we1#x, diff#x, (c#x - _we1#x) AS diff_subtract#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x, lag(c#x, -1, null) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we1#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY t) AS diff_subtract
+FROM VALUES
+ (1, CAST(99999999.98 AS DECIMAL(10, 2))),
+ (2, CAST(99999999.99 AS DECIMAL(10, 2)))
+AS tab(t, c) ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x, diff_subtract#x]
+ +- Project [t#x, c#x, diff#x, _we1#x, diff#x, (c#x - _we1#x) AS diff_subtract#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x, lag(c#x, -1, null) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we1#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, NULL), (2, NULL), (3, NULL) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x]
+ +- Project [t#x, diff#x, diff#x]
+ +- Window [counter_diff(null) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100, NULL), (2, 200, NULL) AS tab(t, c, st)
+ORDER BY t
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM (SELECT t, CAST(c AS INT) AS c
+ FROM VALUES (1, NULL), (2, NULL), (3, NULL) AS tab(t, c))
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x]
+ +- Project [t#x, c#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x]
+ +- SubqueryAlias __auto_generated_subquery_name
+ +- Project [t#x, cast(c#x as int) AS c#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, -5) AS tab(t, c)
+-- !query analysis
+Project [diff#x]
++- Project [c#x, t#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [c#x, t#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, -5.0D) AS tab(t, c)
+-- !query analysis
+Project [diff#x]
++- Project [c#x, t#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [c#x, t#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, CAST(-5.5 AS DECIMAL(10, 3))) AS tab(t, c)
+-- !query analysis
+Project [diff#x]
++- Project [c#x, t#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [c#x, t#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, DOUBLE('-Infinity')) AS tab(t, c)
+-- !query analysis
+Project [diff#x]
++- Project [c#x, t#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [c#x, t#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, -5) AS tab(t, c)
+-- !query analysis
+Project [diff#x]
++- Project [c#x, t#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [c#x, t#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, 100.0D), (2, DOUBLE('Infinity')), (3, 200.0D),
+ (4, DOUBLE('Infinity')), (5, DOUBLE('Infinity')) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x]
+ +- Project [t#x, c#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, 100.0D), (2, DOUBLE('NaN')), (3, DOUBLE('NaN')), (4, 200.0D),
+ (5, 400.0D), (6, DOUBLE('NaN')), (7, 50.0D), (8, 100.0D) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x]
+ +- Project [t#x, c#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(1) OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t
+-- !query analysis
+Sort [t#xL ASC NULLS FIRST], true
++- Project [t#xL, diff#x]
+ +- Project [t#xL, diff#x, diff#x]
+ +- Window [counter_diff(1) windowspecdefinition(t#xL ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#xL ASC NULLS FIRST]
+ +- Project [t#xL]
+ +- SubqueryAlias tab
+ +- Project [id#xL AS t#xL]
+ +- Range (1, 5, step=1)
+
+
+-- !query
+SELECT t, counter_diff(1 + 1) OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t
+-- !query analysis
+Sort [t#xL ASC NULLS FIRST], true
++- Project [t#xL, diff#x]
+ +- Project [t#xL, diff#x, diff#x]
+ +- Window [counter_diff((1 + 1)) windowspecdefinition(t#xL ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#xL ASC NULLS FIRST]
+ +- Project [t#xL]
+ +- SubqueryAlias tab
+ +- Project [id#xL AS t#xL]
+ +- Range (1, 5, step=1)
+
+
+-- !query
+SELECT t, counter_diff(1, TIMESTAMP '2026-01-01 00:00:00')
+ OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+SELECT t, counter_diff(
+ 1 + 1,
+ TIMESTAMP '2026-01-01 00:00:00' + INTERVAL '1' SECOND
+ ) OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+SELECT t, c,
+ counter_diff(c) OVER (ORDER BY t) AS diff,
+ c - lag(c) OVER (ORDER BY t) AS d1,
+ c - lag(c) IGNORE NULLS OVER (ORDER BY t) AS d2
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, 300), (4, 150) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x, d1#x, d2#x]
+ +- Project [t#x, c#x, diff#x, _we1#x, _we2#x, diff#x, (c#x - _we1#x) AS d1#x, (c#x - _we2#x) AS d2#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x, lag(c#x, -1, null) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we1#x, lag(c#x, -1, null) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we2#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, c,
+ counter_diff(c) OVER (ORDER BY t) AS diff,
+ avg(c) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS local_avg,
+ lead(c) OVER (ORDER BY t) AS nc,
+ max(c) OVER (ORDER BY t ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) AS some_max
+FROM VALUES (1, 100), (2, 200), (3, 150), (4, 400), (5, 500), (6, 600) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x, local_avg#x, nc#x, some_max#x]
+ +- Project [t#x, c#x, diff#x, local_avg#x, nc#x, some_max#x, diff#x, local_avg#x, nc#x, some_max#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x, avg(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, 1)) AS local_avg#x, lead(c#x, 1, null) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) AS nc#x, max(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 1)) AS some_max [...]
+ +- Project [t#x, c#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT m, t, c,
+ counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff,
+ avg(c) OVER (PARTITION BY m ORDER BY t
+ ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS local_avg,
+ max(c) OVER (PARTITION BY t) AS max_c
+FROM VALUES
+ ('a', 1, 100), ('a', 2, 200), ('a', 3, 150), ('a', 4, 300),
+ ('b', 1, 10), ('b', 2, 30), ('b', 3, 60), ('b', 4, 100)
+AS tab(m, t, c)
+ORDER BY m, t
+-- !query analysis
+Sort [m#x ASC NULLS FIRST, t#x ASC NULLS FIRST], true
++- Project [m#x, t#x, c#x, diff#x, local_avg#x, max_c#x]
+ +- Project [m#x, t#x, c#x, diff#x, local_avg#x, max_c#x, diff#x, local_avg#x, max_c#x]
+ +- Window [max(c#x) windowspecdefinition(t#x, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS max_c#x], [t#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(m#x, t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x, avg(c#x) windowspecdefinition(m#x, t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, 1)) AS local_avg#x], [m#x], [t#x ASC NULLS FIRST]
+ +- Project [m#x, t#x, c#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [m#x, t#x, c#x]
+
+
+-- !query
+SELECT t, c,
+ counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:00:00', 100),
+ (2, TIMESTAMP '2026-01-01 00:00:00', 200),
+ (3, TIMESTAMP '2026-01-01 00:02:31', 400)
+AS tab(t, st, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x]
+ +- Project [t#x, c#x, st#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x, st#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, st#x, c#x]
+
+
+-- !query
+SELECT t, c,
+ counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:00:00', 100),
+ (2, TIMESTAMP '2026-01-01 00:00:00', 200)
+AS tab(t, st, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x]
+ +- Project [t#x, c#x, st#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x, st#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, st#x, c#x]
+
+
+-- !query
+SELECT t, c,
+ counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:00:00', 100),
+ (2, CAST(NULL AS TIMESTAMP), 200),
+ (3, TIMESTAMP '2026-01-01 00:01:00', 300)
+AS tab(t, st, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x]
+ +- Project [t#x, c#x, st#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x, st#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, st#x, c#x]
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+ (2, TIMESTAMP '2026-01-01 00:01:00', 200)
+AS tab(t, st, c)
+-- !query analysis
+Project [diff#x]
++- Project [c#x, st#x, t#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [c#x, st#x, t#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, st#x, c#x]
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:00:00', -1)
+AS tab(t, st, c)
+-- !query analysis
+Project [diff#x]
++- Project [c#x, st#x, t#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [c#x, st#x, t#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, st#x, c#x]
+
+
+-- !query
+SELECT t, c,
+ counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP_NTZ '2026-01-01 00:00:00', 100),
+ (2, TIMESTAMP_NTZ '2026-01-01 00:00:00', 200),
+ (3, TIMESTAMP_NTZ '2026-01-01 00:05:00', 300)
+AS tab(t, st, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x]
+ +- Project [t#x, c#x, st#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x, st#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, st#x, c#x]
+
+
+-- !query
+SELECT m, t, c, counter_diff(c, st) OVER (PARTITION BY m ORDER BY t) AS diff
+FROM VALUES
+ ('a', 1, TIMESTAMP '2026-01-01 00:00:00', 100),
+ ('a', 2, TIMESTAMP '2026-01-01 00:00:00', 200),
+ ('a', 3, TIMESTAMP '2026-01-01 00:05:00', 500),
+ ('b', 1, TIMESTAMP '2026-01-01 00:00:00', 10),
+ ('b', 2, TIMESTAMP '2026-01-01 00:00:00', 20)
+AS tab(m, t, st, c)
+ORDER BY m, t
+-- !query analysis
+Sort [m#x ASC NULLS FIRST, t#x ASC NULLS FIRST], true
++- Project [m#x, t#x, c#x, diff#x]
+ +- Project [m#x, t#x, c#x, st#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(m#x, t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [m#x], [t#x ASC NULLS FIRST]
+ +- Project [m#x, t#x, c#x, st#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [m#x, t#x, st#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+ (2, TIMESTAMP '2026-01-01 00:01:00', CAST(NULL AS INT))
+AS tab(t, st, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x]
+ +- Project [t#x, c#x, st#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x, st#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, st#x, c#x]
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+ (2, TIMESTAMP '2026-01-01 00:01:00', CAST(NULL AS INT)),
+ (3, TIMESTAMP '2026-01-01 00:01:00', 300)
+AS tab(t, st, c)
+-- !query analysis
+Project [diff#x]
++- Project [c#x, st#x, t#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [c#x, st#x, t#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, st#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+ (2, TIMESTAMP '2026-01-01 00:06:00', CAST(NULL AS INT)),
+ (3, TIMESTAMP '2026-01-01 00:07:00', 300)
+AS tab(t, st, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x]
+ +- Project [t#x, c#x, st#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x, st#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, st#x, c#x]
+
+
+-- !query
+WITH gen AS (
+ SELECT
+ TIMESTAMPADD(MINUTE, id * 30, TIMESTAMP '2026-01-01 00:00:00') AS ts,
+ CASE WHEN id < 4 THEN TIMESTAMP '2026-01-01 00:00:00'
+ ELSE TIMESTAMP '2026-01-01 02:00:00' END AS st,
+ CASE WHEN id < 4 THEN id * 1000 ELSE (id - 4) * 800 END AS c
+ FROM RANGE(8) AS r(id)
+),
+diffs AS (
+ SELECT ts, c, counter_diff(c, st) OVER (ORDER BY ts) AS diff
+ FROM gen
+)
+SELECT date_trunc('hour', ts) AS hour_bucket,
+ SUM(diff) AS total_diff
+FROM diffs GROUP BY 1 ORDER BY 1
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+SELECT counter_diff(1) AS diff
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "WINDOW_FUNCTION_WITHOUT_OVER_CLAUSE",
+ "sqlState" : "42601",
+ "messageParameters" : {
+ "funcName" : "\"counter_diff(1)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 30,
+ "fragment" : "counter_diff(1) AS diff"
+ } ]
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER () AS diff
+FROM VALUES (1) AS tab(c)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "WINDOW_FUNCTION_FRAME_NOT_ORDERED",
+ "sqlState" : "42601",
+ "messageParameters" : {
+ "wf_expr" : "counter_diff(tab.c)",
+ "wf_name" : "counter_diff"
+ }
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 'abc') AS tab(t, c)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"c\"",
+ "inputType" : "\"STRING\"",
+ "paramIndex" : "first",
+ "requiredType" : "\"NUMERIC\"",
+ "sqlExpr" : "\"counter_diff(c)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 40,
+ "fragment" : "counter_diff(c) OVER (ORDER BY t)"
+ } ]
+}
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 'abc', 100) AS tab(t, st, c)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"st\"",
+ "inputType" : "\"STRING\"",
+ "paramIndex" : "second",
+ "requiredType" : "(\"TIMESTAMP\" or \"TIMESTAMP_NTZ\")",
+ "sqlExpr" : "\"counter_diff(c, st)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 44,
+ "fragment" : "counter_diff(c, st) OVER (ORDER BY t)"
+ } ]
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS diff
+FROM VALUES (1, 100), (2, 200) AS tab(t, c)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "_LEGACY_ERROR_TEMP_1036",
+ "messageParameters" : {
+ "required" : "specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())",
+ "wf" : "specifiedwindowframe(RowFrame, -1, 1)"
+ }
+}
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (
+ ORDER BY t ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS diff
+FROM VALUES (1, 100), (2, 200) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x]
+ +- Project [t#x, c#x, diff#x, diff#x]
+ +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+ +- Project [t#x, c#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT counter_diff(c) OVER (
+ ORDER BY t RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS diff
+FROM VALUES (1, 100), (2, 200) AS tab(t, c)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "_LEGACY_ERROR_TEMP_1036",
+ "messageParameters" : {
+ "required" : "specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())",
+ "wf" : "specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())"
+ }
+}
+
+
+-- !query
+SELECT counter_diff() OVER (ORDER BY t) AS diff FROM VALUES (1) AS tab(t)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "REQUIRED_PARAMETER_NOT_FOUND",
+ "sqlState" : "4274K",
+ "messageParameters" : {
+ "index" : "0",
+ "parameterName" : "`value`",
+ "routineName" : "`counter_diff`"
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 39,
+ "fragment" : "counter_diff() OVER (ORDER BY t)"
+ } ]
+}
+
+
+-- !query
+SELECT counter_diff(1, TIMESTAMP '2026-01-01', 99) OVER (ORDER BY t) AS diff
+FROM VALUES (1) AS tab(t)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION",
+ "sqlState" : "42605",
+ "messageParameters" : {
+ "actualNum" : "3",
+ "docroot" : "https://spark.apache.org/docs/latest",
+ "expectedNum" : "[1, 2]",
+ "functionName" : "`counter_diff`"
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 68,
+ "fragment" : "counter_diff(1, TIMESTAMP '2026-01-01', 99) OVER (ORDER BY t)"
+ } ]
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, true), (2, false) AS tab(t, c)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"c\"",
+ "inputType" : "\"BOOLEAN\"",
+ "paramIndex" : "first",
+ "requiredType" : "\"NUMERIC\"",
+ "sqlExpr" : "\"counter_diff(c)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 40,
+ "fragment" : "counter_diff(c) OVER (ORDER BY t)"
+ } ]
+}
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, DATE '2026-01-01', 100) AS tab(t, st, c)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"st\"",
+ "inputType" : "\"DATE\"",
+ "paramIndex" : "second",
+ "requiredType" : "(\"TIMESTAMP\" or \"TIMESTAMP_NTZ\")",
+ "sqlExpr" : "\"counter_diff(c, st)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 44,
+ "fragment" : "counter_diff(c, st) OVER (ORDER BY t)"
+ } ]
+}
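The value-only semantics that the `counter-diff.sql` tests in this commit exercise (the first row returns NULL, NULL counter rows return NULL and are skipped, a decrease is treated as a reset, negative values are rejected, and NaN sorts above every other value) can be summarized with a small plain-Python model. This is a hypothetical sketch over one partition that is already sorted by the `ORDER BY` key, not Spark's Scala implementation; the names `counter_diff` and `_spark_lt` are illustrative, and raising `ValueError` stands in for the `COUNTER_DIFF_NEGATIVE_COUNTER_VALUE` error.

```python
import math
from typing import List, Optional, Union

Number = Union[int, float]


def _spark_lt(a: Number, b: Number) -> bool:
    """a < b under Spark SQL ordering: NaN sorts above every other value and equals itself."""
    if isinstance(a, float) and math.isnan(a):
        return False  # NaN is never less than anything, including another NaN
    if isinstance(b, float) and math.isnan(b):
        return True  # every non-NaN value is less than NaN
    return a < b


def counter_diff(values: List[Optional[Number]]) -> List[Optional[Number]]:
    """Hypothetical model of counter_diff(value) over one partition sorted by the ORDER BY key."""
    out: List[Optional[Number]] = []
    prev: Optional[Number] = None  # last non-NULL counter seen in this partition
    for v in values:
        if v is None:
            out.append(None)  # NULL rows return NULL and leave `prev` untouched
            continue
        if _spark_lt(v, 0):
            # Stand-in for COUNTER_DIFF_NEGATIVE_COUNTER_VALUE (also covers -Infinity).
            raise ValueError(f"negative counter value: {v}")
        if prev is None or _spark_lt(v, prev):
            out.append(None)  # first non-NULL row, or a reset (the value decreased)
        else:
            out.append(v - prev)  # inf - inf and any arithmetic with NaN yield NaN
        prev = v
    return out


if __name__ == "__main__":
    # Mirrors the reset test in counter-diff.sql.
    print(counter_diff([100, 200, 400, 50, 100]))  # [None, 100, 200, None, 50]
```

After a reset the model keeps the new, lower value as the baseline, which is why the row following the drop (`50 -> 100`) reports 50 rather than NULL.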
diff --git a/sql/core/src/test/resources/sql-tests/inputs/counter-diff.sql b/sql/core/src/test/resources/sql-tests/inputs/counter-diff.sql
new file mode 100644
index 000000000000..34e61f8ee14b
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/counter-diff.sql
@@ -0,0 +1,391 @@
+-- Tests for the counter_diff window function.
+
+SET TIME ZONE 'UTC';
+
+------------------------------------------------------------
+-- Basic semantics
+------------------------------------------------------------
+
+-- Monotonically increasing counter: each diff is current - previous.
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 200), (3, 400) AS tab(t, c)
+ORDER BY t;
+
+-- Single-row input: the only row has no predecessor and returns NULL.
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 50) AS tab(t, c)
+ORDER BY t;
+
+-- Counter reset detected by value decrease: the row where the value drops returns NULL.
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 200), (3, 400), (4, 50), (5, 100) AS tab(t, c)
+ORDER BY t;
+
+-- Equal counter values produce a diff of 0.
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 100), (3, 200) AS tab(t, c)
+ORDER BY t;
+
+-- NULL counter rows have NULL as the result and are skipped when calculating differences.
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, 200) AS tab(t, c)
+ORDER BY t;
+
+-- Each partition has its own first row and prior values.
+SELECT m, t, c, counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff
+FROM VALUES
+ ('a', 1, 100), ('a', 2, 200),
+ ('b', 1, 10), ('b', 2, 30)
+AS tab(m, t, c)
+ORDER BY m, t;
+
+------------------------------------------------------------
+-- Numeric types
+------------------------------------------------------------
+
+-- counter_diff supports all numeric types:
+-- DOUBLE, FLOAT, BIGINT, LONG, INT, SMALLINT, TINYINT, DECIMAL.
+SELECT t,
+ counter_diff(d) OVER (ORDER BY t) AS d_diff,
+ counter_diff(f) OVER (ORDER BY t) AS f_diff,
+ counter_diff(b) OVER (ORDER BY t) AS b_diff,
+ counter_diff(l) OVER (ORDER BY t) AS l_diff,
+ counter_diff(i) OVER (ORDER BY t) AS i_diff,
+ counter_diff(si) OVER (ORDER BY t) AS si_diff,
+ counter_diff(ti) OVER (ORDER BY t) AS ti_diff,
+ counter_diff(dec) OVER (ORDER BY t) AS dec_diff
+FROM VALUES
+ (1, 1.5D, CAST(1.5 AS FLOAT), CAST(100 AS BIGINT), CAST(100 AS LONG),
+ CAST(100 AS INT), CAST(100 AS SMALLINT), CAST(10 AS TINYINT),
+ CAST(10.5 AS DECIMAL(10,2))),
+ (2, 3.5D, CAST(3.5 AS FLOAT), CAST(300 AS BIGINT), CAST(300 AS LONG),
+ CAST(300 AS INT), CAST(300 AS SMALLINT), CAST(30 AS TINYINT),
+ CAST(20.5 AS DECIMAL(10,2)))
+AS tab(t, d, f, b, l, i, si, ti, dec)
+ORDER BY t;
+
+------------------------------------------------------------
+-- High-precision DECIMAL inputs
+------------------------------------------------------------
+-- Decimal subtraction normally widens the result type to handle possible overflow.
+-- For counter_diff, counters cannot be negative, so the magnitude of a difference never exceeds the
+-- larger of the two counters. There is no risk of overflow and no need to widen the result type,
+-- so we subtract directly in the input type. These tests verify that the result type is not widened,
+-- and that no precision is lost for large precision and scale.
+
+-- DECIMAL(38, 38): normal subtraction would be of type DECIMAL(38, 37).
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY t) AS diff_subtract
+FROM VALUES
+ (1, CAST(0.10000000000000000000000000000000000001 AS DECIMAL(38, 38))),
+ (2, CAST(0.10000000000000000000000000000000000002 AS DECIMAL(38, 38)))
+AS tab(t, c) ORDER BY t;
+
+-- DECIMAL(38, 6): normal subtraction would be of type DECIMAL(38, 6).
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY t) AS diff_subtract
+FROM VALUES
+ (1, CAST(12345678901234567890123456789012.123456 AS DECIMAL(38, 6))),
+ (2, CAST(12345678901234567890123456789012.123457 AS DECIMAL(38, 6)))
+AS tab(t, c) ORDER BY t;
+
+-- DECIMAL(10, 2): normal subtraction would be of type DECIMAL(11, 2).
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY t) AS diff_subtract
+FROM VALUES
+ (1, CAST(99999999.98 AS DECIMAL(10, 2))),
+ (2, CAST(99999999.99 AS DECIMAL(10, 2)))
+AS tab(t, c) ORDER BY t;
+
+------------------------------------------------------------
+-- NULL type inputs
+------------------------------------------------------------
+
+-- Untyped NULL counters are treated as DOUBLE.
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, NULL), (2, NULL), (3, NULL) AS tab(t, c)
+ORDER BY t;
+
+-- Untyped NULL start_time is treated as TIMESTAMP.
+-- The counter behavior is unaffected because NULL start_time skips the reset check.
+SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100, NULL), (2, 200, NULL) AS tab(t, c, st)
+ORDER BY t;
+
+-- Explicitly-typed all-NULL INT counter: type stays INT.
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM (SELECT t, CAST(c AS INT) AS c
+ FROM VALUES (1, NULL), (2, NULL), (3, NULL) AS tab(t, c))
+ORDER BY t;
+
+------------------------------------------------------------
+-- Negative counter values are runtime errors
+------------------------------------------------------------
+
+-- INT.
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, -5) AS tab(t, c);
+
+-- DOUBLE.
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, -5.0D) AS tab(t, c);
+
+-- DECIMAL: the error message preserves the configured scale.
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, CAST(-5.5 AS DECIMAL(10, 3))) AS tab(t, c);
+
+-- -Infinity is treated as a negative value.
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, DOUBLE('-Infinity')) AS tab(t, c);
+
+-- Negative value after a NULL row still results in an error.
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, -5) AS tab(t, c);
+
+------------------------------------------------------------
+-- Special floating-point values
+------------------------------------------------------------
+
+-- Positive Infinity participates in arithmetic:
+-- +Infinity - 100 = +Infinity.
+-- 200 - +Infinity => Reset.
+-- +Infinity - 200 = +Infinity.
+-- +Infinity - +Infinity = NaN.
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, 100.0D), (2, DOUBLE('Infinity')), (3, 200.0D),
+ (4, DOUBLE('Infinity')), (5, DOUBLE('Infinity')) AS tab(t, c)
+ORDER BY t;
+
+-- NaN values are greater than all other numeric values, so:
+-- any value -> NaN => NaN
+-- NaN -> any non-NaN value => Reset
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, 100.0D), (2, DOUBLE('NaN')), (3, DOUBLE('NaN')), (4, 200.0D),
+ (5, 400.0D), (6, DOUBLE('NaN')), (7, 50.0D), (8, 100.0D) AS tab(t, c)
+ORDER BY t;
+
+------------------------------------------------------------
+-- Constants and foldable expressions as arguments
+------------------------------------------------------------
+
+-- Constant counter: every diff except the first is 0.
+SELECT t, counter_diff(1) OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t;
+
+-- Foldable expression counter (1 + 1) behaves like the constant case.
+SELECT t, counter_diff(1 + 1) OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t;
+
+-- Constant start_time alongside constant counter.
+SELECT t, counter_diff(1, TIMESTAMP '2026-01-01 00:00:00')
+ OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t;
+
+-- Foldable counter and foldable start_time.
+SELECT t, counter_diff(
+ 1 + 1,
+ TIMESTAMP '2026-01-01 00:00:00' + INTERVAL '1' SECOND
+ ) OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t;
+
+------------------------------------------------------------
+-- Combined with other window functions
+------------------------------------------------------------
+
+-- Compare counter_diff against lag and lag IGNORE NULLS over the same ordering.
+SELECT t, c,
+ counter_diff(c) OVER (ORDER BY t) AS diff,
+ c - lag(c) OVER (ORDER BY t) AS d1,
+ c - lag(c) IGNORE NULLS OVER (ORDER BY t) AS d2
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, 300), (4, 150) AS tab(t, c)
+ORDER BY t;
+
+-- Mix counter_diff with avg, lead, max over different frames.
+SELECT t, c,
+ counter_diff(c) OVER (ORDER BY t) AS diff,
+ avg(c) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS local_avg,
+ lead(c) OVER (ORDER BY t) AS nc,
+ max(c) OVER (ORDER BY t ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) AS some_max
+FROM VALUES (1, 100), (2, 200), (3, 150), (4, 400), (5, 500), (6, 600) AS tab(t, c)
+ORDER BY t;
+
+-- Multiple windows with different partitions in the same SELECT.
+SELECT m, t, c,
+ counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff,
+ avg(c) OVER (PARTITION BY m ORDER BY t
+ ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS local_avg,
+ max(c) OVER (PARTITION BY t) AS max_c
+FROM VALUES
+ ('a', 1, 100), ('a', 2, 200), ('a', 3, 150), ('a', 4, 300),
+ ('b', 1, 10), ('b', 2, 30), ('b', 3, 60), ('b', 4, 100)
+AS tab(m, t, c)
+ORDER BY m, t;
+
+------------------------------------------------------------
+-- start_time parameter
+------------------------------------------------------------
+
+-- start_time advance triggers a reset even when the counter increases.
+SELECT t, c,
+ counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:00:00', 100),
+ (2, TIMESTAMP '2026-01-01 00:00:00', 200),
+ (3, TIMESTAMP '2026-01-01 00:02:31', 400)
+AS tab(t, st, c)
+ORDER BY t;
+
+-- Equal start_time across rows: behavior matches the case with no start time.
+SELECT t, c,
+ counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:00:00', 100),
+ (2, TIMESTAMP '2026-01-01 00:00:00', 200)
+AS tab(t, st, c)
+ORDER BY t;
+
+-- NULL start_time skips the start time reset check on that row and the next row.
+SELECT t, c,
+ counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:00:00', 100),
+ (2, CAST(NULL AS TIMESTAMP), 200),
+ (3, TIMESTAMP '2026-01-01 00:01:00', 300)
+AS tab(t, st, c)
+ORDER BY t;
+
+-- Decreasing start_time is a runtime error.
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+ (2, TIMESTAMP '2026-01-01 00:01:00', 200)
+AS tab(t, st, c);
+
+-- Negative counter still raises in the start_time form.
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:00:00', -1)
+AS tab(t, st, c);
+
+-- TIMESTAMP_NTZ start_time is accepted; same reset behavior as TIMESTAMP.
+SELECT t, c,
+ counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP_NTZ '2026-01-01 00:00:00', 100),
+ (2, TIMESTAMP_NTZ '2026-01-01 00:00:00', 200),
+ (3, TIMESTAMP_NTZ '2026-01-01 00:05:00', 300)
+AS tab(t, st, c)
+ORDER BY t;
+
+-- Partitioned start_time: each partition tracks its own previous start_time.
+SELECT m, t, c, counter_diff(c, st) OVER (PARTITION BY m ORDER BY t) AS diff
+FROM VALUES
+ ('a', 1, TIMESTAMP '2026-01-01 00:00:00', 100),
+ ('a', 2, TIMESTAMP '2026-01-01 00:00:00', 200),
+ ('a', 3, TIMESTAMP '2026-01-01 00:05:00', 500),
+ ('b', 1, TIMESTAMP '2026-01-01 00:00:00', 10),
+ ('b', 2, TIMESTAMP '2026-01-01 00:00:00', 20)
+AS tab(m, t, st, c)
+ORDER BY m, t;
+
+-- A NULL-counter row is skipped before the start_time check, so a same-row
+-- start_time decrease is absorbed when the counter is NULL.
+SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+ (2, TIMESTAMP '2026-01-01 00:01:00', CAST(NULL AS INT))
+AS tab(t, st, c)
+ORDER BY t;
+
+-- ...but the next non-NULL row still observes the decrease and raises.
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+ (2, TIMESTAMP '2026-01-01 00:01:00', CAST(NULL AS INT)),
+ (3, TIMESTAMP '2026-01-01 00:01:00', 300)
+AS tab(t, st, c);
+
+-- A NULL-counter row is skipped before the start_time check, so a same-row
+-- start_time increase is absorbed when the counter is NULL, but it resurfaces
+-- on the next non-NULL row.
+SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+ (2, TIMESTAMP '2026-01-01 00:06:00', CAST(NULL AS INT)),
+ (3, TIMESTAMP '2026-01-01 00:07:00', 300)
+AS tab(t, st, c)
+ORDER BY t;
+
+------------------------------------------------------------
+-- End-to-end: bucket per-row diffs by hour using a CTE
+------------------------------------------------------------
+
+-- 8 measurements every 30 min; start_time changes at id=4 -> single reset.
+-- The hourly SUM of diffs yields the per-hour total counter increase.
+WITH gen AS (
+ SELECT
+ TIMESTAMPADD(MINUTE, id * 30, TIMESTAMP '2026-01-01 00:00:00') AS ts,
+ CASE WHEN id < 4 THEN TIMESTAMP '2026-01-01 00:00:00'
+ ELSE TIMESTAMP '2026-01-01 02:00:00' END AS st,
+ CASE WHEN id < 4 THEN id * 1000 ELSE (id - 4) * 800 END AS c
+ FROM RANGE(8) AS r(id)
+),
+diffs AS (
+ SELECT ts, c, counter_diff(c, st) OVER (ORDER BY ts) AS diff
+ FROM gen
+)
+SELECT date_trunc('hour', ts) AS hour_bucket,
+ SUM(diff) AS total_diff
+FROM diffs GROUP BY 1 ORDER BY 1;
+
+------------------------------------------------------------
+-- Frame, arity, and type validation
+------------------------------------------------------------
+
+-- counter_diff requires an OVER clause (it is a window function).
+SELECT counter_diff(1) AS diff;
+
+-- Window must specify ORDER BY (otherwise the frame is unordered).
+SELECT counter_diff(c) OVER () AS diff
+FROM VALUES (1) AS tab(c);
+
+-- Counter argument must be NUMERIC: STRING is rejected.
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 'abc') AS tab(t, c);
+
+-- start_time argument must be TIMESTAMP or TIMESTAMP_NTZ: STRING is rejected.
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 'abc', 100) AS tab(t, st, c);
+
+-- A user-supplied frame other than ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW is rejected.
+SELECT counter_diff(c) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS diff
+FROM VALUES (1, 100), (2, 200) AS tab(t, c);
+
+-- Explicitly specifying the required frame is allowed.
+SELECT t, counter_diff(c) OVER (
+ ORDER BY t ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS diff
+FROM VALUES (1, 100), (2, 200) AS tab(t, c)
+ORDER BY t;
+
+-- RANGE frames are rejected (counter_diff is row-based).
+SELECT counter_diff(c) OVER (
+ ORDER BY t RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS diff
+FROM VALUES (1, 100), (2, 200) AS tab(t, c);
+
+-- Zero arguments: not accepted.
+SELECT counter_diff() OVER (ORDER BY t) AS diff FROM VALUES (1) AS tab(t);
+
+-- More than 2 arguments: not accepted.
+SELECT counter_diff(1, TIMESTAMP '2026-01-01', 99) OVER (ORDER BY t) AS diff
+FROM VALUES (1) AS tab(t);
+
+-- BOOLEAN counter is rejected.
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, true), (2, false) AS tab(t, c);
+
+-- DATE start_time is rejected.
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, DATE '2026-01-01', 100) AS tab(t, st, c);
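The `start_time` rules tested in `counter-diff.sql` (an advancing start_time forces a reset, a decreasing one is an error, a NULL start_time disables the check on that row and the next, and NULL-counter rows are skipped before any start_time comparison) can be modeled in plain Python as a sanity check. This is a hypothetical sketch over one already-sorted partition with integer counters only (NaN ordering is ignored). The function names are illustrative, `ValueError` stands in for Spark's `COUNTER_DIFF_*` errors, and checking for negative values before comparing start times is an assumption. `hourly_totals` replays the end-to-end CTE test, summing per-hour diffs while skipping NULLs the way SQL `SUM` does.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

Row = Tuple[Optional[int], Optional[datetime]]  # (counter value, start_time)


def counter_diff_with_start(rows: List[Row]) -> List[Optional[int]]:
    """Hypothetical model of counter_diff(value, start_time) over one sorted partition."""
    out: List[Optional[int]] = []
    prev_value: Optional[int] = None
    prev_start: Optional[datetime] = None
    for value, start in rows:
        if value is None:
            out.append(None)  # skipped before any start_time check; state is unchanged
            continue
        if value < 0:
            raise ValueError(f"negative counter value: {value}")
        # start_time is only compared when this row and the previous
        # non-NULL-counter row both carry a start_time.
        compare = prev_value is not None and start is not None and prev_start is not None
        if compare and start < prev_start:
            raise ValueError(f"start_time decreased from {prev_start} to {start}")
        if prev_value is None or (compare and start > prev_start) or value < prev_value:
            out.append(None)  # first row, start_time advanced, or value decreased
        else:
            out.append(value - prev_value)
        prev_value, prev_start = value, start  # a NULL start_time is remembered too
    return out


def hourly_totals() -> Dict[datetime, Optional[int]]:
    """Replay the end-to-end CTE test: 8 readings 30 minutes apart, start_time moves at id=4."""
    base = datetime(2026, 1, 1)
    stamps, rows = [], []
    for i in range(8):
        stamps.append(base + timedelta(minutes=30 * i))
        start = base if i < 4 else base + timedelta(hours=2)
        rows.append((i * 1000 if i < 4 else (i - 4) * 800, start))
    totals: Dict[datetime, Optional[int]] = {}
    for ts, diff in zip(stamps, counter_diff_with_start(rows)):
        bucket = ts.replace(minute=0)  # like date_trunc('hour', ts)
        totals.setdefault(bucket, None)  # SUM over only NULL diffs stays NULL
        if diff is not None:
            totals[bucket] = (totals[bucket] or 0) + diff
    return totals
```

Under this model, `hourly_totals()` gives 1000, 2000, 800 and 1600 for the 00:00, 01:00, 02:00 and 03:00 buckets; the reading at id=4 contributes NULL because its start_time advanced.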
diff --git a/sql/core/src/test/resources/sql-tests/results/counter-diff.sql.out b/sql/core/src/test/resources/sql-tests/results/counter-diff.sql.out
new file mode 100644
index 000000000000..209882d4cf30
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/counter-diff.sql.out
@@ -0,0 +1,864 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+SET TIME ZONE 'UTC'
+-- !query schema
+struct<key:string,value:string>
+-- !query output
+spark.sql.session.timeZone UTC
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 200), (3, 400) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1 100 NULL
+2 200 100
+3 400 200
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 50) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,diff:int>
+-- !query output
+1 NULL
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 200), (3, 400), (4, 50), (5, 100) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1 100 NULL
+2 200 100
+3 400 200
+4 50 NULL
+5 100 50
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 100), (3, 200) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1 100 NULL
+2 100 0
+3 200 100
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, 200) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1 100 NULL
+2 NULL NULL
+3 200 100
+
+
+-- !query
+SELECT m, t, c, counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff
+FROM VALUES
+ ('a', 1, 100), ('a', 2, 200),
+ ('b', 1, 10), ('b', 2, 30)
+AS tab(m, t, c)
+ORDER BY m, t
+-- !query schema
+struct<m:string,t:int,c:int,diff:int>
+-- !query output
+a 1 100 NULL
+a 2 200 100
+b 1 10 NULL
+b 2 30 20
+
+
+-- !query
+SELECT t,
+ counter_diff(d) OVER (ORDER BY t) AS d_diff,
+ counter_diff(f) OVER (ORDER BY t) AS f_diff,
+ counter_diff(b) OVER (ORDER BY t) AS b_diff,
+ counter_diff(l) OVER (ORDER BY t) AS l_diff,
+ counter_diff(i) OVER (ORDER BY t) AS i_diff,
+ counter_diff(si) OVER (ORDER BY t) AS si_diff,
+ counter_diff(ti) OVER (ORDER BY t) AS ti_diff,
+ counter_diff(dec) OVER (ORDER BY t) AS dec_diff
+FROM VALUES
+ (1, 1.5D, CAST(1.5 AS FLOAT), CAST(100 AS BIGINT), CAST(100 AS LONG),
+ CAST(100 AS INT), CAST(100 AS SMALLINT), CAST(10 AS TINYINT),
+ CAST(10.5 AS DECIMAL(10,2))),
+ (2, 3.5D, CAST(3.5 AS FLOAT), CAST(300 AS BIGINT), CAST(300 AS LONG),
+ CAST(300 AS INT), CAST(300 AS SMALLINT), CAST(30 AS TINYINT),
+ CAST(20.5 AS DECIMAL(10,2)))
+AS tab(t, d, f, b, l, i, si, ti, dec)
+ORDER BY t
+-- !query schema
+struct<t:int,d_diff:double,f_diff:float,b_diff:bigint,l_diff:bigint,i_diff:int,si_diff:smallint,ti_diff:tinyint,dec_diff:decimal(10,2)>
+-- !query output
+1 NULL NULL NULL NULL NULL NULL NULL NULL
+2 2.0 2.0 200 200 200 200 20 10.00
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY t) AS diff_subtract
+FROM VALUES
+ (1, CAST(0.10000000000000000000000000000000000001 AS DECIMAL(38, 38))),
+ (2, CAST(0.10000000000000000000000000000000000002 AS DECIMAL(38, 38)))
+AS tab(t, c) ORDER BY t
+-- !query schema
+struct<t:int,diff:decimal(38,38),diff_subtract:decimal(38,37)>
+-- !query output
+1 NULL NULL
+2 0.00000000000000000000000000000000000001 0.0000000000000000000000000000000000000
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY t) AS diff_subtract
+FROM VALUES
+ (1, CAST(12345678901234567890123456789012.123456 AS DECIMAL(38, 6))),
+ (2, CAST(12345678901234567890123456789012.123457 AS DECIMAL(38, 6)))
+AS tab(t, c) ORDER BY t
+-- !query schema
+struct<t:int,diff:decimal(38,6),diff_subtract:decimal(38,6)>
+-- !query output
+1 NULL NULL
+2 0.000001 0.000001
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY t) AS diff_subtract
+FROM VALUES
+ (1, CAST(99999999.98 AS DECIMAL(10, 2))),
+ (2, CAST(99999999.99 AS DECIMAL(10, 2)))
+AS tab(t, c) ORDER BY t
+-- !query schema
+struct<t:int,diff:decimal(10,2),diff_subtract:decimal(11,2)>
+-- !query output
+1 NULL NULL
+2 0.01 0.01
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, NULL), (2, NULL), (3, NULL) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,diff:double>
+-- !query output
+1 NULL
+2 NULL
+3 NULL
+
+
+-- !query
+SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100, NULL), (2, 200, NULL) AS tab(t, c, st)
+ORDER BY t
+-- !query schema
+struct<t:int,diff:int>
+-- !query output
+1 NULL
+2 100
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM (SELECT t, CAST(c AS INT) AS c
+ FROM VALUES (1, NULL), (2, NULL), (3, NULL) AS tab(t, c))
+ORDER BY t
+-- !query schema
+struct<t:int,diff:int>
+-- !query output
+1 NULL
+2 NULL
+3 NULL
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, -5) AS tab(t, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE",
+ "sqlState" : "22003",
+ "messageParameters" : {
+ "function" : "`counter_diff`",
+ "value" : "-5"
+ }
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, -5.0D) AS tab(t, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE",
+ "sqlState" : "22003",
+ "messageParameters" : {
+ "function" : "`counter_diff`",
+ "value" : "-5.0"
+ }
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, CAST(-5.5 AS DECIMAL(10, 3))) AS tab(t, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE",
+ "sqlState" : "22003",
+ "messageParameters" : {
+ "function" : "`counter_diff`",
+ "value" : "-5.500"
+ }
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, DOUBLE('-Infinity')) AS tab(t, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE",
+ "sqlState" : "22003",
+ "messageParameters" : {
+ "function" : "`counter_diff`",
+ "value" : "-Infinity"
+ }
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, -5) AS tab(t, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE",
+ "sqlState" : "22003",
+ "messageParameters" : {
+ "function" : "`counter_diff`",
+ "value" : "-5"
+ }
+}
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, 100.0D), (2, DOUBLE('Infinity')), (3, 200.0D),
+ (4, DOUBLE('Infinity')), (5, DOUBLE('Infinity')) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,diff:double>
+-- !query output
+1 NULL
+2 Infinity
+3 NULL
+4 Infinity
+5 NaN
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, 100.0D), (2, DOUBLE('NaN')), (3, DOUBLE('NaN')), (4, 200.0D),
+ (5, 400.0D), (6, DOUBLE('NaN')), (7, 50.0D), (8, 100.0D) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,diff:double>
+-- !query output
+1 NULL
+2 NaN
+3 NaN
+4 NULL
+5 200.0
+6 NaN
+7 NULL
+8 50.0
+
+
+-- !query
+SELECT t, counter_diff(1) OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t
+-- !query schema
+struct<t:bigint,diff:int>
+-- !query output
+1 NULL
+2 0
+3 0
+4 0
+
+
+-- !query
+SELECT t, counter_diff(1 + 1) OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t
+-- !query schema
+struct<t:bigint,diff:int>
+-- !query output
+1 NULL
+2 0
+3 0
+4 0
+
+
+-- !query
+SELECT t, counter_diff(1, TIMESTAMP '2026-01-01 00:00:00')
+ OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t
+-- !query schema
+struct<t:bigint,diff:int>
+-- !query output
+1 NULL
+2 0
+3 0
+4 0
+
+
+-- !query
+SELECT t, counter_diff(
+ 1 + 1,
+ TIMESTAMP '2026-01-01 00:00:00' + INTERVAL '1' SECOND
+ ) OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t
+-- !query schema
+struct<t:bigint,diff:int>
+-- !query output
+1 NULL
+2 0
+3 0
+4 0
+
+
+-- !query
+SELECT t, c,
+ counter_diff(c) OVER (ORDER BY t) AS diff,
+ c - lag(c) OVER (ORDER BY t) AS d1,
+ c - lag(c) IGNORE NULLS OVER (ORDER BY t) AS d2
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, 300), (4, 150) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int,d1:int,d2:int>
+-- !query output
+1 100 NULL NULL NULL
+2 NULL NULL NULL NULL
+3 300 200 NULL 200
+4 150 NULL -150 -150
+
+
+-- !query
+SELECT t, c,
+ counter_diff(c) OVER (ORDER BY t) AS diff,
+ avg(c) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS local_avg,
+ lead(c) OVER (ORDER BY t) AS nc,
+ max(c) OVER (ORDER BY t ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) AS some_max
+FROM VALUES (1, 100), (2, 200), (3, 150), (4, 400), (5, 500), (6, 600) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int,local_avg:double,nc:int,some_max:int>
+-- !query output
+1 100 NULL 150.0 200 200
+2 200 100 150.0 150 200
+3 150 NULL 250.0 400 400
+4 400 250 350.0 500 500
+5 500 100 500.0 600 600
+6 600 100 550.0 NULL 600
+
+
+-- !query
+SELECT m, t, c,
+ counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff,
+ avg(c) OVER (PARTITION BY m ORDER BY t
+ ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS local_avg,
+ max(c) OVER (PARTITION BY t) AS max_c
+FROM VALUES
+ ('a', 1, 100), ('a', 2, 200), ('a', 3, 150), ('a', 4, 300),
+ ('b', 1, 10), ('b', 2, 30), ('b', 3, 60), ('b', 4, 100)
+AS tab(m, t, c)
+ORDER BY m, t
+-- !query schema
+struct<m:string,t:int,c:int,diff:int,local_avg:double,max_c:int>
+-- !query output
+a 1 100 NULL 150.0 100
+a 2 200 100 150.0 200
+a 3 150 NULL 216.66666666666666 150
+a 4 300 150 225.0 300
+b 1 10 NULL 20.0 100
+b 2 30 20 33.333333333333336 200
+b 3 60 30 63.333333333333336 150
+b 4 100 40 80.0 300
+
+
+-- !query
+SELECT t, c,
+ counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:00:00', 100),
+ (2, TIMESTAMP '2026-01-01 00:00:00', 200),
+ (3, TIMESTAMP '2026-01-01 00:02:31', 400)
+AS tab(t, st, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1 100 NULL
+2 200 100
+3 400 NULL
+
+
+-- !query
+SELECT t, c,
+ counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:00:00', 100),
+ (2, TIMESTAMP '2026-01-01 00:00:00', 200)
+AS tab(t, st, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1 100 NULL
+2 200 100
+
+
+-- !query
+SELECT t, c,
+ counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:00:00', 100),
+ (2, CAST(NULL AS TIMESTAMP), 200),
+ (3, TIMESTAMP '2026-01-01 00:01:00', 300)
+AS tab(t, st, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1 100 NULL
+2 200 100
+3 300 100
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+ (2, TIMESTAMP '2026-01-01 00:01:00', 200)
+AS tab(t, st, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "COUNTER_DIFF_START_TIME_DECREASED",
+ "sqlState" : "22023",
+ "messageParameters" : {
+ "currentStartTime" : "2026-01-01 00:01:00",
+ "function" : "`counter_diff`",
+ "previousStartTime" : "2026-01-01 00:05:00"
+ }
+}
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:00:00', -1)
+AS tab(t, st, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE",
+ "sqlState" : "22003",
+ "messageParameters" : {
+ "function" : "`counter_diff`",
+ "value" : "-1"
+ }
+}
+
+
+-- !query
+SELECT t, c,
+ counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP_NTZ '2026-01-01 00:00:00', 100),
+ (2, TIMESTAMP_NTZ '2026-01-01 00:00:00', 200),
+ (3, TIMESTAMP_NTZ '2026-01-01 00:05:00', 300)
+AS tab(t, st, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1 100 NULL
+2 200 100
+3 300 NULL
+
+
+-- !query
+SELECT m, t, c, counter_diff(c, st) OVER (PARTITION BY m ORDER BY t) AS diff
+FROM VALUES
+ ('a', 1, TIMESTAMP '2026-01-01 00:00:00', 100),
+ ('a', 2, TIMESTAMP '2026-01-01 00:00:00', 200),
+ ('a', 3, TIMESTAMP '2026-01-01 00:05:00', 500),
+ ('b', 1, TIMESTAMP '2026-01-01 00:00:00', 10),
+ ('b', 2, TIMESTAMP '2026-01-01 00:00:00', 20)
+AS tab(m, t, st, c)
+ORDER BY m, t
+-- !query schema
+struct<m:string,t:int,c:int,diff:int>
+-- !query output
+a 1 100 NULL
+a 2 200 100
+a 3 500 NULL
+b 1 10 NULL
+b 2 20 10
+
+
+-- !query
+SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+ (2, TIMESTAMP '2026-01-01 00:01:00', CAST(NULL AS INT))
+AS tab(t, st, c)
+ORDER BY t
+-- !query schema
+struct<t:int,diff:int>
+-- !query output
+1 NULL
+2 NULL
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+ (2, TIMESTAMP '2026-01-01 00:01:00', CAST(NULL AS INT)),
+ (3, TIMESTAMP '2026-01-01 00:01:00', 300)
+AS tab(t, st, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "COUNTER_DIFF_START_TIME_DECREASED",
+ "sqlState" : "22023",
+ "messageParameters" : {
+ "currentStartTime" : "2026-01-01 00:01:00",
+ "function" : "`counter_diff`",
+ "previousStartTime" : "2026-01-01 00:05:00"
+ }
+}
+
+
+-- !query
+SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+ (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+ (2, TIMESTAMP '2026-01-01 00:06:00', CAST(NULL AS INT)),
+ (3, TIMESTAMP '2026-01-01 00:07:00', 300)
+AS tab(t, st, c)
+ORDER BY t
+-- !query schema
+struct<t:int,diff:int>
+-- !query output
+1 NULL
+2 NULL
+3 NULL
+
+
+-- !query
+WITH gen AS (
+ SELECT
+ TIMESTAMPADD(MINUTE, id * 30, TIMESTAMP '2026-01-01 00:00:00') AS ts,
+ CASE WHEN id < 4 THEN TIMESTAMP '2026-01-01 00:00:00'
+ ELSE TIMESTAMP '2026-01-01 02:00:00' END AS st,
+ CASE WHEN id < 4 THEN id * 1000 ELSE (id - 4) * 800 END AS c
+ FROM RANGE(8) AS r(id)
+),
+diffs AS (
+ SELECT ts, c, counter_diff(c, st) OVER (ORDER BY ts) AS diff
+ FROM gen
+)
+SELECT date_trunc('hour', ts) AS hour_bucket,
+ SUM(diff) AS total_diff
+FROM diffs GROUP BY 1 ORDER BY 1
+-- !query schema
+struct<hour_bucket:timestamp,total_diff:bigint>
+-- !query output
+2026-01-01 00:00:00 1000
+2026-01-01 01:00:00 2000
+2026-01-01 02:00:00 800
+2026-01-01 03:00:00 1600
+
+
+-- !query
+SELECT counter_diff(1) AS diff
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "WINDOW_FUNCTION_WITHOUT_OVER_CLAUSE",
+ "sqlState" : "42601",
+ "messageParameters" : {
+ "funcName" : "\"counter_diff(1)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 30,
+ "fragment" : "counter_diff(1) AS diff"
+ } ]
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER () AS diff
+FROM VALUES (1) AS tab(c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "WINDOW_FUNCTION_FRAME_NOT_ORDERED",
+ "sqlState" : "42601",
+ "messageParameters" : {
+ "wf_expr" : "counter_diff(tab.c)",
+ "wf_name" : "counter_diff"
+ }
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 'abc') AS tab(t, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"c\"",
+ "inputType" : "\"STRING\"",
+ "paramIndex" : "first",
+ "requiredType" : "\"NUMERIC\"",
+ "sqlExpr" : "\"counter_diff(c)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 40,
+ "fragment" : "counter_diff(c) OVER (ORDER BY t)"
+ } ]
+}
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 'abc', 100) AS tab(t, st, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"st\"",
+ "inputType" : "\"STRING\"",
+ "paramIndex" : "second",
+ "requiredType" : "(\"TIMESTAMP\" or \"TIMESTAMP_NTZ\")",
+ "sqlExpr" : "\"counter_diff(c, st)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 44,
+ "fragment" : "counter_diff(c, st) OVER (ORDER BY t)"
+ } ]
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS diff
+FROM VALUES (1, 100), (2, 200) AS tab(t, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "_LEGACY_ERROR_TEMP_1036",
+ "messageParameters" : {
+ "required" : "specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())",
+ "wf" : "specifiedwindowframe(RowFrame, -1, 1)"
+ }
+}
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (
+ ORDER BY t ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS diff
+FROM VALUES (1, 100), (2, 200) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,diff:int>
+-- !query output
+1 NULL
+2 100
+
+
+-- !query
+SELECT counter_diff(c) OVER (
+ ORDER BY t RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS diff
+FROM VALUES (1, 100), (2, 200) AS tab(t, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "_LEGACY_ERROR_TEMP_1036",
+ "messageParameters" : {
+ "required" : "specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())",
+ "wf" : "specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())"
+ }
+}
+
+
+-- !query
+SELECT counter_diff() OVER (ORDER BY t) AS diff FROM VALUES (1) AS tab(t)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "REQUIRED_PARAMETER_NOT_FOUND",
+ "sqlState" : "4274K",
+ "messageParameters" : {
+ "index" : "0",
+ "parameterName" : "`value`",
+ "routineName" : "`counter_diff`"
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 39,
+ "fragment" : "counter_diff() OVER (ORDER BY t)"
+ } ]
+}
+
+
+-- !query
+SELECT counter_diff(1, TIMESTAMP '2026-01-01', 99) OVER (ORDER BY t) AS diff
+FROM VALUES (1) AS tab(t)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION",
+ "sqlState" : "42605",
+ "messageParameters" : {
+ "actualNum" : "3",
+ "docroot" : "https://spark.apache.org/docs/latest",
+ "expectedNum" : "[1, 2]",
+ "functionName" : "`counter_diff`"
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 68,
+ "fragment" : "counter_diff(1, TIMESTAMP '2026-01-01', 99) OVER (ORDER BY t)"
+ } ]
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, true), (2, false) AS tab(t, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"c\"",
+ "inputType" : "\"BOOLEAN\"",
+ "paramIndex" : "first",
+ "requiredType" : "\"NUMERIC\"",
+ "sqlExpr" : "\"counter_diff(c)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 40,
+ "fragment" : "counter_diff(c) OVER (ORDER BY t)"
+ } ]
+}
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, DATE '2026-01-01', 100) AS tab(t, st, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"st\"",
+ "inputType" : "\"DATE\"",
+ "paramIndex" : "second",
+ "requiredType" : "(\"TIMESTAMP\" or \"TIMESTAMP_NTZ\")",
+ "sqlExpr" : "\"counter_diff(c, st)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 44,
+ "fragment" : "counter_diff(c, st) OVER (ORDER BY t)"
+ } ]
+}
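The golden cases above pin down the row-by-row behavior of `counter_diff` fairly precisely. As a reading aid, here is a minimal plain-Scala model of that behavior for a single, already-ordered partition. It is not Spark's implementation: the names `CounterDiffModel`, `counterDiff`, and `CounterDiffError` are hypothetical, and the rules are inferred from the expected outputs. The first non-NULL value yields NULL. A NULL counter yields NULL and leaves the carried state unchanged. A negative value, or a start time earlier than the previous row's, raises an error. A start-time advance or a counter decrease is treated as a reset and yields NULL. A NULL start time on either side gives no reset signal.

```scala
import java.time.LocalDateTime

// Hypothetical stand-in for the two runtime error classes seen in the golden file.
final case class CounterDiffError(errorClass: String, detail: String)
    extends RuntimeException(s"$errorClass: $detail")

// Hypothetical model inferred from the golden outputs; not Spark's implementation.
object CounterDiffModel {
  /** One input row of a single, already-ordered partition: (value, startTime). */
  type Row = (Option[Long], Option[LocalDateTime])

  def counterDiff(rows: Seq[Row]): Seq[Option[Long]] = {
    // State carried over from the last row whose counter value was non-NULL.
    var prevValue: Option[Long] = None
    var prevStart: Option[LocalDateTime] = None

    rows.map {
      case (None, _) =>
        // NULL counter: emit NULL and keep the carried state as it is.
        None
      case (Some(v), start) =>
        if (v < 0) throw CounterDiffError("COUNTER_DIFF_NEGATIVE_COUNTER_VALUE", v.toString)
        val startAdvanced = (prevStart, start) match {
          case (Some(p), Some(s)) if s.isBefore(p) =>
            throw CounterDiffError("COUNTER_DIFF_START_TIME_DECREASED", s"$p -> $s")
          case (Some(p), Some(s)) => s.isAfter(p)
          case _                  => false // a NULL start time gives no reset signal
        }
        val out = prevValue match {
          case Some(p) if !startAdvanced && v >= p => Some(v - p)
          case _                                   => None // first value, or a detected reset
        }
        prevValue = Some(v)
        prevStart = start // may become None, matching the NULL-start-time golden case
        out
    }
  }
}
```

In this model the one-argument form corresponds to passing `None` as every start time, so resets are detected only by a counter decrease.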
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index c46cde0d0db1..ffb018b264bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -835,6 +835,28 @@ class DataFrameWindowFunctionsSuite extends SharedSparkSession
"v", "z", null, "v", "z", "y", null, "va")))
}
+ test("counter_diff with and without startTime") {
+ import java.sql.Timestamp
+ val df = Seq(
+ (1, Timestamp.valueOf("2024-01-01 00:00:00"), 100),
+ (2, Timestamp.valueOf("2024-01-01 12:00:00"), 200),
+ (3, Timestamp.valueOf("2024-01-01 12:00:00"), 400),
+ (4, Timestamp.valueOf("2024-01-02 00:00:00"), 50),
+ (5, Timestamp.valueOf("2024-01-02 00:00:00"), 150)
+ ).toDF("t", "st", "c")
+ val w = Window.orderBy($"t")
+
+ // 1-arg form: reset detected by counter decrease only.
+ checkAnswer(
+ df.select($"t", counter_diff($"c").over(w)).orderBy($"t"),
+ Seq(Row(1, null), Row(2, 100), Row(3, 200), Row(4, null), Row(5, 100)))
+
+ // 2-arg form: reset also detected by startTime advance.
+ checkAnswer(
+ df.select($"t", counter_diff($"c", $"st").over(w)).orderBy($"t"),
+ Seq(Row(1, null), Row(2, null), Row(3, 200), Row(4, null), Row(5, 100)))
+ }
+
test("lag - Offset expression <offset> must be a literal") {
val nullStr: String = null
val df = Seq(
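The hourly rollup case in the golden file (the `gen` and `diffs` CTEs) shows the typical downstream use of `counter_diff`: convert cumulative samples to deltas, then `SUM` them per time bucket. The following is a minimal plain-Scala sketch that reproduces that query's expected output. It assumes timestamps are encoded as minutes after 2026-01-01 00:00:00 and implements only the two reset rules that query exercises (start-time advance and counter decrease); `HourlyRollupSketch` and `Sample` are hypothetical names.

```scala
object HourlyRollupSketch {
  // One row of the `gen` CTE. Timestamps are minutes after 2026-01-01 00:00:00,
  // a simplification of TIMESTAMP assumed for this sketch.
  final case class Sample(tsMin: Long, startMin: Long, c: Long)

  // 8 samples, 30 minutes apart; the counter restarts from 0 at 02:00.
  val gen: Seq[Sample] = (0L until 8L).map { id =>
    Sample(
      tsMin = id * 30,
      startMin = if (id < 4) 0L else 120L,
      c = if (id < 4) id * 1000 else (id - 4) * 800)
  }

  // Simplified counter_diff over ORDER BY ts: no NULL inputs occur here and the
  // start time never decreases, so only the two reset rules are modeled.
  val diffs: Seq[(Long, Option[Long])] =
    (gen.head.tsMin, None) +: gen.sliding(2).collect { case Seq(prev, cur) =>
      val reset = cur.startMin > prev.startMin || cur.c < prev.c
      (cur.tsMin, if (reset) None else Some(cur.c - prev.c))
    }.toSeq

  // date_trunc('hour', ts) plus SUM(diff); like SQL SUM, NULL deltas are skipped.
  val hourly: Seq[(Long, Long)] =
    diffs
      .groupBy { case (ts, _) => ts / 60 }
      .map { case (hour, rows) => hour -> rows.flatMap(_._2).sum }
      .toSeq
      .sortBy(_._1)
}
```

Because each reset row contributes NULL rather than its raw value, the 02:00 bucket sums to 800 instead of counting the restarted counter's initial sample, which matches the golden output.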