Copilot commented on code in PR #1946:
URL: https://github.com/apache/auron/pull/1946#discussion_r2717968256


##########
spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala:
##########
@@ -1405,9 +1406,19 @@ object NativeConverters extends Logging {
       serialized: Array[Byte]): (E with Serializable, S) = {
     Utils.tryWithResource(new ByteArrayInputStream(serialized)) { bis =>
       Utils.tryWithResource(new ObjectInputStream(bis)) { ois =>
-        val expr = ois.readObject().asInstanceOf[E with Serializable]
-        val payload = ois.readObject().asInstanceOf[S with Serializable]
-        (expr, payload)
+        // Spark TaskMetrics#externalAccums is not thread-safe
+        val taskContext = TaskContext.get()
+        if (taskContext != null) {
+          taskContext.taskMetrics().synchronized {
+            val expr = ois.readObject().asInstanceOf[E with Serializable]
+            val payload = ois.readObject().asInstanceOf[S with Serializable]
+            (expr, payload)
+          }
+        } else {
+          val expr = ois.readObject().asInstanceOf[E with Serializable]
+          val payload = ois.readObject().asInstanceOf[S with Serializable]
+          (expr, payload)

Review Comment:
   There is significant code duplication between the synchronized block (lines
1413-1415) and the else block (lines 1418-1420): the same three lines are
repeated. Consider extracting the deserialization into a local helper so that it
is written once and called from both branches. The readObject calls themselves
must stay inside the synchronized block, because Spark's AccumulatorV2.readObject
registers each deserialized accumulator with the current task, and that
registration is what appends to TaskMetrics#externalAccums.
   ```suggestion
           def read(): (E with Serializable, S) = {
             val expr = ois.readObject().asInstanceOf[E with Serializable]
             val payload = ois.readObject().asInstanceOf[S with Serializable]
             (expr, payload)
           }
           // Spark TaskMetrics#externalAccums is not thread-safe
           val taskContext = TaskContext.get()
           if (taskContext != null) {
             taskContext.taskMetrics().synchronized {
               read()
             }
           } else {
             read()
   ```
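   The shape of the suggestion can be tried out as a small standalone Scala
program. It is a sketch under stated assumptions and depends on nothing from
Spark or Auron: `withOptionalLock`, `serializePair`, and `deserializePair` are
hypothetical names, and a plain `Option[AnyRef]` stands in for the task metrics
object. As in the patch, both `readObject` calls run inside the locked region.

   ```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object OptionalLockSketch {
  // Hypothetical helper: runs `body` while holding the monitor of `lock`
  // when one is present, otherwise runs it without any locking.
  def withOptionalLock[T](lock: Option[AnyRef])(body: => T): T =
    lock match {
      case Some(l) => l.synchronized(body)
      case None    => body
    }

  // Writes two objects back to back, mirroring the (expr, payload) layout.
  def serializePair(a: AnyRef, b: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try {
      oos.writeObject(a)
      oos.writeObject(b)
    } finally oos.close() // close() also flushes buffered data into bos
    bos.toByteArray
  }

  // The read logic appears once; the optional lock wraps the readObject calls
  // themselves, not just the handling of their results.
  def deserializePair[E, S](bytes: Array[Byte], lock: Option[AnyRef]): (E, S) = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try {
      withOptionalLock(lock) {
        val expr = ois.readObject().asInstanceOf[E]
        val payload = ois.readObject().asInstanceOf[S]
        (expr, payload)
      }
    } finally ois.close()
  }
}
   ```

   Inside the patch the lock would presumably be obtained as
`Option(TaskContext.get()).map(_.taskMetrics())`: `TaskContext.get()` returns
null outside a running task, and `Option(null)` is `None`, so the null check
folds into the helper. Whether that reads better than the local `read()` helper
in the suggestion is a matter of taste.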



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
