This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new f9c5eb808ca1 [SPARK-51237][SS] Add API details for new transformWithState helper APIs as needed
f9c5eb808ca1 is described below
commit f9c5eb808ca1419f9eea3fc07642ce0d3d15dc54
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Mon Feb 17 21:56:16 2025 +0900
[SPARK-51237][SS] Add API details for new transformWithState helper APIs as needed
### What changes were proposed in this pull request?
Add API details for new transformWithState helper APIs as needed
### Why are the changes needed?
Improve API docs for user reference
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
Comments-only change; covered by existing unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #49978 from anishshri-db/task/SPARK-51237.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit 17b943106b713d39767dc63110c9e2e878e6dd1c)
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../spark/sql/streaming/ExpiredTimerInfo.scala | 4 +-
.../org/apache/spark/sql/streaming/ListState.scala | 46 ++++++++++++--
.../org/apache/spark/sql/streaming/MapState.scala | 73 +++++++++++++++++++---
.../org/apache/spark/sql/streaming/QueryInfo.scala | 24 +++++--
.../spark/sql/streaming/StatefulProcessor.scala | 16 +++++
.../sql/streaming/StatefulProcessorHandle.scala | 16 ++++-
.../apache/spark/sql/streaming/TimerValues.scala | 9 ++-
.../apache/spark/sql/streaming/ValueState.scala | 25 ++++++--
8 files changed, 184 insertions(+), 29 deletions(-)
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala
index 31075f00e56f..3772e274c441 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala
@@ -28,7 +28,9 @@ import org.apache.spark.annotation.Evolving
trait ExpiredTimerInfo extends Serializable {
/**
- * Get the expired timer's expiry time as milliseconds in epoch time.
+ * Function to return the expired timer's expiry time as milliseconds in epoch time.
+ *
+ * @return - the expired timer's expiry time in milliseconds
*/
def getExpiryTimeInMs(): Long
}
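For reference, a minimal sketch of how this is typically consumed inside a timer callback (the enclosing handleExpiredTimer signature appears in the StatefulProcessor diff below; the String/Long types here are illustrative only):

    // Emit the key together with the time at which the timer fired.
    override def handleExpiredTimer(
        key: String,
        timerValues: TimerValues,
        expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, Long)] = {
      Iterator.single((key, expiredTimerInfo.getExpiryTimeInMs()))
    }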
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/ListState.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/ListState.scala
index 79b0d10072e8..16f3625d2a51 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/ListState.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/ListState.scala
@@ -24,21 +24,55 @@ import org.apache.spark.annotation.Evolving
*/
trait ListState[S] extends Serializable {
- /** Whether state exists or not. */
+ /**
+ * Function to check whether state exists for the current grouping key or not.
+ *
+ * @return - true if state exists, false otherwise.
+ */
def exists(): Boolean
- /** Get the state value. An empty iterator is returned if no value exists. */
+ /**
+ * Function to get the list of elements in the state as an iterator. If the state does not exist,
+ * an empty iterator is returned.
+ *
+ * Note that it's always recommended to check whether the state exists or not by calling exists()
+ * before calling get().
+ *
+ * @return - an iterator of elements in the state if it exists, an empty iterator otherwise.
+ */
def get(): Iterator[S]
- /** Update the value of the list. */
+ /**
+ * Function to update the value of the state with a new list.
+ *
+ * Note that this will replace the existing value with the new value.
+ *
+ * @param newState - new list of elements
+ */
def put(newState: Array[S]): Unit
- /** Append an entry to the list */
+ /**
+ * Function to append a single entry to the existing state list.
+ *
+ * Note that if this is the first time the state is being appended to, the state will be
+ * initialized to an empty list before appending the new entry.
+ *
+ * @param newState - single list element to be appended
+ */
def appendValue(newState: S): Unit
- /** Append an entire list to the existing value */
+ /**
+ * Function to append a list of entries to the existing state list.
+ *
+ * Note that if this is the first time the state is being appended to, the state will be
+ * initialized to an empty list before appending the new entries.
+ *
+ * @param newState - list of elements to be appended
+ */
def appendList(newState: Array[S]): Unit
- /** Removes this state for the given grouping key. */
+ /**
+ * Function to remove the state for the current grouping key.
+ */
def clear(): Unit
}
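For reference, a minimal usage sketch of the ListState contract documented above (assumes a ListState[String] named eventsState was created in init() via getHandle.getListState; all names are illustrative, not part of this patch):

    // Inside handleInputRows, for the current grouping key:
    if (!eventsState.exists()) {
      eventsState.put(Array("first"))             // replaces/initializes the whole list
    }
    eventsState.appendValue("next")               // appends a single element
    eventsState.appendList(Array("a", "b"))       // appends several elements
    val all: Iterator[String] = eventsState.get() // empty iterator if no state exists
    eventsState.clear()                           // removes the list for this key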
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/MapState.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/MapState.scala
index c514b4e375f8..8459bbcc257d 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/MapState.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/MapState.scala
@@ -24,30 +24,85 @@ import org.apache.spark.annotation.Evolving
*/
trait MapState[K, V] extends Serializable {
- /** Whether state exists or not. */
+ /**
+ * Function to check whether any user map entry exists for the current grouping key or not.
+ *
+ * @return - true if state exists, false otherwise.
+ */
def exists(): Boolean
- /** Get the state value if it exists */
+ /**
+ * Function to get the state value for the current grouping key and user map key.
+ * If the state exists, the value is returned. If the state does not exist,
+ * the default value for the type is returned for AnyVal types and null for AnyRef types.
+ *
+ * Note that it's always recommended to check whether the state exists or not by calling exists()
+ * before calling get().
+ *
+ * @return - the value of the state if it exists. If the state does not exist, the default value
+ * for the type is returned for AnyVal types and null for AnyRef types.
+ */
def getValue(key: K): V
- /** Check if the user key is contained in the map */
+ /**
+ * Function to check if the user map key is contained in the map for the current grouping key.
+ *
+ * @param key - user map key
+ *
+ * @return - true if the user key is present in the map, false otherwise.
+ */
def containsKey(key: K): Boolean
- /** Update value for given user key */
+ /**
+ * Function to add or update the map entry for the current grouping key.
+ *
+ * Note that this function will add the user map key and value if the user map key is not
+ * present in the map associated with the current grouping key.
+ * If the user map key is already present in the associated map, the value for the user key
+ * will be updated to the new user map value.
+ *
+ * @param key - user map key
+ * @param value - user map value
+ */
def updateValue(key: K, value: V): Unit
- /** Get the map associated with grouping key */
+ /**
+ * Function to return the iterator of user map key-value pairs present in the map for the
+ * current grouping key.
+ *
+ * @return - iterator of user map key-value pairs if the map is not empty
+ * and empty iterator otherwise.
+ */
def iterator(): Iterator[(K, V)]
- /** Get the list of keys present in map associated with grouping key */
+ /**
+ * Function to return the user map keys present in the map for the current grouping key.
+ *
+ * @return - iterator of user map keys if the map is not empty, empty iterator otherwise.
+ */
def keys(): Iterator[K]
- /** Get the list of values present in map associated with grouping key */
+ /**
+ * Function to return the user map values present in the map for the current grouping key.
+ *
+ * @return - iterator of user map values if the map is not empty, empty iterator otherwise.
+ */
def values(): Iterator[V]
- /** Remove user key from map state */
+ /**
+ * Function to remove the user map key from the map for the current grouping key.
+ *
+ * Note that this function will remove the user map key and its associated value from the map
+ * associated with the current grouping key. If the user map key is not present in the map,
+ * this function will not do anything.
+ *
+ * @param key - user map key
+ */
def removeKey(key: K): Unit
- /** Remove this state. */
+ /**
+ * Function to remove the state for the current grouping key. Note that this removes the entire
+ * map state associated with the current grouping key.
+ */
def clear(): Unit
}
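Similarly, a minimal usage sketch of the MapState contract above (assumes a MapState[String, Long] named clicksPerPage created in init() via getHandle.getMapState[String, Long]("clicks", TTLConfig.NONE); names and the TTLConfig.NONE setting are illustrative):

    // Inside handleInputRows, for the current grouping key:
    val page = "home"
    val prev = if (clicksPerPage.containsKey(page)) clicksPerPage.getValue(page) else 0L
    clicksPerPage.updateValue(page, prev + 1)         // insert or overwrite
    val snapshot: Iterator[(String, Long)] = clicksPerPage.iterator()
    clicksPerPage.removeKey("stale-page")             // no-op if the key is absent
    // clicksPerPage.clear() would drop the entire map for this key.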
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/QueryInfo.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/QueryInfo.scala
index 2b56c92f8549..c1d47f4c28bd 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/QueryInfo.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/QueryInfo.scala
@@ -28,15 +28,31 @@ import org.apache.spark.annotation.Evolving
@Evolving
trait QueryInfo extends Serializable {
- /** Returns the streaming query id associated with stateful operator */
+ /**
+ * Function to return the unique streaming query id associated with the stateful operator.
+ *
+ * @return - the unique query id.
+ */
def getQueryId: UUID
- /** Returns the streaming query runId associated with stateful operator */
+ /**
+ * Function to return the unique streaming query run id associated with the stateful operator.
+ *
+ * @return - the unique query run id.
+ */
def getRunId: UUID
- /** Returns the batch id associated with stateful operator */
+ /**
+ * Function to return the unique batch id associated with the stateful operator.
+ *
+ * @return - the unique batch id.
+ */
def getBatchId: Long
- /** Returns the string representation of QueryInfo object */
+ /**
+ * Function to return the string representation of the query info.
+ *
+ * @return - query info as string.
+ */
def toString: String
}
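A small sketch of how QueryInfo might be used for logging (illustrative only; assumes it runs inside a processor callback after init(), where getHandle is available):

    // Identify which query, run and batch this task is executing for.
    val info = getHandle.getQueryInfo()
    println(s"query=${info.getQueryId} run=${info.getRunId} batch=${info.getBatchId}")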
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala
index f0ea1dcd6871..c2cb32cccab4 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala
@@ -57,6 +57,11 @@ abstract class StatefulProcessor[K, I, O] extends Serializable {
/**
* Function that will allow users to interact with input data rows along with the grouping key
* and current timer values and optionally provide output rows.
+ *
+ * Note that in microbatch mode, input rows for a given grouping key will be provided in a
+ * single function invocation. If the grouping key is not seen in the current microbatch, this
+ * function will not be invoked for that key.
+ *
* @param key
* \- grouping key
* @param inputRows
@@ -64,6 +69,7 @@ abstract class StatefulProcessor[K, I, O] extends Serializable {
* @param timerValues
* \- instance of TimerValues that provides access to current processing/event time if
* available
+ *
* @return
* \- Zero or more output rows
*/
@@ -72,12 +78,18 @@ abstract class StatefulProcessor[K, I, O] extends Serializable {
/**
* Function that will be invoked when a timer is fired for a given key. Users can choose to
* evict state, register new timers and optionally provide output rows.
+ *
+ * Note that in microbatch mode, this function will be called once for each unique timer expiry
+ * for a given key. If no timer expires for a given key, this function will not be invoked for
+ * that key.
+ *
* @param key
* \- grouping key
* @param timerValues
* \- instance of TimerValues that provides access to current processing/event
* @param expiredTimerInfo
* \- instance of ExpiredTimerInfo that provides access to expired timer
+ *
* @return
* Zero or more output rows
*/
@@ -131,6 +143,10 @@ abstract class StatefulProcessorWithInitialState[K, I, O, S] extends StatefulPro
* the input rows, e.g. dataframe from data source reader of existing streaming query
* checkpoint.
*
+ * Note that in microbatch mode, this function can be called one or more times per grouping
+ * key. If the grouping key is not seen within the initial state dataframe rows, then the
+ * function will not be invoked for that key.
+ *
* @param key
* \- grouping key
* @param initialState
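Taken together, the callbacks documented above can be sketched as a complete processor. The following is illustrative only; the class name, output type, and the exact getValueState overload used are assumptions, not part of this patch:

    import org.apache.spark.sql.Encoders
    import org.apache.spark.sql.streaming.{ExpiredTimerInfo, OutputMode,
      StatefulProcessor, TTLConfig, TimeMode, TimerValues, ValueState}

    // Counts rows per grouping key; a timer expiry evicts the key's state.
    class CountProcessor extends StatefulProcessor[String, String, (String, Long)] {
      @transient private var count: ValueState[Long] = _

      override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
        // State variables must be created via the handle during init().
        count = getHandle.getValueState[Long]("count", Encoders.scalaLong, TTLConfig.NONE)
      }

      override def handleInputRows(
          key: String,
          inputRows: Iterator[String],
          timerValues: TimerValues): Iterator[(String, Long)] = {
        // Invoked once per key per microbatch, with all rows for that key.
        val updated = (if (count.exists()) count.get() else 0L) + inputRows.size
        count.update(updated)
        Iterator.single((key, updated))
      }

      override def handleExpiredTimer(
          key: String,
          timerValues: TimerValues,
          expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, Long)] = {
        count.clear() // evict state when the timer fires
        Iterator.empty
      }
    }

Such a processor would typically be attached via ds.groupByKey(...).transformWithState(new CountProcessor(), TimeMode.ProcessingTime(), OutputMode.Update()).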
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala
index 5a6d9f6c76ea..eae04db06b26 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala
@@ -46,6 +46,7 @@ trait StatefulProcessorHandle extends Serializable {
* \- the ttl configuration (time to live duration etc.)
* @tparam T
* \- type of state variable
+ *
* @return
* \- instance of ValueState of type T that can be used to store state persistently
*/
@@ -71,6 +72,7 @@ trait StatefulProcessorHandle extends Serializable {
* \- the ttl configuration (time to live duration etc.)
* @tparam T
* \- type of state variable
+ *
* @return
* \- instance of ValueState of type T that can be used to store state persistently
*/
@@ -95,6 +97,7 @@ trait StatefulProcessorHandle extends Serializable {
* \- the ttl configuration (time to live duration etc.)
* @tparam T
* \- type of state variable
+ *
* @return
* \- instance of ListState of type T that can be used to store state persistently
*/
@@ -120,6 +123,7 @@ trait StatefulProcessorHandle extends Serializable {
* \- the ttl configuration (time to live duration etc.)
* @tparam T
* \- type of state variable
+ *
* @return
* \- instance of ListState of type T that can be used to store state persistently
*/
@@ -148,6 +152,7 @@ trait StatefulProcessorHandle extends Serializable {
* \- type of key for map state variable
* @tparam V
* \- type of value for map state variable
+ *
* @return
* \- instance of MapState of type [K,V] that can be used to store state persistently
*/
@@ -176,17 +181,23 @@ trait StatefulProcessorHandle extends Serializable {
* \- type of key for map state variable
* @tparam V
* \- type of value for map state variable
+ *
* @return
* \- instance of MapState of type [K,V] that can be used to store state persistently
*/
def getMapState[K: Encoder, V: Encoder](stateName: String, ttlConfig: TTLConfig): MapState[K, V]
- /** Function to return queryInfo for currently running task */
+ /**
+ * Function to return the query info for the current query.
+ *
+ * @return - QueryInfo object with access to streaming query metadata
+ */
def getQueryInfo(): QueryInfo
/**
* Function to register a processing/event time based timer for given implicit grouping key and
* provided timestamp
+ *
* @param expiryTimestampMs
* \- timer expiry timestamp in milliseconds
*/
@@ -195,6 +206,7 @@ trait StatefulProcessorHandle extends Serializable {
/**
* Function to delete a processing/event time based timer for given implicit grouping key and
* provided timestamp
+ *
* @param expiryTimestampMs
* \- timer expiry timestamp in milliseconds
*/
@@ -205,6 +217,7 @@ trait StatefulProcessorHandle extends Serializable {
* listTimers() within the `handleInputRows` method of the StatefulProcessor will return all the
* unprocessed registered timers, including the one being fired within the invocation of
* `handleInputRows`.
+ *
* @return
* \- list of all the registered timers for given implicit grouping key
*/
@@ -212,6 +225,7 @@ trait StatefulProcessorHandle extends Serializable {
/**
* Function to delete and purge state variable if defined previously
+ *
* @param stateName
* \- name of the state variable
*/
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/TimerValues.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/TimerValues.scala
index a3480065965e..f277a20dac2a 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/TimerValues.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/TimerValues.scala
@@ -29,21 +29,26 @@ import org.apache.spark.annotation.Evolving
trait TimerValues extends Serializable {
/**
- * Get the current processing time as milliseconds in epoch time.
+ * Function to get the current processing time as milliseconds in epoch time.
+ *
* @note
* This will return a constant value throughout the duration of a streaming query trigger,
* even if the trigger is re-executed.
+ *
+ * @return - the current processing time in milliseconds
*/
def getCurrentProcessingTimeInMs(): Long
/**
- * Get the current event time watermark as milliseconds in epoch time.
+ * Function to get the current event time watermark as milliseconds in epoch time.
*
* @note
* This can be called only when watermark is set before calling `transformWithState`.
* @note
* The watermark gets propagated at the end of each query. As a result, this method will
* return 0 (1970-01-01T00:00:00) for the first micro-batch.
+ *
+ * @return - the current event time watermark in milliseconds
*/
def getCurrentWatermarkInMs(): Long
}
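A two-line sketch of the TimerValues accessors above (inside handleInputRows; per the notes above, both values are fixed for the duration of the trigger):

    val nowMs: Long = timerValues.getCurrentProcessingTimeInMs()
    val watermarkMs: Long = timerValues.getCurrentWatermarkInMs() // 0 in the first microbatch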
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/ValueState.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/ValueState.scala
index 2910da573157..94b5f71173eb 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/ValueState.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/ValueState.scala
@@ -27,22 +27,35 @@ import org.apache.spark.annotation.Evolving
*/
trait ValueState[S] extends Serializable {
- /** Whether state exists or not. */
+ /**
+ * Function to check whether state exists for the current grouping key or not.
+ *
+ * @return - true if state exists, false otherwise.
+ */
def exists(): Boolean
/**
- * Get the state value if it exists or return null otherwise.
+ * Function to get the state value for the current grouping key.
+ * If the state exists, the value is returned. If the state does not exist,
+ * the default value for the type is returned for AnyVal types and null for AnyRef types.
+ *
+ * Note that it's always recommended to check whether the state exists or not by calling exists()
+ * before calling get().
+ *
+ * @return - the value of the state if it exists. If the state does not exist, the default value
+ * for the type is returned for AnyVal types and null for AnyRef types.
*/
def get(): S
/**
- * Update the value of the state.
+ * Function to update the value of the state for the current grouping key to the new value.
*
- * @param newState
- * the new value
+ * @param newState - the new value
*/
def update(newState: S): Unit
- /** Remove this state. */
+ /**
+ * Function to remove the state for the current grouping key.
+ */
def clear(): Unit
}
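Finally, a minimal sketch of the ValueState contract above (assumes a ValueState[Long] named count created in init(); names are illustrative):

    // For AnyVal types such as Long, get() returns the type default (0L) when
    // no state exists, so use exists() to tell "absent" apart from "zero".
    val current = if (count.exists()) count.get() else 0L
    count.update(current + 1)
    // count.clear() removes the value for this grouping key.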
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]