This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 59a7ad46588f [SPARK-50437][SS] Reduce overhead of creating deserializers in TransformWithStateExec
59a7ad46588f is described below
commit 59a7ad46588fa1d765a8ee388641e3b09b77a7d2
Author: Jerry Peng <[email protected]>
AuthorDate: Thu Nov 28 11:28:10 2024 +0900
[SPARK-50437][SS] Reduce overhead of creating deserializers in TransformWithStateExec
### What changes were proposed in this pull request?
An optimization that creates the key and value deserializers once, at the beginning of a batch, instead of on every call to `handleInputRows`.
### Why are the changes needed?
Currently, the key and value deserializers are created every time `handleInputRows` is called: `ObjectOperator.deserializeRowToObject` is invoked twice, once to generate the deserializer for the key and once for the value. That function does the following:
1. Generate Java code for the deserializer based on the input expressions and grouping attributes.
2. Compile that Java code into a Java class.
The problem is that step 1 runs every time the function is called. Compiling the code, i.e. step 2, only happens once, since the generated class is cached and reused.
Step 1 can incur a heavy penalty when `handleInputRows` is called many times, for example when there are many distinct keys. Note that `handleInputRows` is called once per distinct key.
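To illustrate the pattern (not the actual Spark code; `expensiveCodegen` and `RowDeserializer` below are hypothetical stand-ins for `ObjectOperator.deserializeRowToObject` and the generated deserializer class), hoisting the factory call into a `lazy val` amortizes the codegen cost across the whole batch:

```scala
object DeserializerHoisting {
  // Hypothetical stand-in for a code-generated deserializer.
  final class RowDeserializer {
    def deserialize(row: Array[Byte]): String = new String(row, "UTF-8")
  }

  // Stand-in for ObjectOperator.deserializeRowToObject: every call pays the
  // code-generation cost (step 1 above), simulated here with a sleep.
  def expensiveCodegen(): RowDeserializer = {
    Thread.sleep(10)
    new RowDeserializer
  }

  // Before: the deserializer is rebuilt on every invocation, i.e. once per
  // distinct key.
  def handleInputRowsBefore(rows: Iterator[Array[Byte]]): Iterator[String] = {
    val getValueObj = expensiveCodegen()
    rows.map(getValueObj.deserialize)
  }

  // After: built lazily, exactly once, on first use.
  private lazy val getValueObj = expensiveCodegen()

  def handleInputRowsAfter(rows: Iterator[Array[Byte]]): Iterator[String] =
    rows.map(getValueObj.deserialize)
}
```

With the `lazy val`, only the first per-key call pays the codegen cost; every later call reuses the already-built deserializer.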
Here is a flamegraph illustrating the overhead: [screenshot "Screenshot 2024-11-26 at 11.55.12 PM.png" referenced in the PR; the image failed to upload]
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests should suffice
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48985 from jerrypeng/SPARK-50437.
Authored-by: Jerry Peng <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../execution/streaming/TransformWithStateExec.scala | 18 +++++++++++-------
1 file changed, 11 insertions(+), 7 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
index 9c31ff0a7443..5716242afc15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
@@ -83,6 +83,17 @@ case class TransformWithStateExec(
  // dummy value schema, the real schema will get during state variable init time
  private val DUMMY_VALUE_ROW_SCHEMA = new StructType().add("value", BinaryType)
+  // We only need to initialize the key and value deserializers once per partition.
+  // The deserializers need to be lazily created on the executor since they
+  // are not serializable.
+  // Ideas for improvement can be found here:
+  // https://issues.apache.org/jira/browse/SPARK-50437
+  private lazy val getKeyObj =
+    ObjectOperator.deserializeRowToObject(keyDeserializer, groupingAttributes)
+
+  private lazy val getValueObj =
+    ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes)
+
override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = {
if (timeMode == ProcessingTime) {
      // TODO SPARK-50180: check if we can return true only if actual timers are registered,
@@ -230,11 +241,6 @@ case class TransformWithStateExec(
  private def handleInputRows(keyRow: UnsafeRow, valueRowIter: Iterator[InternalRow]):
      Iterator[InternalRow] = {
-    val getKeyObj =
-      ObjectOperator.deserializeRowToObject(keyDeserializer, groupingAttributes)
-
-    val getValueObj =
-      ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes)
val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjectType)
@@ -261,8 +267,6 @@ case class TransformWithStateExec(
private def processInitialStateRows(
keyRow: UnsafeRow,
initStateIter: Iterator[InternalRow]): Unit = {
-    val getKeyObj =
-      ObjectOperator.deserializeRowToObject(keyDeserializer, groupingAttributes)
    val getInitStateValueObj =
      ObjectOperator.deserializeRowToObject(initialStateDeserializer, initialStateDataAttrs)
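As context for the comment added above, that the deserializers "need to be lazily created on the executor since they are not serializable", here is a minimal standalone sketch (hypothetical `Operator` and `NonSerializableDeserializer` classes, not Spark code) of why a plain `lazy val` is enough: the backing field stays null until first access, so the enclosing object can be serialized and shipped beforehand:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for a deserializer that cannot be shipped to executors.
final class NonSerializableDeserializer {
  def apply(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

// Case classes are Serializable, like Spark physical plan nodes.
case class Operator(name: String) {
  // Not evaluated until first access; if that access happens only on the
  // executor, the non-serializable value never has to cross the wire.
  // (Touching it on the driver before serialization would break this.)
  private lazy val deser = new NonSerializableDeserializer
  def run(bytes: Array[Byte]): String = deser(bytes)
}

object LazyInitDemo {
  def main(args: Array[String]): Unit = {
    val op = Operator("demo")
    // Serializing before the lazy val is touched succeeds: its backing
    // field is still null, so nothing non-serializable gets written.
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(op)
    oos.flush()
    println(s"serialized ${bos.size()} bytes before first access")
    // First access (conceptually, on the executor) creates the deserializer.
    println(op.run("hello".getBytes("UTF-8")))
  }
}
```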
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]