This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7198633fbe9b [SPARK-55146][SS] State Repartition API for PySpark
7198633fbe9b is described below

commit 7198633fbe9b1bca02150563fc5c6e1ad8deefb8
Author: micheal-o <[email protected]>
AuthorDate: Mon Jan 26 10:28:18 2026 -0800

    [SPARK-55146][SS] State Repartition API for PySpark
    
    ### What changes were proposed in this pull request?
    
    Introduce the offline state repartition API and checkpoint manager in PySpark.
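
    For illustration, a minimal usage sketch of the (currently internal) API added here, assuming
    an active classic SparkSession `spark`, a stopped streaming query, and a hypothetical
    checkpoint path:

        # Illustrative only: the checkpoint path and target partition count are made up.
        spark._streamingCheckpointManager.repartition(
            checkpoint_location="/tmp/my_query_checkpoint",
            num_partitions=10,
            enforce_exactly_once_sink=True,
        )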
    
    ### Why are the changes needed?
    
    For offline state repartitioning
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New test suite added
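
    One way to run the new suite locally, assuming a standard Spark development setup (the module
    name matches the entry added to dev/sparktestsupport/modules.py; the exact invocation may
    differ per environment):

        python/run-tests --testnames 'pyspark.sql.tests.streaming.test_streaming_offline_state_repartition'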
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Claude-4.5-opus
    
    Closes #53931 from micheal-o/micheal-okutubo_data/repart_py_api.
    
    Authored-by: micheal-o <[email protected]>
    Signed-off-by: Anish Shrigondekar <[email protected]>
---
 dev/sparktestsupport/modules.py                    |   1 +
 python/pyspark/sql/context.py                      |  13 +
 python/pyspark/sql/session.py                      |  19 ++
 python/pyspark/sql/streaming/__init__.py           |   1 +
 python/pyspark/sql/streaming/query.py              |  56 ++++
 .../test_streaming_offline_state_repartition.py    | 327 +++++++++++++++++++++
 .../sql/streaming/StreamingCheckpointManager.scala |   4 +
 .../apache/spark/sql/classic/SparkSession.scala    |   4 +-
 .../sql/classic/StreamingCheckpointManager.scala   |   2 +
 9 files changed, 426 insertions(+), 1 deletion(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index fb8e4697fcb3..f14d94251365 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -669,6 +669,7 @@ pyspark_structured_streaming = Module(
         "pyspark.sql.tests.streaming.test_streaming_foreach",
         "pyspark.sql.tests.streaming.test_streaming_foreach_batch",
         "pyspark.sql.tests.streaming.test_streaming_listener",
+        "pyspark.sql.tests.streaming.test_streaming_offline_state_repartition",
         "pyspark.sql.tests.streaming.test_streaming_reader_name",
         "pyspark.sql.tests.pandas.test_pandas_grouped_map_with_state",
         "pyspark.sql.tests.pandas.streaming.test_pandas_transform_with_state",
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 3fe47615b876..c51b9e063a77 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -43,6 +43,7 @@ from pyspark.sql.udtf import UDTFRegistration
 from pyspark.errors.exceptions.captured import install_exception_handler
 from pyspark.sql.types import AtomicType, DataType, StructType
 from pyspark.sql.streaming import StreamingQueryManager
+from pyspark.sql.streaming.query import StreamingCheckpointManager
 
 if TYPE_CHECKING:
     from py4j.java_gateway import JavaObject
@@ -699,6 +700,18 @@ class SQLContext:
 
         return StreamingQueryManager(self._ssql_ctx.streams())
 
+    @property
+    def _streamingCheckpointManager(self) -> StreamingCheckpointManager:
+        """Returns a :class:`StreamingCheckpointManager` to manage streaming 
checkpoints.
+
+        .. versionadded:: 4.2.0
+
+        Notes
+        -----
+        This API is evolving.
+        """
+        return StreamingCheckpointManager(self._ssql_ctx.streamingCheckpointManager())
+
 
 class HiveContext(SQLContext):
     """A variant of Spark SQL that integrates with data stored in Hive.
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 0fd84c2f4886..cba5b02a9ccf 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -80,6 +80,7 @@ if TYPE_CHECKING:
     from pyspark.sql.catalog import Catalog
     from pyspark.sql.pandas._typing import ArrayLike, DataFrameLike as PandasDataFrameLike
     from pyspark.sql.streaming import StreamingQueryManager
+    from pyspark.sql.streaming.query import StreamingCheckpointManager
     from pyspark.sql.tvf import TableValuedFunction
     from pyspark.sql.udf import UDFRegistration
     from pyspark.sql.udtf import UDTFRegistration
@@ -2015,6 +2016,24 @@ class SparkSession(SparkConversionMixin):
 
         return StreamingQueryManager(self._jsparkSession.streams())
 
+    @cached_property
+    def _streamingCheckpointManager(self) -> "StreamingCheckpointManager":
+        """Returns a :class:`StreamingCheckpointManager` to manage streaming 
checkpoints.
+
+        .. versionadded:: 4.2.0
+
+        Notes
+        -----
+        This API is evolving.
+
+        Returns
+        -------
+        :class:`StreamingCheckpointManager`
+        """
+        from pyspark.sql.streaming.query import StreamingCheckpointManager
+
+        return StreamingCheckpointManager(self._jsparkSession.streamingCheckpointManager())
+
     @property
     def tvf(self) -> "TableValuedFunction":
         """
diff --git a/python/pyspark/sql/streaming/__init__.py b/python/pyspark/sql/streaming/__init__.py
index e3c6ca519ad0..712f253fbb26 100644
--- a/python/pyspark/sql/streaming/__init__.py
+++ b/python/pyspark/sql/streaming/__init__.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+# TODO: Add StreamingCheckpointManager to this when we want to make it public
 from pyspark.sql.streaming.query import StreamingQuery, StreamingQueryManager  # noqa: F401
 from pyspark.sql.streaming.readwriter import DataStreamReader, DataStreamWriter  # noqa: F401
 from pyspark.sql.streaming.listener import StreamingQueryListener  # noqa: F401
diff --git a/python/pyspark/sql/streaming/query.py b/python/pyspark/sql/streaming/query.py
index 45ca818d7ae2..36462b6114ec 100644
--- a/python/pyspark/sql/streaming/query.py
+++ b/python/pyspark/sql/streaming/query.py
@@ -30,6 +30,8 @@ from pyspark.sql.streaming.listener import (
 if TYPE_CHECKING:
     from py4j.java_gateway import JavaObject
 
+# TODO: Add StreamingCheckpointManager to __all__ once we add streaming checkpoint manager to the
+# public API
 __all__ = ["StreamingQuery", "StreamingQueryManager"]
 
 
@@ -721,6 +723,60 @@ class StreamingQueryManager:
         self._jsqm.removeListener(listener._jlistener)
 
 
+class StreamingCheckpointManager:
+    """
+    A class to manage operations on streaming query checkpoints.
+
+    .. versionadded:: 4.2.0
+
+    Notes
+    -----
+    This API is evolving and currently supported in Spark Classic.
+    """
+
+    def __init__(self, jmanager: "JavaObject") -> None:
+        self._jmanager = jmanager
+
+    def repartition(
+        self, checkpoint_location: str, num_partitions: int, enforce_exactly_once_sink: bool = True
+    ) -> None:
+        """
+        Repartition the stateful streaming operators' state in the streaming checkpoint to have
+        `num_partitions` partitions. The streaming query MUST not be running. If `num_partitions` is
+        the same as the current number of partitions, no repartitioning is performed and an
+        exception will be thrown.
+
+        This produces a new microbatch in the checkpoint that contains the repartitioned state, i.e.
+        if the last streaming batch was batch `N`, this will create batch `N+1` with the
+        repartitioned state. Note that this new batch doesn't read input data from sources; it only
+        represents the repartition operation. The next time the streaming query is started, it will
+        pick up from this new batch.
+
+        This will return only when the repartitioning is complete or fails.
+
+        .. versionadded:: 4.2.0
+
+        Parameters
+        ----------
+        checkpoint_location : str
+            The checkpoint location of the streaming query; this should be the `checkpointLocation`
+            option set on the DataStreamWriter.
+        num_partitions : int
+            The target number of state partitions.
+        enforce_exactly_once_sink : bool, optional
+            Whether to disallow skipping failed batches, to avoid duplicates in exactly-once sinks.
+            Default ``True``.
+
+        Notes
+        -----
+        This API is experimental.
+
+        This operation should only be performed after the streaming query has been stopped. If not,
+        it can lead to undefined behavior or checkpoint corruption.
+        """
+        self._jmanager.repartition(checkpoint_location, num_partitions, enforce_exactly_once_sink)
+
+
 def _test() -> None:
     import doctest
     import os
diff --git a/python/pyspark/sql/tests/streaming/test_streaming_offline_state_repartition.py b/python/pyspark/sql/tests/streaming/test_streaming_offline_state_repartition.py
new file mode 100644
index 000000000000..89f9024bd538
--- /dev/null
+++ b/python/pyspark/sql/tests/streaming/test_streaming_offline_state_repartition.py
@@ -0,0 +1,327 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import tempfile
+import unittest
+
+from pyspark import SparkConf
+from pyspark.sql.streaming.state import GroupStateTimeout
+from pyspark.sql.types import LongType, StringType, StructType, StructField
+from pyspark.testing.sqlutils import (
+    ReusedSQLTestCase,
+    have_pandas,
+    have_pyarrow,
+    pandas_requirement_message,
+    pyarrow_requirement_message,
+)
+
+if have_pandas:
+    import pandas as pd
+
+if have_pyarrow:
+    import pyarrow as pa  # noqa: F401
+
+
+class OfflineStateRepartitionTestUtils:
+    """Utility class for repartition tests."""
+
+    @staticmethod
+    def run_repartition_test(
+        spark,
+        num_shuffle_partitions,
+        create_streaming_df,
+        output_mode,
+        batch1_data,
+        batch2_data,
+        batch3_data,
+        verify_initial,
+        verify_after_increase,
+        verify_after_decrease,
+    ):
+        """
+        Common helper to run repartition tests with different streaming operators.
+
+        Steps:
+        1. Run streaming query to generate initial state
+        2. Repartition to more partitions
+        3. Add new data and restart query, verify state preserved
+        4. Repartition to fewer partitions
+        5. Add new data and restart query, verify state preserved
+
+        Parameters
+        ----------
+        spark : SparkSession
+            The active Spark session.
+        num_shuffle_partitions : int
+            The initial number of shuffle partitions.
+        create_streaming_df : callable
+            Function(df) -> DataFrame that applies the streaming transformation.
+        output_mode : str
+            Output mode for the streaming query ("update" or "append").
+        batch1_data, batch2_data, batch3_data : str
+            Data to write for each batch (newline-separated values).
+        verify_initial, verify_after_increase, verify_after_decrease : callable
+            Functions(collected_results) to verify results at each stage.
+        """
+        with tempfile.TemporaryDirectory() as input_dir, tempfile.TemporaryDirectory() as checkpoint_dir:
+            collected_results = []
+
+            def collect_batch(batch_df, batch_id):
+                rows = batch_df.collect()
+                collected_results.extend(rows)
+
+            def run_streaming_query():
+                df = spark.readStream.format("text").load(input_dir)
+                transformed_df = create_streaming_df(df)
+                query = (
+                    transformed_df.writeStream.foreachBatch(collect_batch)
+                    .option("checkpointLocation", checkpoint_dir)
+                    .outputMode(output_mode)
+                    .start()
+                )
+                query.processAllAvailable()
+                query.stop()
+
+            # Step 1: Write initial data and run streaming query
+            with open(os.path.join(input_dir, "batch1.txt"), "w") as f:
+                f.write(batch1_data)
+
+            run_streaming_query()
+            verify_initial(collected_results)
+
+            # Step 2: Repartition to more partitions
+            spark._streamingCheckpointManager.repartition(
+                checkpoint_dir, num_shuffle_partitions * 2
+            )
+
+            # Step 3: Add more data and restart query
+            with open(os.path.join(input_dir, "batch2.txt"), "w") as f:
+                f.write(batch2_data)
+
+            collected_results.clear()
+            run_streaming_query()
+            verify_after_increase(collected_results)
+
+            # Step 4: Repartition to fewer partitions
+            spark._streamingCheckpointManager.repartition(
+                checkpoint_dir, num_shuffle_partitions - 1
+            )
+
+            # Step 5: Add more data and restart query
+            with open(os.path.join(input_dir, "batch3.txt"), "w") as f:
+                f.write(batch3_data)
+
+            collected_results.clear()
+            run_streaming_query()
+            verify_after_decrease(collected_results)
+
+
+class StreamingOfflineStateRepartitionTests(ReusedSQLTestCase):
+    """
+    Test suite for Offline state repartitioning.
+    """
+
+    NUM_SHUFFLE_PARTITIONS = 3
+
+    @classmethod
+    def conf(cls):
+        cfg = SparkConf()
+        cfg.set("spark.sql.shuffle.partitions", 
str(cls.NUM_SHUFFLE_PARTITIONS))
+        cfg.set(
+            "spark.sql.streaming.stateStore.providerClass",
+            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
+        )
+        return cfg
+
+    def test_fail_if_empty_checkpoint_directory(self):
+        """Test that repartition fails if checkpoint directory is empty."""
+        with tempfile.TemporaryDirectory() as checkpoint_dir:
+            with self.assertRaisesRegex(
+                Exception, "STATE_REPARTITION_INVALID_CHECKPOINT.NO_COMMITTED_BATCH"
+            ):
+                self.spark._streamingCheckpointManager.repartition(checkpoint_dir, 5)
+
+    def test_fail_if_no_batch_found_in_checkpoint_directory(self):
+        """Test that repartition fails if no batch found in checkpoint 
directory."""
+        with tempfile.TemporaryDirectory() as checkpoint_dir:
+            # Write commit log but no offset log
+            commits_dir = os.path.join(checkpoint_dir, "commits")
+            os.makedirs(commits_dir)
+            # Create a minimal commit file for batch 0
+            with open(os.path.join(commits_dir, "0"), "w") as f:
+                f.write("v1\n{}")
+
+            with self.assertRaisesRegex(
+                Exception, "STATE_REPARTITION_INVALID_CHECKPOINT.NO_BATCH_FOUND"
+            ):
+                self.spark._streamingCheckpointManager.repartition(checkpoint_dir, 5)
+
+    def test_fail_if_repartition_parameter_is_invalid(self):
+        """Test that repartition fails with invalid parameters."""
+        # Test null checkpoint location
+        with self.assertRaisesRegex(Exception, "STATE_REPARTITION_INVALID_PARAMETER.IS_NULL"):
+            self.spark._streamingCheckpointManager.repartition(None, 5)
+
+        # Test empty checkpoint location
+        with self.assertRaisesRegex(Exception, "STATE_REPARTITION_INVALID_PARAMETER.IS_EMPTY"):
+            self.spark._streamingCheckpointManager.repartition("", 5)
+
+        # Test numPartitions <= 0
+        with self.assertRaisesRegex(
+            Exception, "STATE_REPARTITION_INVALID_PARAMETER.IS_NOT_GREATER_THAN_ZERO"
+        ):
+            self.spark._streamingCheckpointManager.repartition("test", 0)
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        pandas_requirement_message or pyarrow_requirement_message,
+    )
+    def test_repartition_with_apply_in_pandas_with_state(self):
+        """Test repartition for a streaming query using 
applyInPandasWithState."""
+
+        # Define the stateful function that tracks count per key
+        def stateful_count_func(key, pdf_iter, state):
+            existing_count = state.getOption
+            count = 0 if existing_count is None else existing_count[0]
+
+            new_count = 0
+            for pdf in pdf_iter:
+                new_count += len(pdf)
+
+            total_count = count + new_count
+            state.update((total_count,))
+            yield pd.DataFrame({"key": [key[0]], "count": [total_count]})
+
+        output_schema = StructType(
+            [StructField("key", StringType()), StructField("count", 
LongType())]
+        )
+        state_schema = StructType([StructField("count", LongType())])
+
+        def create_streaming_df(df):
+            return df.groupBy(df["value"]).applyInPandasWithState(
+                stateful_count_func,
+                output_schema,
+                state_schema,
+                "Update",
+                GroupStateTimeout.NoTimeout,
+            )
+
+        def verify_initial(results):
+            counts = {row.key: row["count"] for row in results}
+            self.assertEqual(counts.get("a"), 2)
+            self.assertEqual(counts.get("b"), 1)
+            self.assertEqual(counts.get("c"), 1)
+
+        def verify_after_increase(results):
+            counts = {row.key: row["count"] for row in results}
+            self.assertEqual(counts.get("a"), 3, "State for 'a': 2 + 1 = 3")
+            self.assertEqual(counts.get("b"), 2, "State for 'b': 1 + 1 = 2")
+            self.assertEqual(counts.get("d"), 1, "New key 'd' should have 
count 1")
+
+        def verify_after_decrease(results):
+            counts = {row.key: row["count"] for row in results}
+            self.assertEqual(counts.get("a"), 4, "State for 'a': 3 + 1 = 4")
+            self.assertEqual(counts.get("c"), 2, "State for 'c': 1 + 1 = 2")
+            self.assertEqual(counts.get("e"), 1, "New key 'e' should have 
count 1")
+
+        OfflineStateRepartitionTestUtils.run_repartition_test(
+            spark=self.spark,
+            num_shuffle_partitions=self.NUM_SHUFFLE_PARTITIONS,
+            create_streaming_df=create_streaming_df,
+            output_mode="update",
+            batch1_data="a\nb\na\nc\n",  # a:2, b:1, c:1
+            batch2_data="a\nb\nd\n",  # a:+1, b:+1, d:1 (new)
+            batch3_data="a\nc\ne\n",  # a:+1, c:+1, e:1 (new)
+            verify_initial=verify_initial,
+            verify_after_increase=verify_after_increase,
+            verify_after_decrease=verify_after_decrease,
+        )
+
+    def test_repartition_with_streaming_aggregation(self):
+        """Test repartition for a streaming aggregation query (groupBy + 
count)."""
+
+        def create_streaming_df(df):
+            return df.groupBy("value").count()
+
+        def verify_initial(results):
+            counts = {row.value: row["count"] for row in results}
+            self.assertEqual(counts.get("a"), 2)
+            self.assertEqual(counts.get("b"), 1)
+            self.assertEqual(counts.get("c"), 1)
+
+        def verify_after_increase(results):
+            counts = {row.value: row["count"] for row in results}
+            self.assertEqual(counts.get("a"), 3, "State for 'a': 2 + 1 = 3")
+            self.assertEqual(counts.get("b"), 2, "State for 'b': 1 + 1 = 2")
+            self.assertEqual(counts.get("d"), 1, "New key 'd' should have 
count 1")
+
+        def verify_after_decrease(results):
+            counts = {row.value: row["count"] for row in results}
+            self.assertEqual(counts.get("a"), 4, "State for 'a': 3 + 1 = 4")
+            self.assertEqual(counts.get("c"), 2, "State for 'c': 1 + 1 = 2")
+            self.assertEqual(counts.get("e"), 1, "New key 'e' should have 
count 1")
+
+        OfflineStateRepartitionTestUtils.run_repartition_test(
+            spark=self.spark,
+            num_shuffle_partitions=self.NUM_SHUFFLE_PARTITIONS,
+            create_streaming_df=create_streaming_df,
+            output_mode="update",
+            batch1_data="a\nb\na\nc\n",  # a:2, b:1, c:1
+            batch2_data="a\nb\nd\n",  # a:+1, b:+1, d:1 (new)
+            batch3_data="a\nc\ne\n",  # a:+1, c:+1, e:1 (new)
+            verify_initial=verify_initial,
+            verify_after_increase=verify_after_increase,
+            verify_after_decrease=verify_after_decrease,
+        )
+
+    def test_repartition_with_streaming_dedup(self):
+        """Test repartition for a streaming deduplication query 
(dropDuplicates)."""
+
+        def create_streaming_df(df):
+            return df.dropDuplicates(["value"])
+
+        def verify_initial(results):
+            values = {row.value for row in results}
+            self.assertEqual(values, {"a", "b", "c"})
+
+        def verify_after_increase(results):
+            values = {row.value for row in results}
+            self.assertEqual(values, {"d", "e"}, "Only new keys after 
repartition")
+
+        def verify_after_decrease(results):
+            values = {row.value for row in results}
+            self.assertEqual(values, {"f", "g"}, "Only new keys after 
repartition")
+
+        OfflineStateRepartitionTestUtils.run_repartition_test(
+            spark=self.spark,
+            num_shuffle_partitions=self.NUM_SHUFFLE_PARTITIONS,
+            create_streaming_df=create_streaming_df,
+            output_mode="append",
+            batch1_data="a\nb\na\nc\n",  # unique: a, b, c
+            batch2_data="a\nb\nd\ne\n",  # a, b duplicates; d, e new
+            batch3_data="a\nc\nf\ng\n",  # a, c duplicates; f, g new
+            verify_initial=verify_initial,
+            verify_after_increase=verify_after_increase,
+            verify_after_decrease=verify_after_decrease,
+        )
+
+
+if __name__ == "__main__":
+    from pyspark.testing import main
+
+    main()
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StreamingCheckpointManager.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StreamingCheckpointManager.scala
index 7bb6fda4818c..b9d9515303c8 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StreamingCheckpointManager.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StreamingCheckpointManager.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.sql.streaming
 
+import org.apache.spark.annotation.Evolving
+
 /**
  * A class to manage operations on streaming query checkpoints.
  */
+@Evolving
 private[spark] abstract class StreamingCheckpointManager {
 
   /**
@@ -47,6 +50,7 @@ private[spark] abstract class StreamingCheckpointManager {
    * @param enforceExactlyOnceSink
    *   if we shouldn't allow skipping failed batches, to avoid duplicates in exactly once sinks.
    */
+  @Evolving
   private[spark] def repartition(
       checkpointLocation: String,
       numPartitions: Int,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
index c4620f13afc6..d1c7c406544c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
@@ -246,7 +246,9 @@ class SparkSession private(
   @Unstable
   def streams: StreamingQueryManager = sessionState.streamingQueryManager
 
-  private[spark] def streamingCheckpointManager = sessionState.streamingCheckpointManager
+  @Unstable
+  private[spark] def streamingCheckpointManager: StreamingCheckpointManager =
+    sessionState.streamingCheckpointManager
 
   /**
   * Returns an `ArtifactManager` that supports adding, managing and using session-scoped artifacts
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingCheckpointManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingCheckpointManager.scala
index b7c4ff362b22..5b6455b8741f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingCheckpointManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingCheckpointManager.scala
@@ -17,12 +17,14 @@
 
 package org.apache.spark.sql.classic
 
+import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.streaming.state.{OfflineStateRepartitionErrors, OfflineStateRepartitionRunner}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming
 
 /** @inheritdoc */
+@Evolving
 private[spark] class StreamingCheckpointManager(
     sparkSession: SparkSession,
    sqlConf: SQLConf) extends streaming.StreamingCheckpointManager with Logging {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
