This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new bfafdde  [SPARK-35566][SS] Fix StateStoreRestoreExec output rows
bfafdde is described below

commit bfafddea837615bc025f182f5558f6c396de9dce
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Mon May 31 16:45:56 2021 +0900

    [SPARK-35566][SS] Fix StateStoreRestoreExec output rows
    
    ### What changes were proposed in this pull request?
    
    This is a minor change to how `StateStoreRestoreExec` computes its number
    of output rows. Previously it counted only the input rows; the optionally
    restored rows were not counted.
    
    ### Why are the changes needed?
    
    Currently the number of output rows of `StateStoreRestoreExec` counts only
    the input rows, one per input row. But the operator actually outputs each
    input row plus an optional restored row. We should report the correct
    number of output rows.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #32703 from viirya/fix-outputrows.
    
    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 73ba4492b1deea953e4f22dbf36dfcacd81c0f8a)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../org/apache/spark/sql/execution/streaming/statefulOperators.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 1449d93..e8e18e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -264,8 +264,9 @@ case class StateStoreRestoreExec(
           iter.flatMap { row =>
             val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
             val restoredRow = stateManager.get(store, key)
-            numOutputRows += 1
-            Option(restoredRow).toSeq :+ row
+            val outputRows = Option(restoredRow).toSeq :+ row
+            numOutputRows += outputRows.size
+            outputRows
           }
         }
     }
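
The counting change in the hunk can be modeled outside Spark. What follows is a minimal sketch, not Spark's code. `Counter` is a hypothetical stand-in for Spark's `SQLMetric`. A plain `Map` stands in for the state store and `stateManager`. `RestoreSketch.restore` is a hypothetical name. Because `Map.get` already returns an `Option`, the sketch skips the `Option(...)` wrap that the real code uses to handle a null restored row.

```scala
// Hypothetical stand-in for Spark's SQLMetric: a simple mutable counter.
final class Counter {
  private var value = 0L
  def +=(n: Long): Unit = value += n
  def get: Long = value
}

object RestoreSketch {
  // Models the per-row logic of StateStoreRestoreExec under the assumptions above.
  // For each input row, look up its key in the (hypothetical) state map, then emit
  // the restored row, if any, followed by the input row itself.
  def restore[K, V](
      input: Iterator[V],
      getKey: V => K,
      store: Map[K, V],
      numOutputRows: Counter): Iterator[V] =
    input.flatMap { row =>
      val restoredRow: Option[V] = store.get(getKey(row))
      val outputRows = restoredRow.toSeq :+ row
      // Count every emitted row (1 or 2), not just the input row.
      numOutputRows += outputRows.size
      outputRows
    }
}
```

For example, with one stored state row for key `"a"` and the input rows `("a", 5)` and `("b", 7)`, the operator emits three rows. The fixed logic counts 3, whereas the old `numOutputRows += 1` would have counted 2. Because `Iterator.flatMap` is lazy, the counter only reflects rows that have actually been consumed, so the sketch must be drained (e.g. with `toList`) before the metric is read.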
