This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 95d781229da3 [SPARK-56820][SQL] Add counter_diff window function for 
converting cumulative counters to delta format
95d781229da3 is described below

commit 95d781229da32c4f8428f2006457f7df9fa5a646
Author: Petar Nikić <[email protected]>
AuthorDate: Fri May 15 20:56:57 2026 +0800

    [SPARK-56820][SQL] Add counter_diff window function for converting 
cumulative counters to delta format
    
    ### What changes were proposed in this pull request?
    
    This pull request proposes the addition of a new window function, 
`counter_diff`, which would be used to convert cumulative counters to delta 
format by computing the differences between consecutive values. The function 
would include special handling for counter resets, when the cumulative value 
gets reset to zero.
    
    #### Syntax
    
    ```sql
    counter_diff(value [, start_time]) OVER (PARTITION BY partition_exprs ORDER 
BY order_exprs)
    ```
    
    #### Arguments
    
    * `value`: A cumulative counter. Must be numeric and non-negative.
    * `start_time`: An optional timestamp parameter which indicates when the 
counter was last set to zero. It is used to better detect counter resets.
    * `partition_exprs`: Used to separate independent counters. Good 
partitioning columns would be the metric name, as well as any attributes tied 
to the metric.
    * `order_exprs`: Used to order the rows. Should be the observation 
timestamp in ascending order.
    
    #### Example
    
    ```sql
    SELECT m, t, c, counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff
    FROM VALUES
      ('http_requests', TIMESTAMP '2026-01-01T00:00:00', 100),
      ('http_requests', TIMESTAMP '2026-01-01T00:01:00', 200),
      ('http_requests', TIMESTAMP '2026-01-01T00:02:00', 400)
      AS tab (m, t, c)
    ```
    
    | m | t | c | diff |
    |---|---|---|---|
    | http_requests | 2026-01-01 00:00:00 | 100 | NULL |
    | http_requests | 2026-01-01 00:01:00 | 200 | 100 |
    | http_requests | 2026-01-01 00:02:00 | 400 | 200 |
    
    ### Why are the changes needed?
    
    Counters are metrics with monotonically increasing values. One example is 
the number of HTTP requests processed on a server. With each request, the 
counter increases. Counters can be represented in two temporalities: 
_cumulative_ or _delta_:
    
    * With _delta_ temporality, each observation represents the increase of the 
counter since the last observation.
    * With _cumulative_ temporality, each observation represents the total 
accumulated value of the counter.
      * With cumulative counters, it is possible for the counter to reset to 
zero, for example when a restart occurs.
    
    The cumulative representation is typically better for storage and 
transmission, as it is handles missed observations better.
    * For delta counters, if a single observation is lost, the increase is lost 
from the total counter value.
    * For cumulative counters, the observation is lost, but the total counter 
value does not decrease.
    
    However, the delta representation is required for performing analytics on 
the metric, as they can be aggregated and bucketized.
    
    `counter_diff` reduces the gap between these two representations. Given a 
cumulative counter, it computes the differences between consecutive values, 
resulting in the equivalent delta representation for the counter, which can be 
used in further analysis.
    
    ### Does this PR introduce _any_ user-facing change?
    
    The `counter_diff` window function is a new function.
    
    ### How was this patch tested?
    
    A new test, `counter-diff.sql`, has been added with various SQL queries 
involving `counter_diff` and their expected outputs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code with Claude Opus 4.7
    
    Closes #55828 from pnikic-db/counter-diff-window-function.
    
    Authored-by: Petar Nikić <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 4b9b35b4e02fe98dcbdb93ca490bf969ba88af99)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  12 +
 .../source/reference/pyspark.sql/functions.rst     |   1 +
 python/pyspark/sql/connect/functions/builtin.py    |   9 +
 python/pyspark/sql/functions/__init__.py           |   1 +
 python/pyspark/sql/functions/builtin.py            |  78 ++
 .../sql/tests/connect/test_connect_function.py     |  31 +
 python/pyspark/sql/tests/test_functions.py         |  30 +
 .../scala/org/apache/spark/sql/functions.scala     |  57 ++
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   1 +
 .../catalyst/analysis/resolver/ResolverGuard.scala |   2 +-
 .../sql/catalyst/expressions/CounterDiff.scala     | 355 +++++++++
 .../catalyst/expressions/decimalExpressions.scala  |  31 +
 .../sql-functions/sql-expression-schema.md         |   1 +
 .../analyzer-results/counter-diff.sql.out          | 827 ++++++++++++++++++++
 .../resources/sql-tests/inputs/counter-diff.sql    | 391 ++++++++++
 .../sql-tests/results/counter-diff.sql.out         | 864 +++++++++++++++++++++
 .../spark/sql/DataFrameWindowFunctionsSuite.scala  |  22 +
 17 files changed, 2712 insertions(+), 1 deletion(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 1d4f4317461f..35fabe59f0d9 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1386,6 +1386,18 @@
     ],
     "sqlState" : "0A000"
   },
