This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new bfafdde  [SPARK-35566][SS] Fix StateStoreRestoreExec output rows
bfafdde is described below

commit bfafddea837615bc025f182f5558f6c396de9dce
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Mon May 31 16:45:56 2021 +0900

    [SPARK-35566][SS] Fix StateStoreRestoreExec output rows

    ### What changes were proposed in this pull request?

    This is a minor change to how `StateStoreRestoreExec` computes its number of output rows. Previously we counted only input rows; the optionally restored rows were not included.

    ### Why are the changes needed?

    Currently the number of output rows of `StateStoreRestoreExec` counts only the input rows, but the operator actually outputs the input rows plus any optionally restored rows. We should report the correct number of output rows.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing tests.

    Closes #32703 from viirya/fix-outputrows.

    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 73ba4492b1deea953e4f22dbf36dfcacd81c0f8a)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../org/apache/spark/sql/execution/streaming/statefulOperators.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 1449d93..e8e18e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -264,8 +264,9 @@ case class StateStoreRestoreExec(
         iter.flatMap { row =>
           val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
           val restoredRow = stateManager.get(store, key)
-          numOutputRows += 1
-          Option(restoredRow).toSeq :+ row
+          val outputRows = Option(restoredRow).toSeq :+ row
+          numOutputRows += outputRows.size
+          outputRows
         }
     }
   }
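
The counting difference in the hunk above can be illustrated outside Spark. The following is a minimal sketch, not Spark code: `OutputRowCountSketch`, the `state` map, and the string "rows" are hypothetical stand-ins for the operator, the state store, and `UnsafeRow`. It assumes that a lookup yields `null` when no state exists for a key, which the `Option(restoredRow)` wrapping in the patch suggests.

```scala
// Simplified stand-in for the per-row logic in the diff; none of these names are Spark APIs.
object OutputRowCountSketch {
  // Hypothetical state store: key -> previously stored row. Key "b" has no state.
  val state: Map[String, String] = Map("a" -> "a:restored")

  /** Returns (emitted rows, count under the old logic, count under the new logic). */
  def restore(input: Seq[String]): (Seq[String], Long, Long) = {
    var oldCount = 0L // old behaviour: += 1 per input row
    var newCount = 0L // new behaviour: += number of rows actually emitted
    val out = input.flatMap { row =>
      // Assumption: a missing key yields null, as Option(restoredRow) in the patch implies.
      val restoredRow: String = state.get(row).orNull
      val outputRows = Option(restoredRow).toSeq :+ row
      oldCount += 1
      newCount += outputRows.size
      outputRows
    }
    (out, oldCount, newCount)
  }

  def main(args: Array[String]): Unit = {
    val (out, oldCount, newCount) = restore(Seq("a", "b"))
    // prints "emitted=3 old=2 new=3"
    println(s"emitted=${out.size} old=$oldCount new=$newCount")
  }
}
```

With two input rows, one of which has restored state, three rows are emitted. The old counter reports 2 while the new one reports 3, which is the discrepancy the commit message describes.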