Copilot commented on code in PR #1946:
URL: https://github.com/apache/auron/pull/1946#discussion_r2718055081
##########
spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala:
##########
@@ -1405,9 +1406,20 @@ object NativeConverters extends Logging {
serialized: Array[Byte]): (E with Serializable, S) = {
Utils.tryWithResource(new ByteArrayInputStream(serialized)) { bis =>
Utils.tryWithResource(new ObjectInputStream(bis)) { ois =>
- val expr = ois.readObject().asInstanceOf[E with Serializable]
- val payload = ois.readObject().asInstanceOf[S with Serializable]
- (expr, payload)
+ def read(): (E with Serializable, S) = {
+ val expr = ois.readObject().asInstanceOf[E with Serializable]
+ val payload = ois.readObject().asInstanceOf[S with Serializable]
+ (expr, payload)
+ }
+ // Spark TaskMetrics#externalAccums is not thread-safe
+ val taskContext = TaskContext.get()
+ if (taskContext != null) {
+ taskContext.taskMetrics().synchronized {
Review Comment:
Synchronizing on the entire taskMetrics() object may be broader than
necessary and could hurt performance. The comment on line 1414 says
"TaskMetrics#externalAccums is not thread-safe", yet the lock is taken on
the taskMetrics() object rather than on externalAccums itself. The lock is
also held for the whole deserialization (both readObject calls), which can be
slow and may cause unnecessary contention. Consider synchronizing on a more
specific object (such as externalAccums) or using a finer-grained locking
strategy.
```suggestion
val metrics = taskContext.taskMetrics()
// Synchronize on the more specific externalAccums object when available,
// falling back to the whole metrics instance if reflection fails or the
// field is null.
val lockObj: AnyRef = try {
  FieldUtils.readField(metrics, "externalAccums", true) match {
    // A type pattern never matches null, so null falls through to the default.
    case ref: AnyRef => ref
    case _ => metrics
  }
} catch {
  // Catch only non-fatal errors so fatal JVM errors and interrupts propagate.
  case scala.util.control.NonFatal(_) => metrics
}
lockObj.synchronized {
```
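To illustrate the lock-scope concern raised above outside of Spark, here is a minimal, self-contained sketch. `FakeTaskMetrics`, `LockScopeSketch`, `withAccumLock` and `registerConcurrently` are hypothetical names invented for this example; they are not Spark or Auron APIs, and the sketch assumes Scala 2.12 or later.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's TaskMetrics; NOT the real class.
final class FakeTaskMetrics {
  // A plain ArrayBuffer is not thread-safe, like the buffer the diff mentions.
  val externalAccums: ArrayBuffer[String] = ArrayBuffer.empty[String]
}

object LockScopeSketch {
  // Runs `body` while holding only the buffer's monitor, so code that
  // synchronizes on the metrics object itself is not blocked.
  def withAccumLock[T](metrics: FakeTaskMetrics)(body: => T): T =
    metrics.externalAccums.synchronized(body)

  // Appends `perThread` entries from each of `threads` threads and
  // returns the final buffer size.
  def registerConcurrently(threads: Int, perThread: Int): Int = {
    val metrics = new FakeTaskMetrics
    val workers = (0 until threads).map { t =>
      new Thread(() => {
        for (i <- 0 until perThread) {
          withAccumLock(metrics) { metrics.externalAccums += s"acc-$t-$i" }
        }
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    // join() establishes happens-before, so this read sees all appends.
    metrics.externalAccums.size
  }
}
```

Calling `LockScopeSketch.registerConcurrently(4, 1000)` appends from four threads under the narrower lock. Note that a narrower lock only helps if every code path that mutates the buffer takes the same monitor; whether that holds for Spark's own code is not established here.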
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.