+  "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE" : {
+    "message" : [
+      "A negative counter value <value> was provided to function <function>. 
Negative counter values are not allowed."
+    ],
+    "sqlState" : "22003"
+  },
+  "COUNTER_DIFF_START_TIME_DECREASED" : {
+    "message" : [
+      "Start time provided to function <function> decreased from 
<previousStartTime> to <currentStartTime>. Start time is required to be 
non-decreasing."
+    ],
+    "sqlState" : "22023"
+  },
   "CREATE_PERMANENT_VIEW_WITHOUT_ALIAS" : {
     "message" : [
       "Not allowed to create the permanent view <name> without explicitly 
assigning an alias for the expression <attr>."
diff --git a/python/docs/source/reference/pyspark.sql/functions.rst 
b/python/docs/source/reference/pyspark.sql/functions.rst
index d6810722e79a..0303308dd348 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -520,6 +520,7 @@ Window Functions
 .. autosummary::
     :toctree: api/
 
+    counter_diff
     cume_dist
     dense_rank
     lag
diff --git a/python/pyspark/sql/connect/functions/builtin.py 
b/python/pyspark/sql/connect/functions/builtin.py
index 22e52d91232c..0ea0fe65a0ff 100644
--- a/python/pyspark/sql/connect/functions/builtin.py
+++ b/python/pyspark/sql/connect/functions/builtin.py
@@ -1539,6 +1539,15 @@ bit_xor.__doc__ = pysparkfuncs.bit_xor.__doc__
 # Window Functions
 
 
+def counter_diff(value: "ColumnOrName", startTime: Optional["ColumnOrName"] = 
None) -> Column:
+    if startTime is None:
+        return _invoke_function_over_columns("counter_diff", value)
+    return _invoke_function_over_columns("counter_diff", value, startTime)
+
+
+counter_diff.__doc__ = pysparkfuncs.counter_diff.__doc__
+
+
 def cume_dist() -> Column:
     return _invoke_function("cume_dist")
 
diff --git a/python/pyspark/sql/functions/__init__.py 
b/python/pyspark/sql/functions/__init__.py
index 27db280be86d..b90b5a26ecb0 100644
--- a/python/pyspark/sql/functions/__init__.py
+++ b/python/pyspark/sql/functions/__init__.py
@@ -434,6 +434,7 @@ __all__ = [  # noqa: F405
     "var_samp",
     "variance",
     # Window Functions
+    "counter_diff",
     "cume_dist",
     "dense_rank",
     "lag",
diff --git a/python/pyspark/sql/functions/builtin.py 
b/python/pyspark/sql/functions/builtin.py
index be948a22a30f..eccb7d768e88 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -6464,6 +6464,84 @@ def rank() -> Column:
     return _invoke_function("rank")
 
 
+@_try_remote_functions
+def counter_diff(value: "ColumnOrName", startTime: Optional["ColumnOrName"] = 
None) -> Column:
+    """
+    Window function: computes the differences between consecutive cumulative 
counter values in a
+    time series, thereby converting the counter from the cumulative to the 
delta format.
+
+    Gracefully handles counter resets by returning NULL. Counter resets are 
detected when the
+    counter value decreases, or when the start time advances between rows.
+
+    Use the PARTITION BY clause of the window to separate independent 
counters. This is done by
+    specifying all columns which uniquely identify a time series. These are 
typically the counter
+    name and any attributes tied to the counter.
+
+    Use the ORDER BY clause of the window to order the observations by the 
associated timestamp
+    in ascending order.
+
+    .. versionadded:: 4.2.0
+
+    Parameters
+    ----------
+    value : :class:`~pyspark.sql.Column` or column name
+        A cumulative counter. Must be a numeric data type. Must be 
non-negative.
+    startTime : :class:`~pyspark.sql.Column` or column name, optional
+        An optional timestamp parameter which indicates when the counter was 
last set to zero.
+        Used to signal counter resets.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        The difference between the current and previous counter value within 
the window partition.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> from pyspark.sql import Window
+    >>> from datetime import datetime
+    >>> df = spark.createDataFrame(
+    ...     [('http_requests', datetime(2026, 1, 1, 0, 0), 100),
+    ...      ('http_requests', datetime(2026, 1, 1, 0, 1), 200),
+    ...      ('http_requests', datetime(2026, 1, 1, 0, 2), 400),
+    ...      ('http_requests', datetime(2026, 1, 1, 0, 3), 50),
+    ...      ('http_requests', datetime(2026, 1, 1, 0, 4), 100)],
+    ...     "m STRING, t TIMESTAMP_NTZ, c INT")
+    >>> w = Window.partitionBy("m").orderBy("t")
+    >>> df.select("m", "t", "c", 
sf.counter_diff("c").over(w).alias("diff")).show()
+    +-------------+-------------------+---+----+
+    |            m|                  t|  c|diff|
+    +-------------+-------------------+---+----+
+    |http_requests|2026-01-01 00:00:00|100|NULL|
+    |http_requests|2026-01-01 00:01:00|200| 100|
+    |http_requests|2026-01-01 00:02:00|400| 200|
+    |http_requests|2026-01-01 00:03:00| 50|NULL|
+    |http_requests|2026-01-01 00:04:00|100|  50|
+    +-------------+-------------------+---+----+
+
+    >>> df2 = spark.createDataFrame(
+    ...     [('http_requests', datetime(2026, 1, 1, 0, 0), 100, datetime(2026, 
1, 1, 0, 0)),
+    ...      ('http_requests', datetime(2026, 1, 1, 0, 1), 200, datetime(2026, 
1, 1, 0, 0)),
+    ...      ('http_requests', datetime(2026, 1, 1, 0, 2), 400, datetime(2026, 
1, 1, 0, 0)),
+    ...      ('http_requests', datetime(2026, 1, 1, 0, 3), 500, datetime(2026, 
1, 1, 0, 2, 15)),
+    ...      ('http_requests', datetime(2026, 1, 1, 0, 4), 600, datetime(2026, 
1, 1, 0, 2, 15))],
+    ...     "m STRING, t TIMESTAMP_NTZ, c INT, s TIMESTAMP_NTZ")
+    >>> df2.select("m", "t", "s", "c", sf.counter_diff("c", 
"s").over(w).alias("diff")).show()
+    +-------------+-------------------+-------------------+---+----+
+    |            m|                  t|                  s|  c|diff|
+    +-------------+-------------------+-------------------+---+----+
+    |http_requests|2026-01-01 00:00:00|2026-01-01 00:00:00|100|NULL|
+    |http_requests|2026-01-01 00:01:00|2026-01-01 00:00:00|200| 100|
+    |http_requests|2026-01-01 00:02:00|2026-01-01 00:00:00|400| 200|
+    |http_requests|2026-01-01 00:03:00|2026-01-01 00:02:15|500|NULL|
+    |http_requests|2026-01-01 00:04:00|2026-01-01 00:02:15|600| 100|
+    +-------------+-------------------+-------------------+---+----+
+    """
+    if startTime is None:
+        return _invoke_function_over_columns("counter_diff", value)
+    return _invoke_function_over_columns("counter_diff", value, startTime)
+
+
 @_try_remote_functions
 def cume_dist() -> Column:
     """
diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py 
b/python/pyspark/sql/tests/connect/test_connect_function.py
index e287ea9326ae..ed653333b6c5 100644
--- a/python/pyspark/sql/tests/connect/test_connect_function.py
+++ b/python/pyspark/sql/tests/connect/test_connect_function.py
@@ -766,6 +766,37 @@ class SparkConnectFunctionTests(ReusedMixedTestCase, 
PandasOnSparkTestUtils):
                     sdf.select(scol.over(swin)).toPandas(),
                 )
 
+        # test counter_diff: requires non-negative values and has a 
two-argument form that
+        # accepts a start_time parameter, so a separate dataset is used.
+        counter_diff_query = """
+            SELECT * FROM VALUES
+            (0, TIMESTAMP_NTZ '2026-01-01 00:00:00',  100, TIMESTAMP_NTZ 
'2025-12-31 00:00:00'),
+            (0, TIMESTAMP_NTZ '2026-01-01 00:01:00',  200, TIMESTAMP_NTZ 
'2025-12-31 00:00:00'),
+            (0, TIMESTAMP_NTZ '2026-01-01 00:02:00',  300, TIMESTAMP_NTZ 
'2025-12-31 00:00:00'),
+            (0, TIMESTAMP_NTZ '2026-01-01 00:03:00', 1200, TIMESTAMP_NTZ 
'2026-01-01 00:02:15'),
+            (0, TIMESTAMP_NTZ '2026-01-01 00:04:00', 1300, TIMESTAMP_NTZ 
'2026-01-01 00:02:15'),
+            (0, TIMESTAMP_NTZ '2026-01-01 00:05:00',   50, TIMESTAMP_NTZ 
'2026-01-01 00:02:15'),
+            (0, TIMESTAMP_NTZ '2026-01-01 00:06:00',  100, TIMESTAMP_NTZ 
'2026-01-01 00:02:15'),
+            (0, TIMESTAMP_NTZ '2026-01-01 00:07:00',   20, TIMESTAMP_NTZ 
'2026-01-01 00:06:45'),
+            (0, TIMESTAMP_NTZ '2026-01-01 00:08:00',   60, TIMESTAMP_NTZ 
'2026-01-01 00:06:45'),
+            (1, TIMESTAMP_NTZ '2026-01-01 00:00:00',  100, TIMESTAMP_NTZ 
'2025-12-31 00:00:00'),
+            (1, TIMESTAMP_NTZ '2026-01-01 00:01:00',  200, TIMESTAMP_NTZ 
'2025-12-31 00:00:00'),
+            (1, TIMESTAMP_NTZ '2026-01-01 00:02:00',  300, TIMESTAMP_NTZ 
'2025-12-31 00:00:00'),
+            (1, TIMESTAMP_NTZ '2026-01-01 00:03:00', 1000, TIMESTAMP_NTZ 
'2025-12-31 00:00:00'),
+            (1, TIMESTAMP_NTZ '2026-01-01 00:04:00',   75, TIMESTAMP_NTZ 
'2026-01-01 00:03:15')
+            AS tab(p, t, c, s)
+            """
+        cdf_cd = self.connect.sql(counter_diff_query)
+        sdf_cd = self.spark.sql(counter_diff_query)
+        for ccol, scol in [
+            (CF.counter_diff("c"), SF.counter_diff("c")),
+            (CF.counter_diff("c", "s"), SF.counter_diff("c", "s")),
+        ]:
+            self.assert_eq(
+                
cdf_cd.select(ccol.over(CW.partitionBy("p").orderBy("t"))).toPandas(),
+                
sdf_cd.select(scol.over(SW.partitionBy("p").orderBy("t"))).toPandas(),
+            )
+
         # test aggregation functions
         for ccol, scol in [
             (CF.count("c"), SF.count("c")),
diff --git a/python/pyspark/sql/tests/test_functions.py 
b/python/pyspark/sql/tests/test_functions.py
index 75824d3ebe49..978476d8e0b2 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -1954,6 +1954,36 @@ class FunctionsTestsMixin:
         for r, ex in zip(rs, expected):
             self.assertEqual(tuple(r), ex[: len(r)])
 
+    def test_counter_diff_window_function(self):
+        df = self.spark.createDataFrame(
+            [
+                (1, datetime.datetime(2026, 1, 1, 0, 0, 0), 100),
+                (2, datetime.datetime(2026, 1, 1, 0, 0, 0), 200),
+                (3, datetime.datetime(2026, 1, 1, 0, 0, 0), 50),
+                (4, datetime.datetime(2026, 1, 1, 0, 0, 0), 100),
+                (5, datetime.datetime(2026, 1, 1, 0, 1, 0), 200),
+                (6, datetime.datetime(2026, 1, 1, 0, 1, 0), 300),
+            ],
+            ["t", "st", "c"],
+        )
+        w = Window.orderBy("t")
+
+        rows = df.select("t", 
F.counter_diff("c").over(w).alias("d")).orderBy("t").collect()
+        self.assertEqual(
+            [(r.t, r.d) for r in rows],
+            [(1, None), (2, 100), (3, None), (4, 50), (5, 100), (6, 100)],
+        )
+
+        rows = (
+            df.select("t", F.counter_diff("c", 
startTime="st").over(w).alias("d"))
+            .orderBy("t")
+            .collect()
+        )
+        self.assertEqual(
+            [(r.t, r.d) for r in rows],
+            [(1, None), (2, 100), (3, None), (4, 50), (5, None), (6, 100)],
+        )
+
     def test_window_functions_without_partitionBy(self):
         df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, 
"2")], ["key", "value"])
         w = Window.orderBy("key", df.value)
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
index 4ad731f6d8b0..e07a35ffecd7 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2531,6 +2531,63 @@ object functions {
   // Window functions
   
//////////////////////////////////////////////////////////////////////////////////////////////
 
+  /**
+   * Window function: computes the differences between consecutive cumulative 
counter values in a
+   * time series, thereby converting the counter from the cumulative to the 
delta format.
+   *
+   * Gracefully handles counter resets by returning NULL. Counter resets are 
detected when the
+   * counter value decreases.
+   *
+   * Use the PARTITION BY clause of the window to separate independent 
counters. This is done by
+   * specifying all columns which uniquely identify a time series. These are 
typically the counter
+   * name and any attributes tied to the counter.
+   *
+   * Use the ORDER BY clause of the window to order the observations by the 
associated timestamp
+   * in ascending order.
+   *
+   * @param value
+   *   A cumulative counter. Must be a numeric data type. Must be non-negative.
+   *
+   * @return
+   *   The difference between the current and previous counter value within 
the window partition,
+   *   according to the order defined by the window's ORDER BY clause.
+   *
+   * @group window_funcs
+   * @since 4.2.0
+   */
+  def counter_diff(value: Column): Column = Column.fn("counter_diff", value)
+
+  /**
+   * Window function: computes the differences between consecutive cumulative 
counter values in a
+   * time series, thereby converting the counter from the cumulative to the 
delta format.
+   *
+   * Gracefully handles counter resets by returning NULL. Counter resets are 
detected when the
+   * counter value decreases, or when the start time advances between rows.
+   *
+   * Use the PARTITION BY clause of the window to separate independent 
counters. This is done by
+   * specifying all columns which uniquely identify a time series. These are 
typically the counter
+   * name and any attributes tied to the counter.
+   *
+   * Use the ORDER BY clause of the window to order the observations by the 
associated timestamp
+   * in ascending order.
+   *
+   * @param value
+   *   A cumulative counter. Must be a numeric data type. Must be non-negative.
+   *
+   * @param startTime
+   *   A timestamp indicating when the counter was last set to zero. Used to 
signal counter
+   *   resets.
+   *
+   * @return
+   *   The difference between the current and previous counter value within 
the window partition,
+   *   according to the order defined by the window's ORDER BY clause.
+   *
+   * @group window_funcs
+   * @since 4.2.0
+   */
+  def counter_diff(value: Column, startTime: Column): Column =
+    Column.fn("counter_diff", value, startTime)
+
   /**
    * Window function: returns the cumulative distribution of values within a 
window partition,
    * i.e. the fraction of rows that are below the current row.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 34b9559c5851..ac1fbb9f9bef 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -922,6 +922,7 @@ object FunctionRegistry {
     expression[Rank]("rank"),
     expression[DenseRank]("dense_rank"),
     expression[PercentRank]("percent_rank"),
+    expressionBuilder("counter_diff", CounterDiffExpressionBuilder),
 
     // predicates
     expression[Between]("between"),
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala
index ed41c320f8c9..8db3fe94f3b6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala
@@ -569,7 +569,7 @@ class ResolverGuard(
         true
       // Decimal
       case _: UnscaledValue | _: MakeDecimal | _: CheckOverflow | _: 
CheckOverflowInSum |
-          _: DecimalAddNoOverflowCheck |
+          _: DecimalAddNoOverflowCheck | _: DecimalSubtractNoOverflowCheck |
           _: DecimalDivideWithOverflowCheck =>
         true
       // Interval
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CounterDiff.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CounterDiff.scala
new file mode 100644
index 000000000000..453145baf02a
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CounterDiff.scala
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.analysis.ExpressionBuilder
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{FunctionSignature, 
InputParameter}
+import org.apache.spark.sql.errors.QueryErrorsBase
+import org.apache.spark.sql.types._
+
+/**
+ * The counter_diff window function computes the differences between 
consecutive cumulative counter
+ * values in a time series, thereby converting the counter from the cumulative 
to the delta format.
+ *
+ * This class serves as the base class for the two versions of the 
counter_diff function:
+ * - counter_diff(counter) -> CounterDiff(counter)
+ * - counter_diff(counter, start_time) -> CounterDiffWithStartTime(counter, 
startTime)
+ */
+abstract class CounterDiffBase(val counter: Expression)
+  extends AggregateWindowFunction
+    with QueryErrorsBase {
+
+  override def prettyName: String = "counter_diff"
+
+  override def dataType: DataType = counter.dataType
+
+  /**
+   * Last non-NULL counter value from a previous row.
+   */
+  protected lazy val prevCounter: AttributeReference =
+    AttributeReference("prevCounter", counter.dataType, nullable = true)()
+
+  /**
+   * Counter value from the current row.
+   */
+  protected lazy val currCounter: AttributeReference =
+    AttributeReference("currCounter", counter.dataType, nullable = true)()
+
+  /**
+   * Null literal used as a counter_diff result, when appropriate.
+   */
+  protected lazy val nullResult: Expression = Literal.create(null, 
counter.dataType)
+
+  /**
+   * Difference between the current and previous counter values.
+   */
+  protected lazy val diff: Expression = {
+    counter.dataType match {
+      // For DECIMAL, subtraction typically widens the result type to handle 
possible overflow.
+      // For counter_diff, since counters cannot be negative, there is no risk 
of overflow, and no
+      // need to widen the result type, so we subtract directly in the input 
type.
+      case dt: DecimalType => DecimalSubtractNoOverflowCheck(currCounter, 
prevCounter, dt)
+      case _ => currCounter - prevCounter
+    }
+  }
+
+  /**
+   * Returns the difference, unless the counter has decreased, which is 
treated as a counter reset.
+   * In this case, NULL is returned.
+   */
+  protected lazy val diffWithCounterDecreaseCheck: Expression =
+    If(currCounter < prevCounter, nullResult, diff)
+
+  /**
+   * Error raised when the counter is negative.
+   */
+  protected lazy val negativeCounterError: Expression = RaiseError(
+    Literal("COUNTER_DIFF_NEGATIVE_COUNTER_VALUE"),
+    CreateMap(
+      Seq(
+        Literal("value"),
+        Cast(currCounter, StringType),
+        Literal("function"),
+        Literal(toSQLId("counter_diff"))
+      )
+    ),
+    counter.dataType
+  )
+
+  /**
+   * Wraps `inner` with the "skip row on NULL counter" and "raise error on 
negative counter" checks.
+   */
+  protected def withCounterNullAndNegativeChecks(inner: Expression): 
Expression = {
+    If(IsNull(currCounter),
+      nullResult,
+      If(currCounter < Literal.default(counter.dataType),
+        negativeCounterError,
+        inner
+      )
+    )
+  }
+}
+
+/**
+ * The single-parameter form of `counter_diff`: `counter_diff(value)`.
+ * Detects counter resets only when the counter value decreases.
+ */
+case class CounterDiff(override val counter: Expression)
+  extends CounterDiffBase(counter)
+    with ExpectsInputTypes {
+
+  override def children: Seq[Expression] = Seq(counter)
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
+
+  /**
+   * The aggregation state attributes for the counter_diff function.
+   * In the single-parameter form, there are two attributes:
+   * - prevCounter: The last non-NULL counter value from a previous row.
+   * - currCounter: The counter value from the current row.
+   */
+  override lazy val aggBufferAttributes: Seq[AttributeReference] =
+    Seq(prevCounter, currCounter)
+
+  /**
+   * The initial aggregation state for the counter_diff function. Initial 
values are NULL.
+   */
+  override lazy val initialValues: Seq[Expression] = Seq(
+    Literal.create(null, counter.dataType),
+    Literal.create(null, counter.dataType)
+  )
+
+  /**
+   * The update expressions for the counter_diff function's aggregation state.
+   *
+   * Fundamentally, the current value becomes the previous value, and the new 
value becomes the
+   * current value.
+   *
+   * Rows with NULL counter values should be skipped. As a result, the 
previous counter value
+   * should not be updated in the aggregation state.
+   */
+  override lazy val updateExpressions: Seq[Expression] = Seq(
+    If(IsNotNull(currCounter), currCounter, prevCounter),
+    counter
+  )
+
+  /**
+   * The evaluation expression for the counter_diff function.
+   *
+   * Checks for edge cases first: NULL counter value, negative counter value 
and counter reset.
+   * Otherwise, returns the difference between the current and previous 
counter values.
+   */
+  override lazy val evaluateExpression: Expression =
+    withCounterNullAndNegativeChecks(diffWithCounterDecreaseCheck)
+
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): CounterDiff =
+    copy(counter = newChildren.head)
+}
+
+/**
+ * The two-parameter form of `counter_diff`: `counter_diff(value, start_time)`.
+ * Additionally checks for counter resets when `start_time` increases, which 
signals a new start.
+ * Requires that the start time doesn't decrease, which would indicate moving 
backwards in time.
+ */
+case class CounterDiffWithStartTime(
+    override val counter: Expression,
+    startTime: Expression,
+    timeZoneId: Option[String] = None)
+  extends CounterDiffBase(counter)
+    with ExpectsInputTypes
+    with TimeZoneAwareExpression {
+
+  override def withTimeZone(timeZoneId: String): CounterDiffWithStartTime =
+    copy(timeZoneId = Some(timeZoneId))
+
+  override def children: Seq[Expression] = Seq(counter, startTime)
+
+  override def inputTypes: Seq[AbstractDataType] =
+    Seq(NumericType, TypeCollection(TimestampType, TimestampNTZType))
+
+  /**
+   * The start time from a previous row.
+   */
+  protected lazy val prevStartTime: AttributeReference =
+    AttributeReference("prevStartTime", startTime.dataType, nullable = true)()
+
+  /**
+   * The start time from the current row.
+   */
+  protected lazy val currStartTime: AttributeReference =
+    AttributeReference("currStartTime", startTime.dataType, nullable = true)()
+
+  /**
+   * The aggregation state attributes for the counter_diff function.
+   * In the two-parameter form, there are four attributes:
+   * - prevCounter: The last non-NULL counter value from a previous row.
+   * - currCounter: The counter value from the current row.
+   * - prevStartTime: The start time from a previous row.
+   * - currStartTime: The start time from the current row.
+   */
+  override lazy val aggBufferAttributes: Seq[AttributeReference] =
+    Seq(prevCounter, currCounter, prevStartTime, currStartTime)
+
+  /**
+   * The initial aggregation state for the counter_diff function. Initial 
values are NULL.
+   */
+  override lazy val initialValues: Seq[Expression] = Seq(
+    Literal.create(null, counter.dataType),
+    Literal.create(null, counter.dataType),
+    Literal.create(null, startTime.dataType),
+    Literal.create(null, startTime.dataType)
+  )
+
+  /**
+   * The update expressions for the counter_diff function's aggregation state.
+   *
+   * Fundamentally, the current value becomes the previous value, and the new 
value becomes the
+   * current value. The same applies to the start time.
+   *
+   * Rows with NULL counter values should be skipped. As a result, the 
previous values for both
+   * the counter and start time should not be updated in the aggregation state.
+   */
+  override lazy val updateExpressions: Seq[Expression] = Seq(
+    If(IsNotNull(currCounter), currCounter, prevCounter),
+    counter,
+    If(IsNotNull(currCounter), currStartTime, prevStartTime),
+    startTime
+  )
+
+  /**
+   * Error raised when the start time decreases.
+   */
+  protected lazy val decreasedStartTimeError: Expression = RaiseError(
+    Literal("COUNTER_DIFF_START_TIME_DECREASED"),
+    CreateMap(
+      Seq(
+        Literal("function"),
+        Literal(toSQLId("counter_diff")),
+        Literal("previousStartTime"),
+        Cast(prevStartTime, StringType, timeZoneId),
+        Literal("currentStartTime"),
+        Cast(currStartTime, StringType, timeZoneId)
+      )
+    ),
+    counter.dataType
+  )
+
+  /**
+   * The evaluation expression for the counter_diff function.
+   *
+   * Checks for edge cases first: NULL counter value, negative counter value, 
start time decrease
+   * and counter resets.
+   *
+   * Otherwise, returns the difference between the current and previous 
counter values.
+   */
+  override lazy val evaluateExpression: Expression = 
withCounterNullAndNegativeChecks {
+    If(currStartTime < prevStartTime,
+      decreasedStartTimeError,
+      If(prevStartTime < currStartTime,
+        nullResult,
+        diffWithCounterDecreaseCheck
+      )
+    )
+  }
+
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): CounterDiffWithStartTime =
+    copy(counter = newChildren(0), startTime = newChildren(1))
+}
+
+// scalastyle:off line.size.limit line.contains.tab
+@ExpressionDescription(
+  usage = """
+    _FUNC_(value [, start_time]) - Computes the differences between 
consecutive cumulative counter
+      values in a time series, thereby converting the counter from the 
cumulative to the delta
+      format.
+  """,
+  arguments = """
+    Arguments:
+      * value - A cumulative counter. Must be a numeric data type. Must be 
non-negative.
+      * start_time - An optional timestamp parameter which indicates when the 
counter was last set
+          to zero. Used to signal counter resets.
+  """,
+  examples = """
+    Examples:
+      > SELECT m, t, c, _FUNC_(c) OVER (PARTITION BY m ORDER BY t) AS diff 
FROM VALUES ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:00:00', 100), 
('http_requests', TIMESTAMP_NTZ '2026-01-01 00:01:00', 200), ('http_requests', 
TIMESTAMP_NTZ '2026-01-01 00:02:00', 400), ('http_requests', TIMESTAMP_NTZ 
'2026-01-01 00:03:00', 50), ('http_requests', TIMESTAMP_NTZ '2026-01-01 
00:04:00', 100) AS tab(m, t, c) ORDER BY t;
+       http_requests   2026-01-01 00:00:00     100     NULL
+       http_requests   2026-01-01 00:01:00     200     100
+       http_requests   2026-01-01 00:02:00     400     200
+       http_requests   2026-01-01 00:03:00     50      NULL
+       http_requests   2026-01-01 00:04:00     100     50
+      > SELECT m, t, s, c, _FUNC_(c, s) OVER (PARTITION BY m ORDER BY t) AS 
diff FROM VALUES ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:00:00', 100, 
TIMESTAMP_NTZ '2026-01-01 00:00:00'), ('http_requests', TIMESTAMP_NTZ 
'2026-01-01 00:01:00', 200, TIMESTAMP_NTZ '2026-01-01 00:00:00'), 
('http_requests', TIMESTAMP_NTZ '2026-01-01 00:02:00', 400, TIMESTAMP_NTZ 
'2026-01-01 00:00:00'), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:03:00', 
500, TIMESTAMP_NTZ '2026-01-01 00:02:15'), ('http_ [...]
+       http_requests   2026-01-01 00:00:00     2026-01-01 00:00:00     100     
NULL
+       http_requests   2026-01-01 00:01:00     2026-01-01 00:00:00     200     
100
+       http_requests   2026-01-01 00:02:00     2026-01-01 00:00:00     400     
200
+       http_requests   2026-01-01 00:03:00     2026-01-01 00:02:15     500     
NULL
+       http_requests   2026-01-01 00:04:00     2026-01-01 00:02:15     600     
100
+  """,
+  note = """
+    _FUNC_ calculates the difference between the current and the previous 
counter value within the
+    partition, according to the order defined by the ORDER BY clause.
+
+    Use the PARTITION BY clause of the window to separate independent 
counters. This is done by
+    specifying all columns which uniquely identify a time series. These are 
typically the counter
+    name and any attributes tied to the counter.
+
+    Use the ORDER BY clause of the window to order the observations by the 
associated timestamp
+    in ascending order.
+
+    The following special cases are handled:
+    1. If the counter value is NULL, NULL is returned for that row, and the 
row is excluded from
+       difference calculations and comparisons for subsequent rows.
+    2. If the counter value is negative, or the start time decreases, an error 
is raised.
+    3. In the case of a counter reset, NULL is returned.
+    4. NULL is returned for the first row.
+    5. If either the current or the previous start time is NULL, the start 
time reset check is
+       skipped.
+
+    Counter resets are detected when:
+    1. The current counter value is less than the previous counter value.
+    2. The current start time is greater than the previous start time, if 
start_time was provided.
+  """,
+  group = "window_funcs",
+  since = "4.2.0"
+)
+// scalastyle:on line.size.limit line.contains.tab
+object CounterDiffExpressionBuilder extends ExpressionBuilder {
+  // Placeholder start time which serves as a default value.
+  private val DefaultStartTime: Literal = Literal.create(null, NullType)
+
+  override def functionSignature: Option[FunctionSignature] = {
+    val valueArg = InputParameter("value")
+    val startTimeArg = InputParameter("start_time", Some(DefaultStartTime))
+    Some(FunctionSignature(Seq(valueArg, startTimeArg)))
+  }
+
+  override def build(funcName: String, expressions: Seq[Expression]): 
Expression = {
+    // The function signature defines two parameters, so two expressions are 
always provided.
+    // For the single-parameter form, the start_time argument takes on the 
default value, which is
+    // exactly `DefaultStartTime`, so `eq` is used to check this case. This 
also differentiates
+    // the single-parameter form from the two-parameter form with an explicit 
NULL start time.
+    expressions match {
+      case Seq(value, startTime) if startTime eq DefaultStartTime =>
+        CounterDiff(value)
+      case Seq(value, startTime) =>
+        CounterDiffWithStartTime(value, startTime)
+    }
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
index 46ab43074409..3e463595ba67 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
@@ -247,6 +247,37 @@ case class DecimalAddNoOverflowCheck(
     copy(left = newLeft, right = newRight)
 }
 
+/**
+ * A subtract expression for decimal values which is only used internally.
+ *
+ * Note that, this expression does not check overflow which is different from 
`Subtract`.
+ * It is the caller's responsibility to ensure that the result fits in the 
declared precision and
+ * scale. For example, `counter_diff` only invokes this on operands that have 
already been validated
+ * to satisfy `left >= right >= 0`, so the result is non-negative and bounded 
above by `left`.
+ */
+case class DecimalSubtractNoOverflowCheck(
+    left: Expression,
+    right: Expression,
+    override val dataType: DataType) extends BinaryOperator {
+  require(dataType.isInstanceOf[DecimalType])
+
+  override def inputType: AbstractDataType = DecimalType
+  override def symbol: String = "-"
+  private def decimalMethod: String = "$minus"
+
+  private lazy val numeric = TypeUtils.getNumeric(dataType)
+
+  override protected def nullSafeEval(input1: Any, input2: Any): Any =
+    numeric.minus(input1, input2)
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+    defineCodeGen(ctx, ev, (eval1, eval2) => s"$eval1.$decimalMethod($eval2)")
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression, newRight: Expression): 
DecimalSubtractNoOverflowCheck =
+    copy(left = newLeft, right = newRight)
+}
+
 /**
  * A divide expression for decimal values which is only used internally by Avg.
  *
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md 
b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index c56db6c9b7e3..9ac6e49b0343 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -94,6 +94,7 @@
 | org.apache.spark.sql.catalyst.expressions.Cos | cos | SELECT cos(0) | 
struct<COS(0):double> |
 | org.apache.spark.sql.catalyst.expressions.Cosh | cosh | SELECT cosh(0) | 
struct<COSH(0):double> |
 | org.apache.spark.sql.catalyst.expressions.Cot | cot | SELECT cot(1) | 
struct<COT(1):double> |
+| org.apache.spark.sql.catalyst.expressions.CounterDiffExpressionBuilder | 
counter_diff | SELECT m, t, c, counter_diff(c) OVER (PARTITION BY m ORDER BY t) 
AS diff FROM VALUES ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:00:00', 
100), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:01:00', 200), 
('http_requests', TIMESTAMP_NTZ '2026-01-01 00:02:00', 400), ('http_requests', 
TIMESTAMP_NTZ '2026-01-01 00:03:00', 50), ('http_requests', TIMESTAMP_NTZ 
'2026-01-01 00:04:00', 100) AS tab(m, t, c [...]
 | org.apache.spark.sql.catalyst.expressions.Crc32 | crc32 | SELECT 
crc32('Spark') | struct<crc32(Spark):bigint> |
 | org.apache.spark.sql.catalyst.expressions.CreateArray | array | SELECT 
array(1, 2, 3) | struct<array(1, 2, 3):array<int>> |
 | org.apache.spark.sql.catalyst.expressions.CreateMap | map | SELECT map(1.0, 
'2', 3.0, '4') | struct<map(1.0, 2, 3.0, 4):map<decimal(2,1),string>> |
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/counter-diff.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/counter-diff.sql.out
new file mode 100644
index 000000000000..c8c0764b2d79
--- /dev/null
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/counter-diff.sql.out
@@ -0,0 +1,827 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+SET TIME ZONE 'UTC'
+-- !query analysis
+SetCommand (spark.sql.session.timeZone,Some(UTC))
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 200), (3, 400) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x]
+   +- Project [t#x, c#x, diff#x, diff#x]
+      +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 50) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x]
+   +- Project [t#x, c#x, diff#x, diff#x]
+      +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 200), (3, 400), (4, 50), (5, 100) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x]
+   +- Project [t#x, c#x, diff#x, diff#x]
+      +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 100), (3, 200) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x]
+   +- Project [t#x, c#x, diff#x, diff#x]
+      +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, 200) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x]
+   +- Project [t#x, c#x, diff#x, diff#x]
+      +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT m, t, c, counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff
+FROM VALUES
+  ('a', 1, 100), ('a', 2, 200),
+  ('b', 1, 10),  ('b', 2, 30)
+AS tab(m, t, c)
+ORDER BY m, t
+-- !query analysis
+Sort [m#x ASC NULLS FIRST, t#x ASC NULLS FIRST], true
++- Project [m#x, t#x, c#x, diff#x]
+   +- Project [m#x, t#x, c#x, diff#x, diff#x]
+      +- Window [counter_diff(c#x) windowspecdefinition(m#x, t#x ASC NULLS 
FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x], [m#x], [t#x ASC NULLS FIRST]
+         +- Project [m#x, t#x, c#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [m#x, t#x, c#x]
+
+
+-- !query
+SELECT t,
+  counter_diff(d)   OVER (ORDER BY t) AS d_diff,
+  counter_diff(f)   OVER (ORDER BY t) AS f_diff,
+  counter_diff(b)   OVER (ORDER BY t) AS b_diff,
+  counter_diff(l)   OVER (ORDER BY t) AS l_diff,
+  counter_diff(i)   OVER (ORDER BY t) AS i_diff,
+  counter_diff(si)  OVER (ORDER BY t) AS si_diff,
+  counter_diff(ti)  OVER (ORDER BY t) AS ti_diff,
+  counter_diff(dec) OVER (ORDER BY t) AS dec_diff
+FROM VALUES
+  (1, 1.5D, CAST(1.5 AS FLOAT), CAST(100 AS BIGINT), CAST(100 AS LONG),
+     CAST(100 AS INT), CAST(100 AS SMALLINT), CAST(10 AS TINYINT),
+     CAST(10.5 AS DECIMAL(10,2))),
+  (2, 3.5D, CAST(3.5 AS FLOAT), CAST(300 AS BIGINT), CAST(300 AS LONG),
+     CAST(300 AS INT), CAST(300 AS SMALLINT), CAST(30 AS TINYINT),
+     CAST(20.5 AS DECIMAL(10,2)))
+AS tab(t, d, f, b, l, i, si, ti, dec)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, d_diff#x, f_diff#x, b_diff#xL, l_diff#xL, i_diff#x, 
si_diff#x, ti_diff#x, dec_diff#x]
+   +- Project [t#x, d#x, f#x, b#xL, l#xL, i#x, si#x, ti#x, dec#x, d_diff#x, 
f_diff#x, b_diff#xL, l_diff#xL, i_diff#x, si_diff#x, ti_diff#x, dec_diff#x, 
d_diff#x, f_diff#x, b_diff#xL, l_diff#xL, i_diff#x, si_diff#x, ti_diff#x, 
dec_diff#x]
+      +- Window [counter_diff(d#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
d_diff#x, counter_diff(f#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
f_diff#x, counter_diff(b#xL) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
b_diff#xL, counter_diff(l#xL) windowspecdefinition(t#x  [...]
+         +- Project [t#x, d#x, f#x, b#xL, l#xL, i#x, si#x, ti#x, dec#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, d#x, f#x, b#xL, l#xL, i#x, si#x, ti#x, 
dec#x]
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY 
t) AS diff_subtract
+FROM VALUES
+  (1, CAST(0.10000000000000000000000000000000000001 AS DECIMAL(38, 38))),
+  (2, CAST(0.10000000000000000000000000000000000002 AS DECIMAL(38, 38)))
+AS tab(t, c) ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x, diff_subtract#x]
+   +- Project [t#x, c#x, diff#x, _we1#x, diff#x, (c#x - _we1#x) AS 
diff_subtract#x]
+      +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x, lag(c#x, -1, null) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, -1, -1)) AS _we1#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY 
t) AS diff_subtract
+FROM VALUES
+  (1, CAST(12345678901234567890123456789012.123456 AS DECIMAL(38, 6))),
+  (2, CAST(12345678901234567890123456789012.123457 AS DECIMAL(38, 6)))
+AS tab(t, c) ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x, diff_subtract#x]
+   +- Project [t#x, c#x, diff#x, _we1#x, diff#x, (c#x - _we1#x) AS 
diff_subtract#x]
+      +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x, lag(c#x, -1, null) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, -1, -1)) AS _we1#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY 
t) AS diff_subtract
+FROM VALUES
+  (1, CAST(99999999.98 AS DECIMAL(10, 2))),
+  (2, CAST(99999999.99 AS DECIMAL(10, 2)))
+AS tab(t, c) ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x, diff_subtract#x]
+   +- Project [t#x, c#x, diff#x, _we1#x, diff#x, (c#x - _we1#x) AS 
diff_subtract#x]
+      +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x, lag(c#x, -1, null) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, -1, -1)) AS _we1#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, NULL), (2, NULL), (3, NULL) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x]
+   +- Project [t#x, diff#x, diff#x]
+      +- Window [counter_diff(null) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100, NULL), (2, 200, NULL) AS tab(t, c, st)
+ORDER BY t
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM (SELECT t, CAST(c AS INT) AS c
+        FROM VALUES (1, NULL), (2, NULL), (3, NULL) AS tab(t, c))
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x]
+   +- Project [t#x, c#x, diff#x, diff#x]
+      +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x]
+            +- SubqueryAlias __auto_generated_subquery_name
+               +- Project [t#x, cast(c#x as int) AS c#x]
+                  +- SubqueryAlias tab
+                     +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, -5) AS tab(t, c)
+-- !query analysis
+Project [diff#x]
++- Project [c#x, t#x, diff#x, diff#x]
+   +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x], [t#x ASC NULLS FIRST]
+      +- Project [c#x, t#x]
+         +- SubqueryAlias tab
+            +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, -5.0D) AS tab(t, c)
+-- !query analysis
+Project [diff#x]
++- Project [c#x, t#x, diff#x, diff#x]
+   +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x], [t#x ASC NULLS FIRST]
+      +- Project [c#x, t#x]
+         +- SubqueryAlias tab
+            +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, CAST(-5.5 AS DECIMAL(10, 3))) AS tab(t, c)
+-- !query analysis
+Project [diff#x]
++- Project [c#x, t#x, diff#x, diff#x]
+   +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x], [t#x ASC NULLS FIRST]
+      +- Project [c#x, t#x]
+         +- SubqueryAlias tab
+            +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, DOUBLE('-Infinity')) AS tab(t, c)
+-- !query analysis
+Project [diff#x]
++- Project [c#x, t#x, diff#x, diff#x]
+   +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x], [t#x ASC NULLS FIRST]
+      +- Project [c#x, t#x]
+         +- SubqueryAlias tab
+            +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, -5) AS tab(t, c)
+-- !query analysis
+Project [diff#x]
++- Project [c#x, t#x, diff#x, diff#x]
+   +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x], [t#x ASC NULLS FIRST]
+      +- Project [c#x, t#x]
+         +- SubqueryAlias tab
+            +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, 100.0D), (2, DOUBLE('Infinity')), (3, 200.0D),
+  (4, DOUBLE('Infinity')), (5, DOUBLE('Infinity')) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x]
+   +- Project [t#x, c#x, diff#x, diff#x]
+      +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, 100.0D), (2, DOUBLE('NaN')), (3, DOUBLE('NaN')), (4, 200.0D),
+  (5, 400.0D), (6, DOUBLE('NaN')), (7, 50.0D), (8, 100.0D) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x]
+   +- Project [t#x, c#x, diff#x, diff#x]
+      +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(1) OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t
+-- !query analysis
+Sort [t#xL ASC NULLS FIRST], true
++- Project [t#xL, diff#x]
+   +- Project [t#xL, diff#x, diff#x]
+      +- Window [counter_diff(1) windowspecdefinition(t#xL ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x], [t#xL ASC NULLS FIRST]
+         +- Project [t#xL]
+            +- SubqueryAlias tab
+               +- Project [id#xL AS t#xL]
+                  +- Range (1, 5, step=1)
+
+
+-- !query
+SELECT t, counter_diff(1 + 1) OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t
+-- !query analysis
+Sort [t#xL ASC NULLS FIRST], true
++- Project [t#xL, diff#x]
+   +- Project [t#xL, diff#x, diff#x]
+      +- Window [counter_diff((1 + 1)) windowspecdefinition(t#xL ASC NULLS 
FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x], [t#xL ASC NULLS FIRST]
+         +- Project [t#xL]
+            +- SubqueryAlias tab
+               +- Project [id#xL AS t#xL]
+                  +- Range (1, 5, step=1)
+
+
+-- !query
+SELECT t, counter_diff(1, TIMESTAMP '2026-01-01 00:00:00')
+  OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+SELECT t, counter_diff(
+    1 + 1,
+    TIMESTAMP '2026-01-01 00:00:00' + INTERVAL '1' SECOND
+  ) OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+SELECT t, c,
+       counter_diff(c) OVER (ORDER BY t) AS diff,
+       c - lag(c) OVER (ORDER BY t) AS d1,
+       c - lag(c) IGNORE NULLS OVER (ORDER BY t) AS d2
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, 300), (4, 150) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x, d1#x, d2#x]
+   +- Project [t#x, c#x, diff#x, _we1#x, _we2#x, diff#x, (c#x - _we1#x) AS 
d1#x, (c#x - _we2#x) AS d2#x]
+      +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x, lag(c#x, -1, null) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, -1, -1)) AS _we1#x, lag(c#x, -1, null) 
windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, 
-1)) AS _we2#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT t, c,
+       counter_diff(c) OVER (ORDER BY t) AS diff,
+       avg(c) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS 
local_avg,
+       lead(c) OVER (ORDER BY t) AS nc,
+       max(c) OVER (ORDER BY t ROWS BETWEEN UNBOUNDED PRECEDING AND 1 
FOLLOWING) AS some_max
+FROM VALUES (1, 100), (2, 200), (3, 150), (4, 400), (5, 500), (6, 600) AS 
tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x, local_avg#x, nc#x, some_max#x]
+   +- Project [t#x, c#x, diff#x, local_avg#x, nc#x, some_max#x, diff#x, 
local_avg#x, nc#x, some_max#x]
+      +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x, avg(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, -1, 1)) AS local_avg#x, lead(c#x, 1, null) 
windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) 
AS nc#x, max(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), 1)) AS some_max [...]
+         +- Project [t#x, c#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT m, t, c,
+       counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff,
+       avg(c) OVER (PARTITION BY m ORDER BY t
+                    ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS local_avg,
+       max(c) OVER (PARTITION BY t) AS max_c
+FROM VALUES
+  ('a', 1, 100), ('a', 2, 200), ('a', 3, 150), ('a', 4, 300),
+  ('b', 1, 10),  ('b', 2, 30),  ('b', 3, 60),  ('b', 4, 100)
+AS tab(m, t, c)
+ORDER BY m, t
+-- !query analysis
+Sort [m#x ASC NULLS FIRST, t#x ASC NULLS FIRST], true
++- Project [m#x, t#x, c#x, diff#x, local_avg#x, max_c#x]
+   +- Project [m#x, t#x, c#x, diff#x, local_avg#x, max_c#x, diff#x, 
local_avg#x, max_c#x]
+      +- Window [max(c#x) windowspecdefinition(t#x, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
AS max_c#x], [t#x]
+         +- Window [counter_diff(c#x) windowspecdefinition(m#x, t#x ASC NULLS 
FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x, avg(c#x) windowspecdefinition(m#x, t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, -1, 1)) AS local_avg#x], [m#x], [t#x ASC NULLS 
FIRST]
+            +- Project [m#x, t#x, c#x]
+               +- SubqueryAlias tab
+                  +- LocalRelation [m#x, t#x, c#x]
+
+
+-- !query
+SELECT t, c,
+  counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:00:00', 100),
+  (2, TIMESTAMP '2026-01-01 00:00:00', 200),
+  (3, TIMESTAMP '2026-01-01 00:02:31', 400)
+AS tab(t, st, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x]
+   +- Project [t#x, c#x, st#x, diff#x, diff#x]
+      +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x 
ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x, st#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, st#x, c#x]
+
+
+-- !query
+SELECT t, c,
+  counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:00:00', 100),
+  (2, TIMESTAMP '2026-01-01 00:00:00', 200)
+AS tab(t, st, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x]
+   +- Project [t#x, c#x, st#x, diff#x, diff#x]
+      +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x 
ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x, st#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, st#x, c#x]
+
+
+-- !query
+SELECT t, c,
+  counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:00:00', 100),
+  (2, CAST(NULL AS TIMESTAMP),         200),
+  (3, TIMESTAMP '2026-01-01 00:01:00', 300)
+AS tab(t, st, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x]
+   +- Project [t#x, c#x, st#x, diff#x, diff#x]
+      +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x 
ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x, st#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, st#x, c#x]
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+  (2, TIMESTAMP '2026-01-01 00:01:00', 200)
+AS tab(t, st, c)
+-- !query analysis
+Project [diff#x]
++- Project [c#x, st#x, t#x, diff#x, diff#x]
+   +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC 
NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+      +- Project [c#x, st#x, t#x]
+         +- SubqueryAlias tab
+            +- LocalRelation [t#x, st#x, c#x]
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:00:00', -1)
+AS tab(t, st, c)
+-- !query analysis
+Project [diff#x]
++- Project [c#x, st#x, t#x, diff#x, diff#x]
+   +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC 
NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+      +- Project [c#x, st#x, t#x]
+         +- SubqueryAlias tab
+            +- LocalRelation [t#x, st#x, c#x]
+
+
+-- !query
+SELECT t, c,
+  counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP_NTZ '2026-01-01 00:00:00', 100),
+  (2, TIMESTAMP_NTZ '2026-01-01 00:00:00', 200),
+  (3, TIMESTAMP_NTZ '2026-01-01 00:05:00', 300)
+AS tab(t, st, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, c#x, diff#x]
+   +- Project [t#x, c#x, st#x, diff#x, diff#x]
+      +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x 
ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x, st#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, st#x, c#x]
+
+
+-- !query
+SELECT m, t, c, counter_diff(c, st) OVER (PARTITION BY m ORDER BY t) AS diff
+FROM VALUES
+  ('a', 1, TIMESTAMP '2026-01-01 00:00:00', 100),
+  ('a', 2, TIMESTAMP '2026-01-01 00:00:00', 200),
+  ('a', 3, TIMESTAMP '2026-01-01 00:05:00', 500),
+  ('b', 1, TIMESTAMP '2026-01-01 00:00:00', 10),
+  ('b', 2, TIMESTAMP '2026-01-01 00:00:00', 20)
+AS tab(m, t, st, c)
+ORDER BY m, t
+-- !query analysis
+Sort [m#x ASC NULLS FIRST, t#x ASC NULLS FIRST], true
++- Project [m#x, t#x, c#x, diff#x]
+   +- Project [m#x, t#x, c#x, st#x, diff#x, diff#x]
+      +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(m#x, 
t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())) AS diff#x], [m#x], [t#x ASC NULLS FIRST]
+         +- Project [m#x, t#x, c#x, st#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [m#x, t#x, st#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+  (2, TIMESTAMP '2026-01-01 00:01:00', CAST(NULL AS INT))
+AS tab(t, st, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x]
+   +- Project [t#x, c#x, st#x, diff#x, diff#x]
+      +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x 
ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x, st#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, st#x, c#x]
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+  (2, TIMESTAMP '2026-01-01 00:01:00', CAST(NULL AS INT)),
+  (3, TIMESTAMP '2026-01-01 00:01:00', 300)
+AS tab(t, st, c)
+-- !query analysis
+Project [diff#x]
++- Project [c#x, st#x, t#x, diff#x, diff#x]
+   +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC 
NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+      +- Project [c#x, st#x, t#x]
+         +- SubqueryAlias tab
+            +- LocalRelation [t#x, st#x, c#x]
+
+
+-- !query
+SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+  (2, TIMESTAMP '2026-01-01 00:06:00', CAST(NULL AS INT)),
+  (3, TIMESTAMP '2026-01-01 00:07:00', 300)
+AS tab(t, st, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x]
+   +- Project [t#x, c#x, st#x, diff#x, diff#x]
+      +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x 
ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())) AS diff#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x, st#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, st#x, c#x]
+
+
+-- !query
+WITH gen AS (
+  SELECT
+    TIMESTAMPADD(MINUTE, id * 30, TIMESTAMP '2026-01-01 00:00:00') AS ts,
+    CASE WHEN id < 4 THEN TIMESTAMP '2026-01-01 00:00:00'
+         ELSE TIMESTAMP '2026-01-01 02:00:00' END AS st,
+    CASE WHEN id < 4 THEN id * 1000 ELSE (id - 4) * 800 END AS c
+  FROM RANGE(8) AS r(id)
+),
+diffs AS (
+  SELECT ts, c, counter_diff(c, st) OVER (ORDER BY ts) AS diff
+  FROM gen
+)
+SELECT date_trunc('hour', ts) AS hour_bucket,
+       SUM(diff) AS total_diff
+FROM diffs GROUP BY 1 ORDER BY 1
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+SELECT counter_diff(1) AS diff
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "WINDOW_FUNCTION_WITHOUT_OVER_CLAUSE",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "funcName" : "\"counter_diff(1)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 30,
+    "fragment" : "counter_diff(1) AS diff"
+  } ]
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER () AS diff
+FROM VALUES (1) AS tab(c)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "WINDOW_FUNCTION_FRAME_NOT_ORDERED",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "wf_expr" : "counter_diff(tab.c)",
+    "wf_name" : "counter_diff"
+  }
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 'abc') AS tab(t, c)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"c\"",
+    "inputType" : "\"STRING\"",
+    "paramIndex" : "first",
+    "requiredType" : "\"NUMERIC\"",
+    "sqlExpr" : "\"counter_diff(c)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 40,
+    "fragment" : "counter_diff(c) OVER (ORDER BY t)"
+  } ]
+}
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 'abc', 100) AS tab(t, st, c)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"st\"",
+    "inputType" : "\"STRING\"",
+    "paramIndex" : "second",
+    "requiredType" : "(\"TIMESTAMP\" or \"TIMESTAMP_NTZ\")",
+    "sqlExpr" : "\"counter_diff(c, st)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 44,
+    "fragment" : "counter_diff(c, st) OVER (ORDER BY t)"
+  } ]
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING) AS diff
+FROM VALUES (1, 100), (2, 200) AS tab(t, c)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1036",
+  "messageParameters" : {
+    "required" : "specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())",
+    "wf" : "specifiedwindowframe(RowFrame, -1, 1)"
+  }
+}
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (
+  ORDER BY t ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS diff
+FROM VALUES (1, 100), (2, 200) AS tab(t, c)
+ORDER BY t
+-- !query analysis
+Sort [t#x ASC NULLS FIRST], true
++- Project [t#x, diff#x]
+   +- Project [t#x, c#x, diff#x, diff#x]
+      +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
diff#x], [t#x ASC NULLS FIRST]
+         +- Project [t#x, c#x]
+            +- SubqueryAlias tab
+               +- LocalRelation [t#x, c#x]
+
+
+-- !query
+SELECT counter_diff(c) OVER (
+  ORDER BY t RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS diff
+FROM VALUES (1, 100), (2, 200) AS tab(t, c)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1036",
+  "messageParameters" : {
+    "required" : "specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())",
+    "wf" : "specifiedwindowframe(RangeFrame, unboundedpreceding$(), 
currentrow$())"
+  }
+}
+
+
+-- !query
+SELECT counter_diff() OVER (ORDER BY t) AS diff FROM VALUES (1) AS tab(t)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "REQUIRED_PARAMETER_NOT_FOUND",
+  "sqlState" : "4274K",
+  "messageParameters" : {
+    "index" : "0",
+    "parameterName" : "`value`",
+    "routineName" : "`counter_diff`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 39,
+    "fragment" : "counter_diff() OVER (ORDER BY t)"
+  } ]
+}
+
+
+-- !query
+SELECT counter_diff(1, TIMESTAMP '2026-01-01', 99) OVER (ORDER BY t) AS diff
+FROM VALUES (1) AS tab(t)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION",
+  "sqlState" : "42605",
+  "messageParameters" : {
+    "actualNum" : "3",
+    "docroot" : "https://spark.apache.org/docs/latest";,
+    "expectedNum" : "[1, 2]",
+    "functionName" : "`counter_diff`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 68,
+    "fragment" : "counter_diff(1, TIMESTAMP '2026-01-01', 99) OVER (ORDER BY 
t)"
+  } ]
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, true), (2, false) AS tab(t, c)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"c\"",
+    "inputType" : "\"BOOLEAN\"",
+    "paramIndex" : "first",
+    "requiredType" : "\"NUMERIC\"",
+    "sqlExpr" : "\"counter_diff(c)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 40,
+    "fragment" : "counter_diff(c) OVER (ORDER BY t)"
+  } ]
+}
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, DATE '2026-01-01', 100) AS tab(t, st, c)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"st\"",
+    "inputType" : "\"DATE\"",
+    "paramIndex" : "second",
+    "requiredType" : "(\"TIMESTAMP\" or \"TIMESTAMP_NTZ\")",
+    "sqlExpr" : "\"counter_diff(c, st)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 44,
+    "fragment" : "counter_diff(c, st) OVER (ORDER BY t)"
+  } ]
+}
diff --git a/sql/core/src/test/resources/sql-tests/inputs/counter-diff.sql 
b/sql/core/src/test/resources/sql-tests/inputs/counter-diff.sql
new file mode 100644
index 000000000000..34e61f8ee14b
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/counter-diff.sql
@@ -0,0 +1,391 @@
+-- Tests for the counter_diff window function.
+
+SET TIME ZONE 'UTC';
+
+------------------------------------------------------------
+-- Basic semantics
+------------------------------------------------------------
+
+-- Monotonically increasing counter: each diff is current - previous.
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 200), (3, 400) AS tab(t, c)
+ORDER BY t;
+
+-- Single-row input: the only row has no predecessor and returns NULL.
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 50) AS tab(t, c)
+ORDER BY t;
+
+-- Counter reset detected by value decrease: the row after the drop returns 
NULL.
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 200), (3, 400), (4, 50), (5, 100) AS tab(t, c)
+ORDER BY t;
+
+-- Equal counter values produce a diff of 0.
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 100), (3, 200) AS tab(t, c)
+ORDER BY t;
+
+-- NULL counter rows have NULL as the result and are skipped when calculating 
differences.
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, 200) AS tab(t, c)
+ORDER BY t;
+
+-- Each partition has its own first row and prior values.
+SELECT m, t, c, counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff
+FROM VALUES
+  ('a', 1, 100), ('a', 2, 200),
+  ('b', 1, 10),  ('b', 2, 30)
+AS tab(m, t, c)
+ORDER BY m, t;
+
+------------------------------------------------------------
+-- Numeric types
+------------------------------------------------------------
+
+-- counter_diff supports all numeric types:
+-- DOUBLE, FLOAT, BIGINT, LONG, INT, SMALLINT, TINYINT, DECIMAL.
+SELECT t,
+  counter_diff(d)   OVER (ORDER BY t) AS d_diff,
+  counter_diff(f)   OVER (ORDER BY t) AS f_diff,
+  counter_diff(b)   OVER (ORDER BY t) AS b_diff,
+  counter_diff(l)   OVER (ORDER BY t) AS l_diff,
+  counter_diff(i)   OVER (ORDER BY t) AS i_diff,
+  counter_diff(si)  OVER (ORDER BY t) AS si_diff,
+  counter_diff(ti)  OVER (ORDER BY t) AS ti_diff,
+  counter_diff(dec) OVER (ORDER BY t) AS dec_diff
+FROM VALUES
+  (1, 1.5D, CAST(1.5 AS FLOAT), CAST(100 AS BIGINT), CAST(100 AS LONG),
+     CAST(100 AS INT), CAST(100 AS SMALLINT), CAST(10 AS TINYINT),
+     CAST(10.5 AS DECIMAL(10,2))),
+  (2, 3.5D, CAST(3.5 AS FLOAT), CAST(300 AS BIGINT), CAST(300 AS LONG),
+     CAST(300 AS INT), CAST(300 AS SMALLINT), CAST(30 AS TINYINT),
+     CAST(20.5 AS DECIMAL(10,2)))
+AS tab(t, d, f, b, l, i, si, ti, dec)
+ORDER BY t;
+
+------------------------------------------------------------
+-- High-precision DECIMAL inputs
+------------------------------------------------------------
+-- Decimal subtraction normally widens the result type to handle possible 
overflow.
+-- For counter_diff, since counters cannot be negative, there is no risk of 
overflow, and no
+-- need to widen the result type, so we subtract directly in the input type.
+-- These tests verify that the result type is not widened, and that no 
precision is lost for
+-- large precision and scale.
+
+-- DECIMAL(38, 38): normal subtraction would be of type DECIMAL(38, 37).
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY 
t) AS diff_subtract
+FROM VALUES
+  (1, CAST(0.10000000000000000000000000000000000001 AS DECIMAL(38, 38))),
+  (2, CAST(0.10000000000000000000000000000000000002 AS DECIMAL(38, 38)))
+AS tab(t, c) ORDER BY t;
+
+-- DECIMAL(38, 6): normal subtraction would be of type DECIMAL(38, 6).
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY 
t) AS diff_subtract
+FROM VALUES
+  (1, CAST(12345678901234567890123456789012.123456 AS DECIMAL(38, 6))),
+  (2, CAST(12345678901234567890123456789012.123457 AS DECIMAL(38, 6)))
+AS tab(t, c) ORDER BY t;
+
+-- DECIMAL(10, 2): normal subtraction would be of type DECIMAL(11, 2).
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY 
t) AS diff_subtract
+FROM VALUES
+  (1, CAST(99999999.98 AS DECIMAL(10, 2))),
+  (2, CAST(99999999.99 AS DECIMAL(10, 2)))
+AS tab(t, c) ORDER BY t;
+
+------------------------------------------------------------
+-- NULL type inputs
+------------------------------------------------------------
+
+-- Untyped NULL counters are treated as DOUBLE.
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, NULL), (2, NULL), (3, NULL) AS tab(t, c)
+ORDER BY t;
+
+-- Untyped NULL start_time is treated as TIMESTAMP.
+-- The counter behavior is unaffected because NULL start_time skips the reset 
check.
+SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100, NULL), (2, 200, NULL) AS tab(t, c, st)
+ORDER BY t;
+
+-- Explicitly-typed all-NULL INT counter: type stays INT.
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM (SELECT t, CAST(c AS INT) AS c
+        FROM VALUES (1, NULL), (2, NULL), (3, NULL) AS tab(t, c))
+ORDER BY t;
+
+------------------------------------------------------------
+-- Negative counter values are runtime errors
+------------------------------------------------------------
+
+-- INT.
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, -5) AS tab(t, c);
+
+-- DOUBLE.
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, -5.0D) AS tab(t, c);
+
+-- DECIMAL: the error message preserves the configured scale.
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, CAST(-5.5 AS DECIMAL(10, 3))) AS tab(t, c);
+
+-- -Infinity is treated as a negative value.
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, DOUBLE('-Infinity')) AS tab(t, c);
+
+-- Negative value after a NULL row still results in an error.
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, -5) AS tab(t, c);
+
+------------------------------------------------------------
+-- Special floating-point values
+------------------------------------------------------------
+
+-- Positive Infinity participates in arithmetic:
+-- +Infinity - 100 = +Infinity.
+-- 200 - +Infinity => Reset.
+-- +Infinity - 200 = +Infinity.
+-- +Infinity - +Infinity = NaN.
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, 100.0D), (2, DOUBLE('Infinity')), (3, 200.0D),
+  (4, DOUBLE('Infinity')), (5, DOUBLE('Infinity')) AS tab(t, c)
+ORDER BY t;
+
+-- NaN values are greater than all other numeric values, so:
+-- any value -> NaN => NaN
+-- NaN -> any non-NaN value => Reset
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, 100.0D), (2, DOUBLE('NaN')), (3, DOUBLE('NaN')), (4, 200.0D),
+  (5, 400.0D), (6, DOUBLE('NaN')), (7, 50.0D), (8, 100.0D) AS tab(t, c)
+ORDER BY t;
+
+------------------------------------------------------------
+-- Constants and foldable expressions as arguments
+------------------------------------------------------------
+
+-- Constant counter: every diff except the first is 0.
+SELECT t, counter_diff(1) OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t;
+
+-- Foldable expression counter (1 + 1) behaves like the constant case.
+SELECT t, counter_diff(1 + 1) OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t;
+
+-- Constant start_time alongside constant counter.
+SELECT t, counter_diff(1, TIMESTAMP '2026-01-01 00:00:00')
+  OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t;
+
+-- Foldable counter and foldable start_time.
+SELECT t, counter_diff(
+    1 + 1,
+    TIMESTAMP '2026-01-01 00:00:00' + INTERVAL '1' SECOND
+  ) OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t;
+
+------------------------------------------------------------
+-- Combined with other window functions
+------------------------------------------------------------
+
+-- Compare counter_diff against lag and lag IGNORE NULLS over the same 
ordering.
+SELECT t, c,
+       counter_diff(c) OVER (ORDER BY t) AS diff,
+       c - lag(c) OVER (ORDER BY t) AS d1,
+       c - lag(c) IGNORE NULLS OVER (ORDER BY t) AS d2
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, 300), (4, 150) AS tab(t, c)
+ORDER BY t;
+
+-- Mix counter_diff with avg, lead, max over different frames.
+SELECT t, c,
+       counter_diff(c) OVER (ORDER BY t) AS diff,
+       avg(c) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS 
local_avg,
+       lead(c) OVER (ORDER BY t) AS nc,
+       max(c) OVER (ORDER BY t ROWS BETWEEN UNBOUNDED PRECEDING AND 1 
FOLLOWING) AS some_max
+FROM VALUES (1, 100), (2, 200), (3, 150), (4, 400), (5, 500), (6, 600) AS 
tab(t, c)
+ORDER BY t;
+
+-- Multiple windows with different partitions in the same SELECT.
+SELECT m, t, c,
+       counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff,
+       avg(c) OVER (PARTITION BY m ORDER BY t
+                    ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS local_avg,
+       max(c) OVER (PARTITION BY t) AS max_c
+FROM VALUES
+  ('a', 1, 100), ('a', 2, 200), ('a', 3, 150), ('a', 4, 300),
+  ('b', 1, 10),  ('b', 2, 30),  ('b', 3, 60),  ('b', 4, 100)
+AS tab(m, t, c)
+ORDER BY m, t;
+
+------------------------------------------------------------
+-- start_time parameter
+------------------------------------------------------------
+
+-- start_time advance triggers a reset even when the counter increases.
+SELECT t, c,
+  counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:00:00', 100),
+  (2, TIMESTAMP '2026-01-01 00:00:00', 200),
+  (3, TIMESTAMP '2026-01-01 00:02:31', 400)
+AS tab(t, st, c)
+ORDER BY t;
+
+-- Equal start_time across rows: behavior matches the case with no start time.
+SELECT t, c,
+  counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:00:00', 100),
+  (2, TIMESTAMP '2026-01-01 00:00:00', 200)
+AS tab(t, st, c)
+ORDER BY t;
+
+-- NULL start_time skips the start time reset check on that row and the next 
row.
+SELECT t, c,
+  counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:00:00', 100),
+  (2, CAST(NULL AS TIMESTAMP),         200),
+  (3, TIMESTAMP '2026-01-01 00:01:00', 300)
+AS tab(t, st, c)
+ORDER BY t;
+
+-- Decreasing start_time is a runtime error.
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+  (2, TIMESTAMP '2026-01-01 00:01:00', 200)
+AS tab(t, st, c);
+
+-- Negative counter still raises in the start_time form.
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:00:00', -1)
+AS tab(t, st, c);
+
+-- TIMESTAMP_NTZ start_time is accepted; same reset behavior as TIMESTAMP.
+SELECT t, c,
+  counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP_NTZ '2026-01-01 00:00:00', 100),
+  (2, TIMESTAMP_NTZ '2026-01-01 00:00:00', 200),
+  (3, TIMESTAMP_NTZ '2026-01-01 00:05:00', 300)
+AS tab(t, st, c)
+ORDER BY t;
+
+-- Partitioned start_time: each partition tracks its own previous start_time.
+SELECT m, t, c, counter_diff(c, st) OVER (PARTITION BY m ORDER BY t) AS diff
+FROM VALUES
+  ('a', 1, TIMESTAMP '2026-01-01 00:00:00', 100),
+  ('a', 2, TIMESTAMP '2026-01-01 00:00:00', 200),
+  ('a', 3, TIMESTAMP '2026-01-01 00:05:00', 500),
+  ('b', 1, TIMESTAMP '2026-01-01 00:00:00', 10),
+  ('b', 2, TIMESTAMP '2026-01-01 00:00:00', 20)
+AS tab(m, t, st, c)
+ORDER BY m, t;
+
+-- A NULL-counter row is skipped before the start_time check, so a same-row
+-- start_time decrease is absorbed when the counter is NULL.
+SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+  (2, TIMESTAMP '2026-01-01 00:01:00', CAST(NULL AS INT))
+AS tab(t, st, c)
+ORDER BY t;
+
+-- ...but the next non-NULL row still observes the decrease and raises.
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+  (2, TIMESTAMP '2026-01-01 00:01:00', CAST(NULL AS INT)),
+  (3, TIMESTAMP '2026-01-01 00:01:00', 300)
+AS tab(t, st, c);
+
+-- A NULL-counter row is skipped before the start_time check, so a same-row
+-- start_time increase is absorbed when the counter is NULL, but is resurfaced
+-- on the next non-NULL row.
+SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+  (2, TIMESTAMP '2026-01-01 00:06:00', CAST(NULL AS INT)),
+  (3, TIMESTAMP '2026-01-01 00:07:00', 300)
+AS tab(t, st, c)
+ORDER BY t;
+
+------------------------------------------------------------
+-- End-to-end: bucket per-row diffs by hour using a CTE
+------------------------------------------------------------
+
+-- 8 measurements every 30 min; start_time changes at id=4 -> single reset.
+-- The hourly SUM of diffs yields the per-hour total counter increase.
+WITH gen AS (
+  SELECT
+    TIMESTAMPADD(MINUTE, id * 30, TIMESTAMP '2026-01-01 00:00:00') AS ts,
+    CASE WHEN id < 4 THEN TIMESTAMP '2026-01-01 00:00:00'
+         ELSE TIMESTAMP '2026-01-01 02:00:00' END AS st,
+    CASE WHEN id < 4 THEN id * 1000 ELSE (id - 4) * 800 END AS c
+  FROM RANGE(8) AS r(id)
+),
+diffs AS (
+  SELECT ts, c, counter_diff(c, st) OVER (ORDER BY ts) AS diff
+  FROM gen
+)
+SELECT date_trunc('hour', ts) AS hour_bucket,
+       SUM(diff) AS total_diff
+FROM diffs GROUP BY 1 ORDER BY 1;
+
+------------------------------------------------------------
+-- Frame, arity, and type validation
+------------------------------------------------------------
+
+-- counter_diff requires an OVER clause (it is a window function).
+SELECT counter_diff(1) AS diff;
+
+-- Window must specify ORDER BY (otherwise the frame is unordered).
+SELECT counter_diff(c) OVER () AS diff
+FROM VALUES (1) AS tab(c);
+
+-- Counter argument must be NUMERIC: STRING is rejected.
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 'abc') AS tab(t, c);
+
+-- start_time argument must be TIMESTAMP or TIMESTAMP_NTZ: STRING is rejected.
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 'abc', 100) AS tab(t, st, c);
+
+-- A user-supplied frame other than ROWS BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW is rejected.
+SELECT counter_diff(c) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING) AS diff
+FROM VALUES (1, 100), (2, 200) AS tab(t, c);
+
+-- Explicitly specifying the required frame is allowed.
+SELECT t, counter_diff(c) OVER (
+  ORDER BY t ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS diff
+FROM VALUES (1, 100), (2, 200) AS tab(t, c)
+ORDER BY t;
+
+-- RANGE frames are rejected (counter_diff is row-based).
+SELECT counter_diff(c) OVER (
+  ORDER BY t RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS diff
+FROM VALUES (1, 100), (2, 200) AS tab(t, c);
+
+-- Zero arguments: not accepted.
+SELECT counter_diff() OVER (ORDER BY t) AS diff FROM VALUES (1) AS tab(t);
+
+-- More than 2 arguments: not accepted.
+SELECT counter_diff(1, TIMESTAMP '2026-01-01', 99) OVER (ORDER BY t) AS diff
+FROM VALUES (1) AS tab(t);
+
+-- BOOLEAN counter is rejected.
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, true), (2, false) AS tab(t, c);
+
+-- DATE start_time is rejected.
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, DATE '2026-01-01', 100) AS tab(t, st, c);
diff --git a/sql/core/src/test/resources/sql-tests/results/counter-diff.sql.out 
b/sql/core/src/test/resources/sql-tests/results/counter-diff.sql.out
new file mode 100644
index 000000000000..209882d4cf30
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/counter-diff.sql.out
@@ -0,0 +1,864 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+SET TIME ZONE 'UTC'
+-- !query schema
+struct<key:string,value:string>
+-- !query output
+spark.sql.session.timeZone     UTC
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 200), (3, 400) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1      100     NULL
+2      200     100
+3      400     200
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 50) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,diff:int>
+-- !query output
+1      NULL
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 200), (3, 400), (4, 50), (5, 100) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1      100     NULL
+2      200     100
+3      400     200
+4      50      NULL
+5      100     50
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 100), (3, 200) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1      100     NULL
+2      100     0
+3      200     100
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, 200) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1      100     NULL
+2      NULL    NULL
+3      200     100
+
+
+-- !query
+SELECT m, t, c, counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff
+FROM VALUES
+  ('a', 1, 100), ('a', 2, 200),
+  ('b', 1, 10),  ('b', 2, 30)
+AS tab(m, t, c)
+ORDER BY m, t
+-- !query schema
+struct<m:string,t:int,c:int,diff:int>
+-- !query output
+a      1       100     NULL
+a      2       200     100
+b      1       10      NULL
+b      2       30      20
+
+
+-- !query
+SELECT t,
+  counter_diff(d)   OVER (ORDER BY t) AS d_diff,
+  counter_diff(f)   OVER (ORDER BY t) AS f_diff,
+  counter_diff(b)   OVER (ORDER BY t) AS b_diff,
+  counter_diff(l)   OVER (ORDER BY t) AS l_diff,
+  counter_diff(i)   OVER (ORDER BY t) AS i_diff,
+  counter_diff(si)  OVER (ORDER BY t) AS si_diff,
+  counter_diff(ti)  OVER (ORDER BY t) AS ti_diff,
+  counter_diff(dec) OVER (ORDER BY t) AS dec_diff
+FROM VALUES
+  (1, 1.5D, CAST(1.5 AS FLOAT), CAST(100 AS BIGINT), CAST(100 AS LONG),
+     CAST(100 AS INT), CAST(100 AS SMALLINT), CAST(10 AS TINYINT),
+     CAST(10.5 AS DECIMAL(10,2))),
+  (2, 3.5D, CAST(3.5 AS FLOAT), CAST(300 AS BIGINT), CAST(300 AS LONG),
+     CAST(300 AS INT), CAST(300 AS SMALLINT), CAST(30 AS TINYINT),
+     CAST(20.5 AS DECIMAL(10,2)))
+AS tab(t, d, f, b, l, i, si, ti, dec)
+ORDER BY t
+-- !query schema
+struct<t:int,d_diff:double,f_diff:float,b_diff:bigint,l_diff:bigint,i_diff:int,si_diff:smallint,ti_diff:tinyint,dec_diff:decimal(10,2)>
+-- !query output
+1      NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
+2      2.0     2.0     200     200     200     200     20      10.00
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY 
t) AS diff_subtract
+FROM VALUES
+  (1, CAST(0.10000000000000000000000000000000000001 AS DECIMAL(38, 38))),
+  (2, CAST(0.10000000000000000000000000000000000002 AS DECIMAL(38, 38)))
+AS tab(t, c) ORDER BY t
+-- !query schema
+struct<t:int,diff:decimal(38,38),diff_subtract:decimal(38,37)>
+-- !query output
+1      NULL    NULL
+2      0.00000000000000000000000000000000000001        
0.0000000000000000000000000000000000000
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY 
t) AS diff_subtract
+FROM VALUES
+  (1, CAST(12345678901234567890123456789012.123456 AS DECIMAL(38, 6))),
+  (2, CAST(12345678901234567890123456789012.123457 AS DECIMAL(38, 6)))
+AS tab(t, c) ORDER BY t
+-- !query schema
+struct<t:int,diff:decimal(38,6),diff_subtract:decimal(38,6)>
+-- !query output
+1      NULL    NULL
+2      0.000001        0.000001
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY 
t) AS diff_subtract
+FROM VALUES
+  (1, CAST(99999999.98 AS DECIMAL(10, 2))),
+  (2, CAST(99999999.99 AS DECIMAL(10, 2)))
+AS tab(t, c) ORDER BY t
+-- !query schema
+struct<t:int,diff:decimal(10,2),diff_subtract:decimal(11,2)>
+-- !query output
+1      NULL    NULL
+2      0.01    0.01
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, NULL), (2, NULL), (3, NULL) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,diff:double>
+-- !query output
+1      NULL
+2      NULL
+3      NULL
+
+
+-- !query
+SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100, NULL), (2, 200, NULL) AS tab(t, c, st)
+ORDER BY t
+-- !query schema
+struct<t:int,diff:int>
+-- !query output
+1      NULL
+2      100
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM (SELECT t, CAST(c AS INT) AS c
+        FROM VALUES (1, NULL), (2, NULL), (3, NULL) AS tab(t, c))
+ORDER BY t
+-- !query schema
+struct<t:int,diff:int>
+-- !query output
+1      NULL
+2      NULL
+3      NULL
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, -5) AS tab(t, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE",
+  "sqlState" : "22003",
+  "messageParameters" : {
+    "function" : "`counter_diff`",
+    "value" : "-5"
+  }
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, -5.0D) AS tab(t, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE",
+  "sqlState" : "22003",
+  "messageParameters" : {
+    "function" : "`counter_diff`",
+    "value" : "-5.0"
+  }
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, CAST(-5.5 AS DECIMAL(10, 3))) AS tab(t, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE",
+  "sqlState" : "22003",
+  "messageParameters" : {
+    "function" : "`counter_diff`",
+    "value" : "-5.500"
+  }
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, DOUBLE('-Infinity')) AS tab(t, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE",
+  "sqlState" : "22003",
+  "messageParameters" : {
+    "function" : "`counter_diff`",
+    "value" : "-Infinity"
+  }
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, -5) AS tab(t, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE",
+  "sqlState" : "22003",
+  "messageParameters" : {
+    "function" : "`counter_diff`",
+    "value" : "-5"
+  }
+}
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, 100.0D), (2, DOUBLE('Infinity')), (3, 200.0D),
+  (4, DOUBLE('Infinity')), (5, DOUBLE('Infinity')) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,diff:double>
+-- !query output
+1      NULL
+2      Infinity
+3      NULL
+4      Infinity
+5      NaN
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, 100.0D), (2, DOUBLE('NaN')), (3, DOUBLE('NaN')), (4, 200.0D),
+  (5, 400.0D), (6, DOUBLE('NaN')), (7, 50.0D), (8, 100.0D) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,diff:double>
+-- !query output
+1      NULL
+2      NaN
+3      NaN
+4      NULL
+5      200.0
+6      NaN
+7      NULL
+8      50.0
+
+
+-- !query
+SELECT t, counter_diff(1) OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t
+-- !query schema
+struct<t:bigint,diff:int>
+-- !query output
+1      NULL
+2      0
+3      0
+4      0
+
+
+-- !query
+SELECT t, counter_diff(1 + 1) OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t
+-- !query schema
+struct<t:bigint,diff:int>
+-- !query output
+1      NULL
+2      0
+3      0
+4      0
+
+
+-- !query
+SELECT t, counter_diff(1, TIMESTAMP '2026-01-01 00:00:00')
+  OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t
+-- !query schema
+struct<t:bigint,diff:int>
+-- !query output
+1      NULL
+2      0
+3      0
+4      0
+
+
+-- !query
+SELECT t, counter_diff(
+    1 + 1,
+    TIMESTAMP '2026-01-01 00:00:00' + INTERVAL '1' SECOND
+  ) OVER (ORDER BY t) AS diff
+FROM RANGE(1, 5) AS tab(t)
+ORDER BY t
+-- !query schema
+struct<t:bigint,diff:int>
+-- !query output
+1      NULL
+2      0
+3      0
+4      0
+
+
+-- !query
+SELECT t, c,
+       counter_diff(c) OVER (ORDER BY t) AS diff,
+       c - lag(c) OVER (ORDER BY t) AS d1,
+       c - lag(c) IGNORE NULLS OVER (ORDER BY t) AS d2
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, 300), (4, 150) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int,d1:int,d2:int>
+-- !query output
+1      100     NULL    NULL    NULL
+2      NULL    NULL    NULL    NULL
+3      300     200     NULL    200
+4      150     NULL    -150    -150
+
+
+-- !query
+SELECT t, c,
+       counter_diff(c) OVER (ORDER BY t) AS diff,
+       avg(c) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS 
local_avg,
+       lead(c) OVER (ORDER BY t) AS nc,
+       max(c) OVER (ORDER BY t ROWS BETWEEN UNBOUNDED PRECEDING AND 1 
FOLLOWING) AS some_max
+FROM VALUES (1, 100), (2, 200), (3, 150), (4, 400), (5, 500), (6, 600) AS 
tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int,local_avg:double,nc:int,some_max:int>
+-- !query output
+1      100     NULL    150.0   200     200
+2      200     100     150.0   150     200
+3      150     NULL    250.0   400     400
+4      400     250     350.0   500     500
+5      500     100     500.0   600     600
+6      600     100     550.0   NULL    600
+
+
+-- !query
+SELECT m, t, c,
+       counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff,
+       avg(c) OVER (PARTITION BY m ORDER BY t
+                    ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS local_avg,
+       max(c) OVER (PARTITION BY t) AS max_c
+FROM VALUES
+  ('a', 1, 100), ('a', 2, 200), ('a', 3, 150), ('a', 4, 300),
+  ('b', 1, 10),  ('b', 2, 30),  ('b', 3, 60),  ('b', 4, 100)
+AS tab(m, t, c)
+ORDER BY m, t
+-- !query schema
+struct<m:string,t:int,c:int,diff:int,local_avg:double,max_c:int>
+-- !query output
+a      1       100     NULL    150.0   100
+a      2       200     100     150.0   200
+a      3       150     NULL    216.66666666666666      150
+a      4       300     150     225.0   300
+b      1       10      NULL    20.0    100
+b      2       30      20      33.333333333333336      200
+b      3       60      30      63.333333333333336      150
+b      4       100     40      80.0    300
+
+
+-- !query
+SELECT t, c,
+  counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:00:00', 100),
+  (2, TIMESTAMP '2026-01-01 00:00:00', 200),
+  (3, TIMESTAMP '2026-01-01 00:02:31', 400)
+AS tab(t, st, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1      100     NULL
+2      200     100
+3      400     NULL
+
+
+-- !query
+SELECT t, c,
+  counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:00:00', 100),
+  (2, TIMESTAMP '2026-01-01 00:00:00', 200)
+AS tab(t, st, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1      100     NULL
+2      200     100
+
+
+-- !query
+SELECT t, c,
+  counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:00:00', 100),
+  (2, CAST(NULL AS TIMESTAMP),         200),
+  (3, TIMESTAMP '2026-01-01 00:01:00', 300)
+AS tab(t, st, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1      100     NULL
+2      200     100
+3      300     100
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+  (2, TIMESTAMP '2026-01-01 00:01:00', 200)
+AS tab(t, st, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "COUNTER_DIFF_START_TIME_DECREASED",
+  "sqlState" : "22023",
+  "messageParameters" : {
+    "currentStartTime" : "2026-01-01 00:01:00",
+    "function" : "`counter_diff`",
+    "previousStartTime" : "2026-01-01 00:05:00"
+  }
+}
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:00:00', -1)
+AS tab(t, st, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE",
+  "sqlState" : "22003",
+  "messageParameters" : {
+    "function" : "`counter_diff`",
+    "value" : "-1"
+  }
+}
+
+
+-- !query
+SELECT t, c,
+  counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP_NTZ '2026-01-01 00:00:00', 100),
+  (2, TIMESTAMP_NTZ '2026-01-01 00:00:00', 200),
+  (3, TIMESTAMP_NTZ '2026-01-01 00:05:00', 300)
+AS tab(t, st, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1      100     NULL
+2      200     100
+3      300     NULL
+
+
+-- !query
+SELECT m, t, c, counter_diff(c, st) OVER (PARTITION BY m ORDER BY t) AS diff
+FROM VALUES
+  ('a', 1, TIMESTAMP '2026-01-01 00:00:00', 100),
+  ('a', 2, TIMESTAMP '2026-01-01 00:00:00', 200),
+  ('a', 3, TIMESTAMP '2026-01-01 00:05:00', 500),
+  ('b', 1, TIMESTAMP '2026-01-01 00:00:00', 10),
+  ('b', 2, TIMESTAMP '2026-01-01 00:00:00', 20)
+AS tab(m, t, st, c)
+ORDER BY m, t
+-- !query schema
+struct<m:string,t:int,c:int,diff:int>
+-- !query output
+a      1       100     NULL
+a      2       200     100
+a      3       500     NULL
+b      1       10      NULL
+b      2       20      10
+
+
+-- !query
+SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+  (2, TIMESTAMP '2026-01-01 00:01:00', CAST(NULL AS INT))
+AS tab(t, st, c)
+ORDER BY t
+-- !query schema
+struct<t:int,diff:int>
+-- !query output
+1      NULL
+2      NULL
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+  (2, TIMESTAMP '2026-01-01 00:01:00', CAST(NULL AS INT)),
+  (3, TIMESTAMP '2026-01-01 00:01:00', 300)
+AS tab(t, st, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "COUNTER_DIFF_START_TIME_DECREASED",
+  "sqlState" : "22023",
+  "messageParameters" : {
+    "currentStartTime" : "2026-01-01 00:01:00",
+    "function" : "`counter_diff`",
+    "previousStartTime" : "2026-01-01 00:05:00"
+  }
+}
+
+
+-- !query
+SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES
+  (1, TIMESTAMP '2026-01-01 00:05:00', 100),
+  (2, TIMESTAMP '2026-01-01 00:06:00', CAST(NULL AS INT)),
+  (3, TIMESTAMP '2026-01-01 00:07:00', 300)
+AS tab(t, st, c)
+ORDER BY t
+-- !query schema
+struct<t:int,diff:int>
+-- !query output
+1      NULL
+2      NULL
+3      NULL
+
+
+-- !query
+WITH gen AS (
+  SELECT
+    TIMESTAMPADD(MINUTE, id * 30, TIMESTAMP '2026-01-01 00:00:00') AS ts,
+    CASE WHEN id < 4 THEN TIMESTAMP '2026-01-01 00:00:00'
+         ELSE TIMESTAMP '2026-01-01 02:00:00' END AS st,
+    CASE WHEN id < 4 THEN id * 1000 ELSE (id - 4) * 800 END AS c
+  FROM RANGE(8) AS r(id)
+),
+diffs AS (
+  SELECT ts, c, counter_diff(c, st) OVER (ORDER BY ts) AS diff
+  FROM gen
+)
+SELECT date_trunc('hour', ts) AS hour_bucket,
+       SUM(diff) AS total_diff
+FROM diffs GROUP BY 1 ORDER BY 1
+-- !query schema
+struct<hour_bucket:timestamp,total_diff:bigint>
+-- !query output
+2026-01-01 00:00:00    1000
+2026-01-01 01:00:00    2000
+2026-01-01 02:00:00    800
+2026-01-01 03:00:00    1600
+
+
+-- !query
+SELECT counter_diff(1) AS diff
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "WINDOW_FUNCTION_WITHOUT_OVER_CLAUSE",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "funcName" : "\"counter_diff(1)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 30,
+    "fragment" : "counter_diff(1) AS diff"
+  } ]
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER () AS diff
+FROM VALUES (1) AS tab(c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "WINDOW_FUNCTION_FRAME_NOT_ORDERED",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "wf_expr" : "counter_diff(tab.c)",
+    "wf_name" : "counter_diff"
+  }
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 'abc') AS tab(t, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"c\"",
+    "inputType" : "\"STRING\"",
+    "paramIndex" : "first",
+    "requiredType" : "\"NUMERIC\"",
+    "sqlExpr" : "\"counter_diff(c)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 40,
+    "fragment" : "counter_diff(c) OVER (ORDER BY t)"
+  } ]
+}
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 'abc', 100) AS tab(t, st, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"st\"",
+    "inputType" : "\"STRING\"",
+    "paramIndex" : "second",
+    "requiredType" : "(\"TIMESTAMP\" or \"TIMESTAMP_NTZ\")",
+    "sqlExpr" : "\"counter_diff(c, st)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 44,
+    "fragment" : "counter_diff(c, st) OVER (ORDER BY t)"
+  } ]
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING) AS diff
+FROM VALUES (1, 100), (2, 200) AS tab(t, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1036",
+  "messageParameters" : {
+    "required" : "specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())",
+    "wf" : "specifiedwindowframe(RowFrame, -1, 1)"
+  }
+}
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (
+  ORDER BY t ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS diff
+FROM VALUES (1, 100), (2, 200) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,diff:int>
+-- !query output
+1      NULL
+2      100
+
+
+-- !query
+SELECT counter_diff(c) OVER (
+  ORDER BY t RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS diff
+FROM VALUES (1, 100), (2, 200) AS tab(t, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1036",
+  "messageParameters" : {
+    "required" : "specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())",
+    "wf" : "specifiedwindowframe(RangeFrame, unboundedpreceding$(), 
currentrow$())"
+  }
+}
+
+
+-- !query
+SELECT counter_diff() OVER (ORDER BY t) AS diff FROM VALUES (1) AS tab(t)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "REQUIRED_PARAMETER_NOT_FOUND",
+  "sqlState" : "4274K",
+  "messageParameters" : {
+    "index" : "0",
+    "parameterName" : "`value`",
+    "routineName" : "`counter_diff`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 39,
+    "fragment" : "counter_diff() OVER (ORDER BY t)"
+  } ]
+}
+
+
+-- !query
+SELECT counter_diff(1, TIMESTAMP '2026-01-01', 99) OVER (ORDER BY t) AS diff
+FROM VALUES (1) AS tab(t)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION",
+  "sqlState" : "42605",
+  "messageParameters" : {
+    "actualNum" : "3",
+    "docroot" : "https://spark.apache.org/docs/latest";,
+    "expectedNum" : "[1, 2]",
+    "functionName" : "`counter_diff`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 68,
+    "fragment" : "counter_diff(1, TIMESTAMP '2026-01-01', 99) OVER (ORDER BY 
t)"
+  } ]
+}
+
+
+-- !query
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, true), (2, false) AS tab(t, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"c\"",
+    "inputType" : "\"BOOLEAN\"",
+    "paramIndex" : "first",
+    "requiredType" : "\"NUMERIC\"",
+    "sqlExpr" : "\"counter_diff(c)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 40,
+    "fragment" : "counter_diff(c) OVER (ORDER BY t)"
+  } ]
+}
+
+
+-- !query
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, DATE '2026-01-01', 100) AS tab(t, st, c)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"st\"",
+    "inputType" : "\"DATE\"",
+    "paramIndex" : "second",
+    "requiredType" : "(\"TIMESTAMP\" or \"TIMESTAMP_NTZ\")",
+    "sqlExpr" : "\"counter_diff(c, st)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 44,
+    "fragment" : "counter_diff(c, st) OVER (ORDER BY t)"
+  } ]
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index c46cde0d0db1..ffb018b264bd 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -835,6 +835,28 @@ class DataFrameWindowFunctionsSuite extends 
SharedSparkSession
           "v", "z", null, "v", "z", "y", null, "va")))
   }
 
+  test("counter_diff with and without startTime") {
+    import java.sql.Timestamp
+    val df = Seq(
+      (1, Timestamp.valueOf("2024-01-01 00:00:00"), 100),
+      (2, Timestamp.valueOf("2024-01-01 12:00:00"), 200),
+      (3, Timestamp.valueOf("2024-01-01 12:00:00"), 400),
+      (4, Timestamp.valueOf("2024-01-02 00:00:00"), 50),
+      (5, Timestamp.valueOf("2024-01-02 00:00:00"), 150)
+    ).toDF("t", "st", "c")
+    val w = Window.orderBy($"t")
+
+    // 1-arg form: reset detected by counter decrease only.
+    checkAnswer(
+      df.select($"t", counter_diff($"c").over(w)).orderBy($"t"),
+      Seq(Row(1, null), Row(2, 100), Row(3, 200), Row(4, null), Row(5, 100)))
+
+    // 2-arg form: reset also detected by startTime advance.
+    checkAnswer(
+      df.select($"t", counter_diff($"c", $"st").over(w)).orderBy($"t"),
+      Seq(Row(1, null), Row(2, null), Row(3, 200), Row(4, null), Row(5, 100)))
+  }
+
   test("lag - Offset expression <offset> must be a literal") {
     val nullStr: String = null
     val df = Seq(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to