This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new bfafdde [SPARK-35566][SS] Fix StateStoreRestoreExec output rows
bfafdde is described below
commit bfafddea837615bc025f182f5558f6c396de9dce
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Mon May 31 16:45:56 2021 +0900
[SPARK-35566][SS] Fix StateStoreRestoreExec output rows
### What changes were proposed in this pull request?
This is a minor change to how `StateStoreRestoreExec` computes its number of
output rows. Previously it counted only the input rows and left out the
optionally restored rows.
### Why are the changes needed?
Currently the number of output rows of `StateStoreRestoreExec` counts only
the input rows. But the operator actually outputs each input row plus an
optional restored row. The metric should report the correct number of output rows.
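As a rough illustration of the miscount, here is a minimal standalone sketch. It is not Spark code: `RestoreCountSketch`, `restore`, the `Int` rows, and the `Map`-backed store are all hypothetical stand-ins for the operator, `UnsafeRow`, and the state store. It only mirrors the shape of the `flatMap` and compares the two ways of counting.

```scala
// Hypothetical stand-in for the operator: rows are Ints, the "state store" is a Map.
object RestoreCountSketch {
  // Returns (emitted rows, count using the old logic, count using the new logic).
  def restore(input: Seq[Int], store: Map[Int, Int]): (Seq[Int], Long, Long) = {
    var oldCount = 0L
    var newCount = 0L
    val out = input.flatMap { row =>
      // The row doubles as its own key here; a missing key plays the role of a null restored row.
      val restored = store.get(row)
      val outputRows = restored.toSeq :+ row
      oldCount += 1               // old logic: one per input row
      newCount += outputRows.size // new logic: one per emitted row
      outputRows
    }
    (out, oldCount, newCount)
  }

  def main(args: Array[String]): Unit = {
    // Keys 1 and 3 have state, key 2 does not.
    val (out, oldCount, newCount) = restore(Seq(1, 2, 3), Map(1 -> 10, 3 -> 30))
    println(s"emitted=${out.size} old=$oldCount new=$newCount")
  }
}
```

With two of the three keys present in the store, five rows are emitted, so only the per-emitted-row count matches the actual output.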
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
Closes #32703 from viirya/fix-outputrows.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 73ba4492b1deea953e4f22dbf36dfcacd81c0f8a)
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../org/apache/spark/sql/execution/streaming/statefulOperators.scala | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 1449d93..e8e18e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -264,8 +264,9 @@ case class StateStoreRestoreExec(
iter.flatMap { row =>
val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
val restoredRow = stateManager.get(store, key)
- numOutputRows += 1
- Option(restoredRow).toSeq :+ row
+ val outputRows = Option(restoredRow).toSeq :+ row
+ numOutputRows += outputRows.size
+ outputRows
}
}
}