[
https://issues.apache.org/jira/browse/SPARK-45759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ali Ince updated SPARK-45759:
-----------------------------
Description:
We have a DataWriter component that processes records in configurable
batches: records are accumulated in the {{write(T record)}} implementation and
sent to the persistent store once the configured batch size is reached. With
this approach, the last (partial) batch is handled during the {{commit()}}
call, as there is no other way of knowing whether more records will arrive.
We are now adding support for custom metrics by implementing
{{supportedCustomMetrics()}} and {{currentMetricsValues()}} in our {{Write}}
and {{DataWriter}} implementations. The problem we see is that, because
{{CustomMetrics.updateMetrics}} is only called
[during|https://github.com/apache/spark/blob/af8907a0873f5ca192b150f28a0c112107594722/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala#L443-L443]
and [just
after|https://github.com/apache/spark/blob/af8907a0873f5ca192b150f28a0c112107594722/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala#L451-L451]
record processing, we never observe the complete metrics: the last batch,
which is handled during the {{commit()}} call, is not collected/updated.
We propose to also call {{CustomMetrics.updateMetrics}} after {{commit()}}
completes successfully, ideally just before the {{run}} function exits
(perhaps just above
[https://github.com/apache/spark/blob/af8907a0873f5ca192b150f28a0c112107594722/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala#L473-L473]).
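To illustrate the gap, here is a minimal self-contained sketch of a batching writer. It is not Spark's actual DataSourceV2 API; the class and method names ({{BatchingWriter}}, {{currentMetricsValue()}}) are simplified stand-ins chosen to mirror the shape of the problem: a metric read only around {{write()}} calls misses whatever the final {{commit()}} flushes.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only -- not Spark's DataWriter interface. A writer that
// buffers records and flushes a batch when the configured size is reached;
// the last partial batch is flushed only inside commit().
class BatchingWriter {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private long flushedRecords = 0; // backing value for a custom metric

    BatchingWriter(int batchSize) {
        this.batchSize = batchSize;
    }

    void write(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // The last (partial) batch only reaches the store here.
    void commit() {
        flush();
    }

    private void flush() {
        // ... send buffer contents to the persistent store ...
        flushedRecords += buffer.size();
        buffer.clear();
    }

    long currentMetricsValue() {
        return flushedRecords;
    }
}

public class Main {
    public static void main(String[] args) {
        BatchingWriter w = new BatchingWriter(2);
        w.write("a");
        w.write("b"); // triggers a flush: 2 records persisted
        w.write("c"); // stays buffered

        // Metric value as seen if collection stops after the last write():
        long afterWrites = w.currentMetricsValue(); // 2 -- "c" not counted
        w.commit();
        // Metric value only visible if collected again after commit():
        long afterCommit = w.currentMetricsValue(); // 3
        System.out.println(afterWrites + " " + afterCommit);
    }
}
```

Running this prints {{2 3}}: the record flushed by {{commit()}} is invisible to any metrics snapshot taken only around the {{write()}} calls, which is exactly why an additional {{CustomMetrics.updateMetrics}} call after a successful commit is needed.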
> Custom metrics should be updated after commit too
> -------------------------------------------------
>
> Key: SPARK-45759
> URL: https://issues.apache.org/jira/browse/SPARK-45759
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.4.1
> Reporter: Ali Ince
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)