This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ea773cc [AURON #1937][CI] Enable TPCDS test q14b (#1946)
5ea773cc is described below

commit 5ea773cc6f89204e90f8c415dbaf1f82276bc53d
Author: cxzl25 <[email protected]>
AuthorDate: Tue Jan 27 10:47:57 2026 +0800

    [AURON #1937][CI] Enable TPCDS test q14b (#1946)
    
    # Which issue does this PR close?
    
    Closes #1937
    
    # Rationale for this change
    
    # What changes are included in this PR?
    
    # Are there any user-facing changes?
    
    # How was this patch tested?
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .github/workflows/tpcds-reusable.yml                   |  3 +--
 .../org/apache/spark/sql/auron/NativeConverters.scala  | 18 +++++++++++++++---
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/tpcds-reusable.yml 
b/.github/workflows/tpcds-reusable.yml
index 9ed97a15..7950326e 100644
--- a/.github/workflows/tpcds-reusable.yml
+++ b/.github/workflows/tpcds-reusable.yml
@@ -68,11 +68,10 @@ on:
         description: 'Optional list of queries to run'
         required: false
         type: string
-        # Skip q14b temporarily: flaky failure (see 
https://github.com/apache/auron/issues/1937), to be re-enabled after fix
         default: |
           [
             "q1,q2,q3,q4,q5,q6,q7,q8,q9",
-            "q10,q11,q12,q13,q14a,q15,q16,q17,q18,q19",
+            "q10,q11,q12,q13,q14a,q14b,q15,q16,q17,q18,q19",
             "q20,q21,q22,q23a,q23b,q24a,q24b,q25,q26,q27,q28,q29",
             "q31,q33,q34,q35,q36,q37,q38,q39a,q39b",
             "q40,q41,q42,q43,q44,q45,q46,q47,q48,q49",
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
index 29b9386a..7c17cba9 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
@@ -32,6 +32,7 @@ import org.apache.arrow.vector.VectorSchemaRoot
 import org.apache.arrow.vector.ipc.ArrowStreamWriter
 import org.apache.commons.lang3.reflect.FieldUtils
 import org.apache.commons.lang3.reflect.MethodUtils
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.auron.util.Using
 import org.apache.spark.sql.catalyst.InternalRow
@@ -1405,9 +1406,20 @@ object NativeConverters extends Logging {
       serialized: Array[Byte]): (E with Serializable, S) = {
     Utils.tryWithResource(new ByteArrayInputStream(serialized)) { bis =>
       Utils.tryWithResource(new ObjectInputStream(bis)) { ois =>
-        val expr = ois.readObject().asInstanceOf[E with Serializable]
-        val payload = ois.readObject().asInstanceOf[S with Serializable]
-        (expr, payload)
+        def read(): (E with Serializable, S) = {
+          val expr = ois.readObject().asInstanceOf[E with Serializable]
+          val payload = ois.readObject().asInstanceOf[S with Serializable]
+          (expr, payload)
+        }
+        // Spark TaskMetrics#externalAccums is not thread-safe
+        val taskContext = TaskContext.get()
+        if (taskContext != null) {
+          taskContext.taskMetrics().synchronized {
+            read()
+          }
+        } else {
+          read()
+        }
       }
     }
   }

Reply via email to