This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 5ea773cc [AURON #1937][CI] Enable TPCDS test q14b (#1946)
5ea773cc is described below
commit 5ea773cc6f89204e90f8c415dbaf1f82276bc53d
Author: cxzl25 <[email protected]>
AuthorDate: Tue Jan 27 10:47:57 2026 +0800
[AURON #1937][CI] Enable TPCDS test q14b (#1946)
# Which issue does this PR close?
Closes #1937
# Rationale for this change
# What changes are included in this PR?
# Are there any user-facing changes?
# How was this patch tested?
---------
Co-authored-by: Copilot <[email protected]>
---
.github/workflows/tpcds-reusable.yml | 3 +--
.../org/apache/spark/sql/auron/NativeConverters.scala | 18 +++++++++++++++---
2 files changed, 16 insertions(+), 5 deletions(-)
diff --git a/.github/workflows/tpcds-reusable.yml b/.github/workflows/tpcds-reusable.yml
index 9ed97a15..7950326e 100644
--- a/.github/workflows/tpcds-reusable.yml
+++ b/.github/workflows/tpcds-reusable.yml
@@ -68,11 +68,10 @@ on:
description: 'Optional list of queries to run'
required: false
type: string
- # Skip q14b temporarily: flaky failure (see https://github.com/apache/auron/issues/1937), to be re-enabled after fix
default: |
[
"q1,q2,q3,q4,q5,q6,q7,q8,q9",
- "q10,q11,q12,q13,q14a,q15,q16,q17,q18,q19",
+ "q10,q11,q12,q13,q14a,q14b,q15,q16,q17,q18,q19",
"q20,q21,q22,q23a,q23b,q24a,q24b,q25,q26,q27,q28,q29",
"q31,q33,q34,q35,q36,q37,q38,q39a,q39b",
"q40,q41,q42,q43,q44,q45,q46,q47,q48,q49",
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
index 29b9386a..7c17cba9 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
@@ -32,6 +32,7 @@ import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.ArrowStreamWriter
import org.apache.commons.lang3.reflect.FieldUtils
import org.apache.commons.lang3.reflect.MethodUtils
+import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.auron.util.Using
import org.apache.spark.sql.catalyst.InternalRow
@@ -1405,9 +1406,20 @@ object NativeConverters extends Logging {
serialized: Array[Byte]): (E with Serializable, S) = {
Utils.tryWithResource(new ByteArrayInputStream(serialized)) { bis =>
Utils.tryWithResource(new ObjectInputStream(bis)) { ois =>
- val expr = ois.readObject().asInstanceOf[E with Serializable]
- val payload = ois.readObject().asInstanceOf[S with Serializable]
- (expr, payload)
+ def read(): (E with Serializable, S) = {
+ val expr = ois.readObject().asInstanceOf[E with Serializable]
+ val payload = ois.readObject().asInstanceOf[S with Serializable]
+ (expr, payload)
+ }
+ // Spark TaskMetrics#externalAccums is not thread-safe
+ val taskContext = TaskContext.get()
+ if (taskContext != null) {
+ taskContext.taskMetrics().synchronized {
+ read()
+ }
+ } else {
+ read()
+ }
}
}
}