[ https://issues.apache.org/jira/browse/SPARK-45759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ali Ince updated SPARK-45759:
-----------------------------
    Description: 
We have a {{DataWriter}} component that processes records in configurable 
batches: records are accumulated in the {{write(T record)}} implementation and 
sent to the persistent store when the configured batch size is reached. With 
this approach, the last (typically partial) batch is flushed during the 
{{commit()}} call, as the writer has no other way of knowing whether more 
records will arrive.
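
For reference, a minimal sketch of such a writer ({{BatchingWriter}}, the 
{{recordsWritten}} metric and the flush logic are illustrative names, not our 
actual implementation):

{code:scala}
import org.apache.spark.sql.connector.metric.CustomTaskMetric
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}

import scala.collection.mutable.ArrayBuffer

// Illustrative batching writer: records accumulate in write() and are
// flushed per full batch; the final partial batch only goes out in commit().
class BatchingWriter(batchSize: Int) extends DataWriter[String] {
  private val buffer = ArrayBuffer.empty[String]
  private var recordsWritten = 0L

  override def write(record: String): Unit = {
    buffer += record
    if (buffer.size >= batchSize) flush()
  }

  override def commit(): WriterCommitMessage = {
    flush() // the last batch is only persisted here
    new WriterCommitMessage {} // placeholder commit message
  }

  override def abort(): Unit = buffer.clear()
  override def close(): Unit = ()

  private def flush(): Unit = {
    // a real writer would send `buffer` to the persistent store here
    recordsWritten += buffer.size
    buffer.clear()
  }

  // Task-side metric values; Spark only picks these up when it calls
  // CustomMetrics.updateMetrics with this writer's currentMetricsValues().
  override def currentMetricsValues(): Array[CustomTaskMetric] = Array(
    new CustomTaskMetric {
      override def name(): String = "recordsWritten"
      override def value(): Long = recordsWritten
    })
}
{code}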

We are now adding support for custom metrics by implementing 
{{supportedCustomMetrics()}} and {{currentMetricsValues()}} in our {{Write}} 
and {{DataWriter}} implementations. The problem is that 
{{CustomMetrics.updateMetrics}} is only called 
[during|https://github.com/apache/spark/blob/af8907a0873f5ca192b150f28a0c112107594722/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala#L443-L443]
 and [just 
after|https://github.com/apache/spark/blob/af8907a0873f5ca192b150f28a0c112107594722/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala#L451-L451]
 record processing, so the reported metrics are incomplete: the last batch, 
which is only flushed during the {{commit()}} call, is never 
collected/updated.
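
On the driver side, the corresponding metric is declared on the {{Write}}; a 
minimal sketch ({{MyWrite}} is a hypothetical name, the metric name must match 
the task-side one, and the remaining {{Write}} methods fall back to their 
defaults):

{code:scala}
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomSumMetric}
import org.apache.spark.sql.connector.write.Write

// Illustrative Write: declares the metric that aggregates the task-side
// recordsWritten values reported by each DataWriter.
class MyWrite extends Write {
  override def supportedCustomMetrics(): Array[CustomMetric] = Array(
    new CustomSumMetric {
      override def name(): String = "recordsWritten"
      override def description(): String = "Number of records written to the store"
    })
}
{code}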

We propose to also call {{CustomMetrics.updateMetrics}} after {{commit()}} 
completes successfully, ideally just before the {{run}} function exits (perhaps 
just above 
[https://github.com/apache/spark/blob/af8907a0873f5ca192b150f28a0c112107594722/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala#L473-L473]).
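
To illustrate the gap with the hypothetical {{BatchingWriter}} above (the two 
reads mirror today's behaviour versus the proposed post-commit update):

{code:scala}
// Reading metrics only before/after the write loop misses the final partial
// batch, which is flushed inside commit(); a post-commit update sees it.
object MetricsGapDemo {
  def main(args: Array[String]): Unit = {
    val writer = new BatchingWriter(batchSize = 2)
    Seq("a", "b", "c").foreach(writer.write)

    // Today: metrics are last updated just after the write loop.
    println(writer.currentMetricsValues().head.value()) // 2 -- "c" not flushed yet

    writer.commit() // flushes the last batch

    // Proposed: one more update here would report the complete count.
    println(writer.currentMetricsValues().head.value()) // 3
  }
}
{code}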

> Custom metrics should be updated after commit too
> -------------------------------------------------
>
>                 Key: SPARK-45759
>                 URL: https://issues.apache.org/jira/browse/SPARK-45759
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.1
>            Reporter: Ali Ince
>            Priority: Minor
>


