This is an automated email from the ASF dual-hosted git repository.
ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7198633fbe9b [SPARK-55146][SS] State Repartition API for PySpark
7198633fbe9b is described below
commit 7198633fbe9b1bca02150563fc5c6e1ad8deefb8
Author: micheal-o <[email protected]>
AuthorDate: Mon Jan 26 10:28:18 2026 -0800
[SPARK-55146][SS] State Repartition API for PySpark
### What changes were proposed in this pull request?
Introduce the offline state repartition API and the streaming checkpoint manager in PySpark.
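For illustration, a minimal usage sketch of the new (private, evolving) API, assuming the query that owns the checkpoint has already been stopped; the checkpoint path and target partition count below are placeholders:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder: checkpoint location previously used by the (now stopped) query.
checkpoint = "/tmp/my-query-checkpoint"

# Rewrite the operator state in the checkpoint to use 10 partitions.
# This appends a new batch N+1 to the checkpoint; the next run of the
# query will pick up from that batch.
spark._streamingCheckpointManager.repartition(
    checkpoint,
    num_partitions=10,
    enforce_exactly_once_sink=True,
)
```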
### Why are the changes needed?
To support offline state repartitioning of streaming query checkpoints from PySpark.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New test suite added: `python/pyspark/sql/tests/streaming/test_streaming_offline_state_repartition.py`.
### Was this patch authored or co-authored using generative AI tooling?
Claude-4.5-opus
Closes #53931 from micheal-o/micheal-okutubo_data/repart_py_api.
Authored-by: micheal-o <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
---
dev/sparktestsupport/modules.py | 1 +
python/pyspark/sql/context.py | 13 +
python/pyspark/sql/session.py | 19 ++
python/pyspark/sql/streaming/__init__.py | 1 +
python/pyspark/sql/streaming/query.py | 56 ++++
.../test_streaming_offline_state_repartition.py | 327 +++++++++++++++++++++
.../sql/streaming/StreamingCheckpointManager.scala | 4 +
.../apache/spark/sql/classic/SparkSession.scala | 4 +-
.../sql/classic/StreamingCheckpointManager.scala | 2 +
9 files changed, 426 insertions(+), 1 deletion(-)
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index fb8e4697fcb3..f14d94251365 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -669,6 +669,7 @@ pyspark_structured_streaming = Module(
"pyspark.sql.tests.streaming.test_streaming_foreach",
"pyspark.sql.tests.streaming.test_streaming_foreach_batch",
"pyspark.sql.tests.streaming.test_streaming_listener",
+ "pyspark.sql.tests.streaming.test_streaming_offline_state_repartition",
"pyspark.sql.tests.streaming.test_streaming_reader_name",
"pyspark.sql.tests.pandas.test_pandas_grouped_map_with_state",
"pyspark.sql.tests.pandas.streaming.test_pandas_transform_with_state",
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 3fe47615b876..c51b9e063a77 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -43,6 +43,7 @@ from pyspark.sql.udtf import UDTFRegistration
from pyspark.errors.exceptions.captured import install_exception_handler
from pyspark.sql.types import AtomicType, DataType, StructType
from pyspark.sql.streaming import StreamingQueryManager
+from pyspark.sql.streaming.query import StreamingCheckpointManager
if TYPE_CHECKING:
from py4j.java_gateway import JavaObject
@@ -699,6 +700,18 @@ class SQLContext:
return StreamingQueryManager(self._ssql_ctx.streams())
+ @property
+ def _streamingCheckpointManager(self) -> StreamingCheckpointManager:
+ """Returns a :class:`StreamingCheckpointManager` to manage streaming
checkpoints.
+
+ .. versionadded:: 4.2.0
+
+ Notes
+ -----
+ This API is evolving.
+ """
+ return StreamingCheckpointManager(self._ssql_ctx.streamingCheckpointManager())
+
class HiveContext(SQLContext):
"""A variant of Spark SQL that integrates with data stored in Hive.
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 0fd84c2f4886..cba5b02a9ccf 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -80,6 +80,7 @@ if TYPE_CHECKING:
from pyspark.sql.catalog import Catalog
from pyspark.sql.pandas._typing import ArrayLike, DataFrameLike as PandasDataFrameLike
from pyspark.sql.streaming import StreamingQueryManager
+ from pyspark.sql.streaming.query import StreamingCheckpointManager
from pyspark.sql.tvf import TableValuedFunction
from pyspark.sql.udf import UDFRegistration
from pyspark.sql.udtf import UDTFRegistration
@@ -2015,6 +2016,24 @@ class SparkSession(SparkConversionMixin):
return StreamingQueryManager(self._jsparkSession.streams())
+ @cached_property
+ def _streamingCheckpointManager(self) -> "StreamingCheckpointManager":
+ """Returns a :class:`StreamingCheckpointManager` to manage streaming
checkpoints.
+
+ .. versionadded:: 4.2.0
+
+ Notes
+ -----
+ This API is evolving.
+
+ Returns
+ -------
+ :class:`StreamingCheckpointManager`
+ """
+ from pyspark.sql.streaming.query import StreamingCheckpointManager
+
+ return StreamingCheckpointManager(self._jsparkSession.streamingCheckpointManager())
+
@property
def tvf(self) -> "TableValuedFunction":
"""
diff --git a/python/pyspark/sql/streaming/__init__.py b/python/pyspark/sql/streaming/__init__.py
index e3c6ca519ad0..712f253fbb26 100644
--- a/python/pyspark/sql/streaming/__init__.py
+++ b/python/pyspark/sql/streaming/__init__.py
@@ -15,6 +15,7 @@
# limitations under the License.
#
+# TODO: Add StreamingCheckpointManager to this when we want to make it public
from pyspark.sql.streaming.query import StreamingQuery, StreamingQueryManager # noqa: F401
from pyspark.sql.streaming.readwriter import DataStreamReader, DataStreamWriter # noqa: F401
from pyspark.sql.streaming.listener import StreamingQueryListener # noqa: F401
diff --git a/python/pyspark/sql/streaming/query.py b/python/pyspark/sql/streaming/query.py
index 45ca818d7ae2..36462b6114ec 100644
--- a/python/pyspark/sql/streaming/query.py
+++ b/python/pyspark/sql/streaming/query.py
@@ -30,6 +30,8 @@ from pyspark.sql.streaming.listener import (
if TYPE_CHECKING:
from py4j.java_gateway import JavaObject
+# TODO: Add StreamingCheckpointManager to __all__ once we add streaming checkpoint manager to the
+# public API
__all__ = ["StreamingQuery", "StreamingQueryManager"]
@@ -721,6 +723,60 @@ class StreamingQueryManager:
self._jsqm.removeListener(listener._jlistener)
+class StreamingCheckpointManager:
+ """
+ A class to manage operations on streaming query checkpoints.
+
+ .. versionadded:: 4.2.0
+
+ Notes
+ -----
+ This API is evolving and is currently only supported in Spark Classic.
+ """
+
+ def __init__(self, jmanager: "JavaObject") -> None:
+ self._jmanager = jmanager
+
+ def repartition(
+ self, checkpoint_location: str, num_partitions: int, enforce_exactly_once_sink: bool = True
+ ) -> None:
+ """
+ Repartition the stateful streaming operators' state in the streaming checkpoint to have
+ `num_partitions` partitions. The streaming query MUST not be running. If `num_partitions` is
+ the same as the current number of partitions, this would be a no-op, so an exception will be
+ thrown instead.
+
+ This produces a new microbatch in the checkpoint that contains the repartitioned state, i.e.
+ if the last streaming batch was batch `N`, this will create batch `N+1` with the
+ repartitioned state. Note that this new batch doesn't read input data from sources; it only
+ represents the repartition operation. The next time the streaming query is started, it will
+ pick up from this new batch.
+
+ This will return only when the repartitioning is complete or fails.
+
+ .. versionadded:: 4.2.0
+
+ Parameters
+ ----------
+ checkpoint_location : str
+ The checkpoint location of the streaming query; this should be the `checkpointLocation`
+ option set on the DataStreamWriter.
+ num_partitions : int
+ The target number of state partitions.
+ enforce_exactly_once_sink : bool, optional
+ Whether to disallow skipping failed batches, to avoid duplicates in exactly-once sinks.
+ Default ``True``.
+
+ Notes
+ -----
+ This API is experimental.
+
+ This operation should only be performed after the streaming query has been stopped.
+ Otherwise, it can lead to undefined behavior or checkpoint corruption.
+ """
+ self._jmanager.repartition(checkpoint_location, num_partitions, enforce_exactly_once_sink)
+
+
def _test() -> None:
import doctest
import os
diff --git a/python/pyspark/sql/tests/streaming/test_streaming_offline_state_repartition.py b/python/pyspark/sql/tests/streaming/test_streaming_offline_state_repartition.py
new file mode 100644
index 000000000000..89f9024bd538
--- /dev/null
+++ b/python/pyspark/sql/tests/streaming/test_streaming_offline_state_repartition.py
@@ -0,0 +1,327 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import tempfile
+import unittest
+
+from pyspark import SparkConf
+from pyspark.sql.streaming.state import GroupStateTimeout
+from pyspark.sql.types import LongType, StringType, StructType, StructField
+from pyspark.testing.sqlutils import (
+ ReusedSQLTestCase,
+ have_pandas,
+ have_pyarrow,
+ pandas_requirement_message,
+ pyarrow_requirement_message,
+)
+
+if have_pandas:
+ import pandas as pd
+
+if have_pyarrow:
+ import pyarrow as pa # noqa: F401
+
+
+class OfflineStateRepartitionTestUtils:
+ """Utility class for repartition tests."""
+
+ @staticmethod
+ def run_repartition_test(
+ spark,
+ num_shuffle_partitions,
+ create_streaming_df,
+ output_mode,
+ batch1_data,
+ batch2_data,
+ batch3_data,
+ verify_initial,
+ verify_after_increase,
+ verify_after_decrease,
+ ):
+ """
+ Common helper to run repartition tests with different streaming operators.
+
+ Steps:
+ 1. Run streaming query to generate initial state
+ 2. Repartition to more partitions
+ 3. Add new data and restart query, verify state preserved
+ 4. Repartition to fewer partitions
+ 5. Add new data and restart query, verify state preserved
+
+ Parameters
+ ----------
+ spark : SparkSession
+ The active Spark session.
+ num_shuffle_partitions : int
+ The initial number of shuffle partitions.
+ create_streaming_df : callable
+ Function(df) -> DataFrame that applies the streaming transformation.
+ output_mode : str
+ Output mode for the streaming query ("update" or "append").
+ batch1_data, batch2_data, batch3_data : str
+ Data to write for each batch (newline-separated values).
+ verify_initial, verify_after_increase, verify_after_decrease : callable
+ Functions(collected_results) to verify results at each stage.
+ """
+ with tempfile.TemporaryDirectory() as input_dir, tempfile.TemporaryDirectory() as checkpoint_dir:
+ collected_results = []
+
+ def collect_batch(batch_df, batch_id):
+ rows = batch_df.collect()
+ collected_results.extend(rows)
+
+ def run_streaming_query():
+ df = spark.readStream.format("text").load(input_dir)
+ transformed_df = create_streaming_df(df)
+ query = (
+ transformed_df.writeStream.foreachBatch(collect_batch)
+ .option("checkpointLocation", checkpoint_dir)
+ .outputMode(output_mode)
+ .start()
+ )
+ query.processAllAvailable()
+ query.stop()
+
+ # Step 1: Write initial data and run streaming query
+ with open(os.path.join(input_dir, "batch1.txt"), "w") as f:
+ f.write(batch1_data)
+
+ run_streaming_query()
+ verify_initial(collected_results)
+
+ # Step 2: Repartition to more partitions
+ spark._streamingCheckpointManager.repartition(
+ checkpoint_dir, num_shuffle_partitions * 2
+ )
+
+ # Step 3: Add more data and restart query
+ with open(os.path.join(input_dir, "batch2.txt"), "w") as f:
+ f.write(batch2_data)
+
+ collected_results.clear()
+ run_streaming_query()
+ verify_after_increase(collected_results)
+
+ # Step 4: Repartition to fewer partitions
+ spark._streamingCheckpointManager.repartition(
+ checkpoint_dir, num_shuffle_partitions - 1
+ )
+
+ # Step 5: Add more data and restart query
+ with open(os.path.join(input_dir, "batch3.txt"), "w") as f:
+ f.write(batch3_data)
+
+ collected_results.clear()
+ run_streaming_query()
+ verify_after_decrease(collected_results)
+
+
+class StreamingOfflineStateRepartitionTests(ReusedSQLTestCase):
+ """
+ Test suite for Offline state repartitioning.
+ """
+
+ NUM_SHUFFLE_PARTITIONS = 3
+
+ @classmethod
+ def conf(cls):
+ cfg = SparkConf()
+ cfg.set("spark.sql.shuffle.partitions",
str(cls.NUM_SHUFFLE_PARTITIONS))
+ cfg.set(
+ "spark.sql.streaming.stateStore.providerClass",
+ "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
+ )
+ return cfg
+
+ def test_fail_if_empty_checkpoint_directory(self):
+ """Test that repartition fails if checkpoint directory is empty."""
+ with tempfile.TemporaryDirectory() as checkpoint_dir:
+ with self.assertRaisesRegex(
+ Exception, "STATE_REPARTITION_INVALID_CHECKPOINT.NO_COMMITTED_BATCH"
+ ):
+ self.spark._streamingCheckpointManager.repartition(checkpoint_dir, 5)
+
+ def test_fail_if_no_batch_found_in_checkpoint_directory(self):
+ """Test that repartition fails if no batch found in checkpoint
directory."""
+ with tempfile.TemporaryDirectory() as checkpoint_dir:
+ # Write commit log but no offset log
+ commits_dir = os.path.join(checkpoint_dir, "commits")
+ os.makedirs(commits_dir)
+ # Create a minimal commit file for batch 0
+ with open(os.path.join(commits_dir, "0"), "w") as f:
+ f.write("v1\n{}")
+
+ with self.assertRaisesRegex(
+ Exception, "STATE_REPARTITION_INVALID_CHECKPOINT.NO_BATCH_FOUND"
+ ):
+ self.spark._streamingCheckpointManager.repartition(checkpoint_dir, 5)
+
+ def test_fail_if_repartition_parameter_is_invalid(self):
+ """Test that repartition fails with invalid parameters."""
+ # Test null checkpoint location
+ with self.assertRaisesRegex(Exception, "STATE_REPARTITION_INVALID_PARAMETER.IS_NULL"):
+ self.spark._streamingCheckpointManager.repartition(None, 5)
+
+ # Test empty checkpoint location
+ with self.assertRaisesRegex(Exception, "STATE_REPARTITION_INVALID_PARAMETER.IS_EMPTY"):
+ self.spark._streamingCheckpointManager.repartition("", 5)
+
+ # Test numPartitions <= 0
+ with self.assertRaisesRegex(
+ Exception, "STATE_REPARTITION_INVALID_PARAMETER.IS_NOT_GREATER_THAN_ZERO"
+ ):
+ self.spark._streamingCheckpointManager.repartition("test", 0)
+
+ @unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ pandas_requirement_message or pyarrow_requirement_message,
+ )
+ def test_repartition_with_apply_in_pandas_with_state(self):
+ """Test repartition for a streaming query using
applyInPandasWithState."""
+
+ # Define the stateful function that tracks count per key
+ def stateful_count_func(key, pdf_iter, state):
+ existing_count = state.getOption
+ count = 0 if existing_count is None else existing_count[0]
+
+ new_count = 0
+ for pdf in pdf_iter:
+ new_count += len(pdf)
+
+ total_count = count + new_count
+ state.update((total_count,))
+ yield pd.DataFrame({"key": [key[0]], "count": [total_count]})
+
+ output_schema = StructType(
+ [StructField("key", StringType()), StructField("count",
LongType())]
+ )
+ state_schema = StructType([StructField("count", LongType())])
+
+ def create_streaming_df(df):
+ return df.groupBy(df["value"]).applyInPandasWithState(
+ stateful_count_func,
+ output_schema,
+ state_schema,
+ "Update",
+ GroupStateTimeout.NoTimeout,
+ )
+
+ def verify_initial(results):
+ counts = {row.key: row["count"] for row in results}
+ self.assertEqual(counts.get("a"), 2)
+ self.assertEqual(counts.get("b"), 1)
+ self.assertEqual(counts.get("c"), 1)
+
+ def verify_after_increase(results):
+ counts = {row.key: row["count"] for row in results}
+ self.assertEqual(counts.get("a"), 3, "State for 'a': 2 + 1 = 3")
+ self.assertEqual(counts.get("b"), 2, "State for 'b': 1 + 1 = 2")
+ self.assertEqual(counts.get("d"), 1, "New key 'd' should have
count 1")
+
+ def verify_after_decrease(results):
+ counts = {row.key: row["count"] for row in results}
+ self.assertEqual(counts.get("a"), 4, "State for 'a': 3 + 1 = 4")
+ self.assertEqual(counts.get("c"), 2, "State for 'c': 1 + 1 = 2")
+ self.assertEqual(counts.get("e"), 1, "New key 'e' should have
count 1")
+
+ OfflineStateRepartitionTestUtils.run_repartition_test(
+ spark=self.spark,
+ num_shuffle_partitions=self.NUM_SHUFFLE_PARTITIONS,
+ create_streaming_df=create_streaming_df,
+ output_mode="update",
+ batch1_data="a\nb\na\nc\n", # a:2, b:1, c:1
+ batch2_data="a\nb\nd\n", # a:+1, b:+1, d:1 (new)
+ batch3_data="a\nc\ne\n", # a:+1, c:+1, e:1 (new)
+ verify_initial=verify_initial,
+ verify_after_increase=verify_after_increase,
+ verify_after_decrease=verify_after_decrease,
+ )
+
+ def test_repartition_with_streaming_aggregation(self):
+ """Test repartition for a streaming aggregation query (groupBy +
count)."""
+
+ def create_streaming_df(df):
+ return df.groupBy("value").count()
+
+ def verify_initial(results):
+ counts = {row.value: row["count"] for row in results}
+ self.assertEqual(counts.get("a"), 2)
+ self.assertEqual(counts.get("b"), 1)
+ self.assertEqual(counts.get("c"), 1)
+
+ def verify_after_increase(results):
+ counts = {row.value: row["count"] for row in results}
+ self.assertEqual(counts.get("a"), 3, "State for 'a': 2 + 1 = 3")
+ self.assertEqual(counts.get("b"), 2, "State for 'b': 1 + 1 = 2")
+ self.assertEqual(counts.get("d"), 1, "New key 'd' should have
count 1")
+
+ def verify_after_decrease(results):
+ counts = {row.value: row["count"] for row in results}
+ self.assertEqual(counts.get("a"), 4, "State for 'a': 3 + 1 = 4")
+ self.assertEqual(counts.get("c"), 2, "State for 'c': 1 + 1 = 2")
+ self.assertEqual(counts.get("e"), 1, "New key 'e' should have
count 1")
+
+ OfflineStateRepartitionTestUtils.run_repartition_test(
+ spark=self.spark,
+ num_shuffle_partitions=self.NUM_SHUFFLE_PARTITIONS,
+ create_streaming_df=create_streaming_df,
+ output_mode="update",
+ batch1_data="a\nb\na\nc\n", # a:2, b:1, c:1
+ batch2_data="a\nb\nd\n", # a:+1, b:+1, d:1 (new)
+ batch3_data="a\nc\ne\n", # a:+1, c:+1, e:1 (new)
+ verify_initial=verify_initial,
+ verify_after_increase=verify_after_increase,
+ verify_after_decrease=verify_after_decrease,
+ )
+
+ def test_repartition_with_streaming_dedup(self):
+ """Test repartition for a streaming deduplication query
(dropDuplicates)."""
+
+ def create_streaming_df(df):
+ return df.dropDuplicates(["value"])
+
+ def verify_initial(results):
+ values = {row.value for row in results}
+ self.assertEqual(values, {"a", "b", "c"})
+
+ def verify_after_increase(results):
+ values = {row.value for row in results}
+ self.assertEqual(values, {"d", "e"}, "Only new keys after
repartition")
+
+ def verify_after_decrease(results):
+ values = {row.value for row in results}
+ self.assertEqual(values, {"f", "g"}, "Only new keys after
repartition")
+
+ OfflineStateRepartitionTestUtils.run_repartition_test(
+ spark=self.spark,
+ num_shuffle_partitions=self.NUM_SHUFFLE_PARTITIONS,
+ create_streaming_df=create_streaming_df,
+ output_mode="append",
+ batch1_data="a\nb\na\nc\n", # unique: a, b, c
+ batch2_data="a\nb\nd\ne\n", # a, b duplicates; d, e new
+ batch3_data="a\nc\nf\ng\n", # a, c duplicates; f, g new
+ verify_initial=verify_initial,
+ verify_after_increase=verify_after_increase,
+ verify_after_decrease=verify_after_decrease,
+ )
+
+
+if __name__ == "__main__":
+ from pyspark.testing import main
+
+ main()
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StreamingCheckpointManager.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StreamingCheckpointManager.scala
index 7bb6fda4818c..b9d9515303c8 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StreamingCheckpointManager.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StreamingCheckpointManager.scala
@@ -17,9 +17,12 @@
package org.apache.spark.sql.streaming
+import org.apache.spark.annotation.Evolving
+
/**
* A class to manage operations on streaming query checkpoints.
*/
+@Evolving
private[spark] abstract class StreamingCheckpointManager {
/**
@@ -47,6 +50,7 @@ private[spark] abstract class StreamingCheckpointManager {
* @param enforceExactlyOnceSink
* if we shouldn't allow skipping failed batches, to avoid duplicates in exactly once sinks.
*/
+ @Evolving
private[spark] def repartition(
checkpointLocation: String,
numPartitions: Int,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
index c4620f13afc6..d1c7c406544c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
@@ -246,7 +246,9 @@ class SparkSession private(
@Unstable
def streams: StreamingQueryManager = sessionState.streamingQueryManager
- private[spark] def streamingCheckpointManager = sessionState.streamingCheckpointManager
+ @Unstable
+ private[spark] def streamingCheckpointManager: StreamingCheckpointManager =
+ sessionState.streamingCheckpointManager
/**
* Returns an `ArtifactManager` that supports adding, managing and using session-scoped artifacts
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingCheckpointManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingCheckpointManager.scala
index b7c4ff362b22..5b6455b8741f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingCheckpointManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingCheckpointManager.scala
@@ -17,12 +17,14 @@
package org.apache.spark.sql.classic
+import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.state.{OfflineStateRepartitionErrors,
OfflineStateRepartitionRunner}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming
/** @inheritdoc */
+@Evolving
private[spark] class StreamingCheckpointManager(
sparkSession: SparkSession,
sqlConf: SQLConf) extends streaming.StreamingCheckpointManager with Logging {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]