Copilot commented on code in PR #1946:
URL: https://github.com/apache/auron/pull/1946#discussion_r2718055081


##########
spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala:
##########
@@ -1405,9 +1406,20 @@ object NativeConverters extends Logging {
       serialized: Array[Byte]): (E with Serializable, S) = {
     Utils.tryWithResource(new ByteArrayInputStream(serialized)) { bis =>
       Utils.tryWithResource(new ObjectInputStream(bis)) { ois =>
-        val expr = ois.readObject().asInstanceOf[E with Serializable]
-        val payload = ois.readObject().asInstanceOf[S with Serializable]
-        (expr, payload)
+        def read(): (E with Serializable, S) = {
+          val expr = ois.readObject().asInstanceOf[E with Serializable]
+          val payload = ois.readObject().asInstanceOf[S with Serializable]
+          (expr, payload)
+        }
+        // Spark TaskMetrics#externalAccums is not thread-safe
+        val taskContext = TaskContext.get()
+        if (taskContext != null) {
+          taskContext.taskMetrics().synchronized {

Review Comment:
   Synchronizing on the entire taskMetrics() object may be overly broad and 
could hurt performance. The comment on line 1414 says that 
"TaskMetrics#externalAccums is not thread-safe", yet the lock is taken on 
taskMetrics() rather than on externalAccums itself. The lock is also held 
for the whole deserialization (both readObject calls), which can be slow and 
may cause unnecessary contention. Consider synchronizing on a more specific 
object (such as externalAccums itself) or using a finer-grained locking 
strategy.
   ```suggestion
             val metrics = taskContext.taskMetrics()
             // Synchronize on the more specific externalAccums object when available,
             // falling back to the whole metrics instance if reflection fails or is null.
             val lockObj: AnyRef = try {
               val fieldValue = FieldUtils.readField(metrics, "externalAccums", true)
               fieldValue match {
                 case ref: AnyRef if ref != null => ref
                 case _ => metrics
               }
             } catch {
               case _: Throwable => metrics
             }
             lockObj.synchronized {
   ```
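
   The lock-selection fallback in the suggestion can be tried in isolation. 
The sketch below is only an illustration under stated assumptions: 
`FakeMetrics` and `LockSelection.chooseLock` are hypothetical names, not part 
of Spark or Auron, and plain `java.lang.reflect` stands in for `FieldUtils`. 
It also catches only non-fatal errors via `NonFatal` instead of `Throwable`. 
Whether the real `TaskMetrics` exposes a field with this name depends on the 
Spark version and is not verified here.
   ```scala
   import scala.util.control.NonFatal

   // Hypothetical stand-in for Spark's TaskMetrics; NOT the real class.
   final class FakeMetrics {
     private val externalAccums = new java.util.ArrayList[String]()
   }

   object LockSelection {
     // Returns the value of the named field to use as a lock. Falls back to
     // `owner` when the field is missing, inaccessible, or currently null.
     def chooseLock(owner: AnyRef, fieldName: String): AnyRef =
       try {
         val field = owner.getClass.getDeclaredField(fieldName)
         field.setAccessible(true)
         val value = field.get(owner)
         if (value != null) value else owner
       } catch {
         case NonFatal(_) => owner
       }
   }
   ```
   With this helper, `LockSelection.chooseLock(metrics, 
"externalAccums").synchronized { ... }` locks on the field value when it can 
be read and on `metrics` otherwise. Note that this only narrows which monitor 
is used; it does not shorten how long the lock is held.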



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
