[ https://issues.apache.org/jira/browse/SPARK-33359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227029#comment-17227029 ]

Jungtaek Lim commented on SPARK-33359:
--------------------------------------

That's by design. The metric is available only for V2 sinks. The V1 sink 
interface doesn't provide a way to count the number of output rows, since a V1 
sink can run arbitrary DataFrame operations rather than just a write, exactly 
as we do with foreachBatch. -1 means "unknown".
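
If a count is still needed with foreachBatch, one workaround is to compute it 
inside the batch function itself. A minimal sketch in Scala (the broker 
address, S3 paths, and app name are placeholders, not from the report; only 
the topic name is taken from it):

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("foreachBatch-row-count").getOrCreate()

val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder broker
  .option("subscribe", "testedr7")                  // topic name from the report
  .load()

kafkaDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()          // reuse the batch for both the count and the write
    val rows = batchDF.count() // the count the sink itself cannot report
    batchDF.write.mode("append").parquet("s3a://bucket/output/") // placeholder path
    println(s"batch $batchId wrote $rows rows")
    batchDF.unpersist()
  }
  .option("checkpointLocation", "s3a://bucket/checkpoints/")      // placeholder
  .start()

The persist() matters here: without it, the count and the parquet write would 
each re-read the micro-batch from Kafka.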

It would probably be better to document this to avoid confusion, but it is not 
a bug by itself. Let me close this for now. Thanks.
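
On the consumer side, anything reading StreamingQueryProgress should treat -1 
as "not reported" rather than a real count. A minimal sketch of a listener 
doing so (assuming an active SparkSession named spark):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val progress = event.progress
    // numOutputRows is -1 when the sink cannot report it (V1 sinks, foreachBatch)
    val output =
      if (progress.sink.numOutputRows < 0) "unknown"
      else progress.sink.numOutputRows.toString
    println(s"batch ${progress.batchId}: input=${progress.numInputRows}, output=$output")
  }
})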

> foreachBatch sink outputs wrong metrics
> ---------------------------------------
>
>                 Key: SPARK-33359
>                 URL: https://issues.apache.org/jira/browse/SPARK-33359
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.0.0
>         Environment: Spark on Kubernetes cluster with spark-3.0.0 image. The 
> CRD is ScheduledSparkApplication
>            Reporter: John Wesley
>            Priority: Minor
>
> I created two similar jobs:
> 1) The first job reads from Kafka and writes to a console sink in append mode.
> 2) The second job reads from Kafka and writes to a foreachBatch sink (which 
> then writes in parquet format to S3).
> The metrics in the log for the console sink show correct values for 
> numInputRows and numOutputRows, whereas they are wrong for foreachBatch.
> With foreachBatch:
> numInputRows is one more than the number of rows actually present.
> numOutputRows is always -1.
> /// Console sink
> //====================================
> 20/11/05 13:36:21 INFO MicroBatchExecution: Streaming query made progress: {
>   "id" : "775aa543-58bf-4cf7-b274-390da640b6ae",
>   "runId" : "e5eac4ca-0b29-4ed4-be35-b70bd20906d5",
>   "name" : null,
>   "timestamp" : "2020-11-05T13:36:08.921Z",
>   "batchId" : 0,
>   "numInputRows" : 10,
>   "processedRowsPerSecond" : 0.7759757895553658,
>   "durationMs" : {
>     "addBatch" : 7735,
>     "getBatch" : 152,
>     "latestOffset" : 2037,
>     "queryPlanning" : 1010,
>     "triggerExecution" : 12886,
>     "walCommit" : 938
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
>     "description" : "KafkaV2[Subscribe[testedr7]]",
>     "startOffset" : null,
>     "endOffset" : {
>       "testedr7" : {
>         "0" : 10
>       }
>     },
>     "numInputRows" : 10,
>     "processedRowsPerSecond" : 0.7759757895553658
>   } ],
>   "sink" : {
>     "description" : 
> "org.apache.spark.sql.execution.streaming.ConsoleTable$@38c3a814",
>     "numOutputRows" : 10
>   }
> }
> /// ForeachBatch sink
> //====================================
> 20/11/05 13:43:38 INFO MicroBatchExecution: Streaming query made progress: {
>   "id" : "789f9a00-2f2a-4f75-b643-fea201088b4a",
>   "runId" : "b5e695c5-3a2e-4ad2-9dbf-11b69f368f61",
>   "name" : null,
>   "timestamp" : "2020-11-05T13:43:15.421Z",
>   "batchId" : 0,
>   "numInputRows" : 11,
>   "processedRowsPerSecond" : 0.4833252779120348,
>   "durationMs" : {
>     "addBatch" : 17689,
>     "getBatch" : 135,
>     "latestOffset" : 2121,
>     "queryPlanning" : 880,
>     "triggerExecution" : 22758,
>     "walCommit" : 876
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
>     "description" : "KafkaV2[Subscribe[testedr7]]",
>     "startOffset" : null,
>     "endOffset" : {
>       "testedr7" : {
>         "0" : 10
>       }
>     },
>     "numInputRows" : 11,
>     "processedRowsPerSecond" : 0.4833252779120348
>   } ],
>   "sink" : {
>     "description" : "ForeachBatchSink",
>     "numOutputRows" : -1
>   }
> }


