Copilot commented on code in PR #1752:
URL: https://github.com/apache/auron/pull/1752#discussion_r2619176778


##########
auron-spark-tests/common/src/test/scala/org/apache/spark/sql/SparkExpressionTestsBase.scala:
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.commons.io.FileUtils
+import org.apache.commons.math3.util.Precision
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.analysis.ResolveTimeZone
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, 
ConvertToLocalRelation, NullPropagation}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, 
MapData, TypeUtils}
+import org.apache.spark.sql.execution.auron.plan.NativeProjectBase
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.scalactic.TripleEqualsSupport.Spread
+
+/**
+ * Base trait for all Spark expression tests.
+ */
+trait SparkExpressionTestsBase
+    extends SparkFunSuite
+    with ExpressionEvalHelper
+    with SparkTestsBase {
+  val SUPPORTED_DATA_TYPES = TypeCollection(
+    BooleanType,
+    ByteType,
+    ShortType,
+    IntegerType,
+    LongType,
+    FloatType,
+    DoubleType,
+    DecimalType,
+    StringType,
+    BinaryType,
+    DateType,
+    TimestampType,
+    ArrayType,
+    StructType,
+    MapType)
+
+  override def beforeAll(): Unit = {
+    // Prepare working paths.
+    val basePathDir = new File(basePath)
+    if (basePathDir.exists()) {
+      FileUtils.forceDelete(basePathDir)
+    }
+    FileUtils.forceMkdir(basePathDir)
+    FileUtils.forceMkdir(new File(warehouse))
+    FileUtils.forceMkdir(new File(metaStorePathAbsolute))
+
+    super.beforeAll()
+    initializeSession()
+    _spark.sparkContext.setLogLevel("WARN")
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      super.afterAll()
+    } finally {
+      try {
+        if (_spark != null) {
+          try {
+            _spark.sessionState.catalog.reset()
+          } finally {
+            _spark.stop()
+            _spark = null
+          }
+        }
+      } finally {
+        SparkSession.clearActiveSession()
+        SparkSession.clearDefaultSession()
+      }
+    }
+  }
+
+  protected def initializeSession(): Unit = {
+    if (_spark == null) {
+      val sparkBuilder = SparkSession
+        .builder()
+        .appName("Auron-UT")
+        .master(s"local[2]")
+        // Avoid static evaluation for literal input by spark catalyst.
+        .config(
+          SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
+          ConvertToLocalRelation.ruleName +
+            "," + ConstantFolding.ruleName + "," + NullPropagation.ruleName)
+
+      for ((key, value) <- sparkConfList) {
+        sparkBuilder.config(key, value)
+      }
+
+      _spark = sparkBuilder
+        .getOrCreate()
+    }
+  }
+
+  protected var _spark: SparkSession = null
+
+  override protected def checkEvaluation(
+      expression: => Expression,
+      expected: Any,
+      inputRow: InternalRow = EmptyRow): Unit = {
+
+    if (canConvertToDataFrame(inputRow)) {
+      val resolver = ResolveTimeZone
+      val expr = resolver.resolveTimeZones(expression)
+      assert(expr.resolved)
+
+      auronCheckExpression(expr, expected, inputRow)
+    } else {
+      logWarning(
+        "Skipping evaluation - Nonempty inputRow cannot be converted to 
DataFrame " +
+          "due to complex/unsupported types.\n")
+    }
+  }
+
+  def auronCheckExpression(expression: Expression, expected: Any, inputRow: 
InternalRow): Unit = {
+    val df = if (inputRow != EmptyRow && inputRow != InternalRow.empty) {
+      convertInternalRowToDataFrame(inputRow)
+    } else {
+      val schema = StructType(StructField("a", IntegerType, nullable = true) 
:: Nil)
+      val empData = Seq(Row(1))
+      _spark.createDataFrame(_spark.sparkContext.parallelize(empData), schema)
+    }
+    val resultDF = df.select(Column(expression))
+    val result = resultDF.collect()
+
+    if (checkDataTypeSupported(expression) &&
+      expression.children.forall(checkDataTypeSupported)) {
+      val projectExec = resultDF.queryExecution.executedPlan.collect {
+        case p: NativeProjectBase => p
+      }
+
+      if (projectExec.size == 1) {
+        logInfo("Offload to native backend in the test.\n")
+      } else {
+        logInfo("Not supported in Auron, fall back to vanilla spark in the 
test.\n")
+        shouldNotFallback()
+      }
+    } else {
+      logInfo("Has unsupported data type, fall back to vanilla spark.\n")
+      shouldNotFallback()
+    }
+
+    if (!(checkResult(result.head.get(0), expected, expression.dataType, 
expression.nullable)
+        || checkResult(
+          
CatalystTypeConverters.createToCatalystConverter(expression.dataType)(
+            result.head.get(0)
+          ), // decimal precision is wrong from value
+          CatalystTypeConverters.convertToCatalyst(expected),
+          expression.dataType,
+          expression.nullable))) {
+      val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
+      fail(
+        s"Incorrect evaluation: $expression, " +
+          s"actual: ${result.head.get(0)}, " +
+          s"expected: $expected$input")
+    }
+  }
+
+  /**
+   * Sort map data by key and return the sorted key array and value array.
+   *
+   * @param input
+   *   input map data.
+   * @param kt
+   *   key type.
+   * @param vt
+   *   value type.
+   * @return
+   *   the sorted key array and value array.
+   */
+  private def getSortedArrays(
+      input: MapData,
+      kt: DataType,
+      vt: DataType): (ArrayData, ArrayData) = {
+    val keyArray = input.keyArray().toArray[Any](kt)
+    val valueArray = input.valueArray().toArray[Any](vt)
+    val newMap = (keyArray.zip(valueArray)).toMap
+    val sortedMap = mutable.SortedMap(newMap.toSeq: 
_*)(TypeUtils.getInterpretedOrdering(kt))
+    (new GenericArrayData(sortedMap.keys.toArray), new 
GenericArrayData(sortedMap.values.toArray))
+  }

Review Comment:
   Incomplete scaladoc for parameters: The method `getSortedArrays` documents 
its parameters, but the descriptions of `kt` and `vt` are incomplete - they 
just say "key type" and "value type" without explaining which DataType is 
expected or any constraints (for example, `kt` is passed to 
`TypeUtils.getInterpretedOrdering`, so it must be a type that has an ordering).



##########
auron-spark-tests/common/src/test/scala/org/apache/spark/sql/SparkQueryTestsBase.scala:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.io.File
+import java.util.TimeZone
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.FileUtils
+import org.apache.commons.math3.util.Precision
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.util.sideBySide
+import org.apache.spark.sql.execution.SQLExecution
+import org.scalatest.Assertions
+
+/**
+ * Basic trait for all Spark query tests.
+ */
+trait SparkQueryTestsBase extends QueryTest with SparkTestsSharedSessionBase {
+  def prepareWorkDir(): Unit = {
+    // prepare working paths
+    val basePathDir = new File(basePath)
+    if (basePathDir.exists()) {
+      FileUtils.forceDelete(basePathDir)
+    }
+    FileUtils.forceMkdir(basePathDir)
+    FileUtils.forceMkdir(new File(warehouse))
+    FileUtils.forceMkdir(new File(metaStorePathAbsolute))
+  }
+
+  override def beforeAll(): Unit = {
+    prepareWorkDir()
+    super.beforeAll()
+
+    spark.sparkContext.setLogLevel("WARN")
+  }
+
+  override protected def checkAnswer(df: => DataFrame, expectedAnswer: 
Seq[Row]): Unit = {
+    assertEmptyMissingInput(df)
+    AuronQueryTestUtil.checkAnswer(df, expectedAnswer)
+  }
+}
+
+object AuronQueryTestUtil extends Assertions {
+
+  /**
+   * Runs the plan and makes sure the answer matches the expected result.
+   *
+   * @param df
+   *   the DataFrame to be executed
+   * @param expectedAnswer
+   *   the expected result in a Seq of Rows.
+   * @param checkToRDD
+   *   whether to verify deserialization to an RDD. This runs the query twice.
+   */
+  def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row], checkToRDD: Boolean 
= true): Unit = {
+    getErrorMessageInCheckAnswer(df, expectedAnswer, checkToRDD) match {
+      case Some(errorMessage) => fail(errorMessage)
+      case None =>
+    }
+  }
+
+  /**
+   * Runs the plan and makes sure the answer matches the expected result. If 
there was exception
+   * during the execution or the contents of the DataFrame does not match the 
expected result, an
+   * error message will be returned. Otherwise, a None will be returned.
+   *
+   * @param df
+   *   the DataFrame to be executed
+   * @param expectedAnswer
+   *   the expected result in a Seq of Rows.
+   * @param checkToRDD
+   *   whether to verify deserialization to an RDD. This runs the query twice.
+   */
+  def getErrorMessageInCheckAnswer(
+      df: DataFrame,
+      expectedAnswer: Seq[Row],
+      checkToRDD: Boolean = true): Option[String] = {
+    val isSorted = df.logicalPlan.collect { case s: logical.Sort => s 
}.nonEmpty
+    if (checkToRDD) {
+      SQLExecution.withSQLConfPropagated(df.sparkSession) {
+        df.rdd.count() // Also attempt to deserialize as an RDD [SPARK-15791]
+      }
+    }
+
+    val sparkAnswer =
+      try df.collect().toSeq
+      catch {
+        case e: Exception =>
+          val errorMessage =
+            s"""
+               |Exception thrown while executing query:
+               |${df.queryExecution}
+               |== Exception ==
+               |$e
+               |${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
+          """.stripMargin
+          return Some(errorMessage)
+      }
+
+    sameRows(expectedAnswer, sparkAnswer, isSorted).map { results =>
+      s"""
+           |Results do not match for query:
+           |Timezone: ${TimeZone.getDefault}
+           |Timezone Env: ${sys.env.getOrElse("TZ", "")}
+           |
+           |${df.queryExecution}
+           |== Results ==
+           |$results
+       """.stripMargin
+    }
+  }
+
+  def prepareAnswer(answer: Seq[Row], isSorted: Boolean): Seq[Row] = {
+    // Converts data to types that we can do equality comparison using Scala 
collections.
+    // For BigDecimal type, the Scala type has a better definition of equality 
test (similar to
+    // Java's java.math.BigDecimal.compareTo).
+    // For binary arrays, we convert it to Seq to avoid of calling 
java.util.Arrays.equals for
+    // equality test.
+    val converted: Seq[Row] = answer.map(prepareRow)
+    if (!isSorted) converted.sortBy(_.toString()) else converted
+  }
+
+  // We need to call prepareRow recursively to handle schemas with struct 
types.
+  def prepareRow(row: Row): Row = {
+    Row.fromSeq(row.toSeq.map {
+      case null => null
+      case bd: java.math.BigDecimal => BigDecimal(bd)
+      // Equality of WrappedArray differs for AnyVal and AnyRef in Scala 
2.12.2+
+      case seq: Seq[_] =>
+        seq.map {
+          case b: java.lang.Byte => b.byteValue
+          case s: java.lang.Short => s.shortValue
+          case i: java.lang.Integer => i.intValue
+          case l: java.lang.Long => l.longValue
+          case f: java.lang.Float => f.floatValue
+          case d: java.lang.Double => d.doubleValue
+          case x => x
+        }
+      // Convert array to Seq for easy equality check.
+      case b: Array[_] => b.toSeq
+      case r: Row => prepareRow(r)
+      case o => o
+    })
+  }
+
+  private def genError(
+      expectedAnswer: Seq[Row],
+      sparkAnswer: Seq[Row],
+      isSorted: Boolean = false): String = {
+    val getRowType: Option[Row] => String = row =>
+      row
+        .map(row =>
+          if (row.schema == null) {
+            "struct<>"
+          } else {
+            s"${row.schema.catalogString}"
+          })
+        .getOrElse("struct<>")
+
+    s"""
+       |== Results ==
+       |${sideBySide(
+      s"== Correct Answer - ${expectedAnswer.size} ==" +:
+        getRowType(expectedAnswer.headOption) +:
+        prepareAnswer(expectedAnswer, isSorted).map(_.toString()),
+      s"== Auron Answer - ${sparkAnswer.size} ==" +:
+        getRowType(sparkAnswer.headOption) +:
+        prepareAnswer(sparkAnswer, isSorted).map(_.toString())).mkString("\n")}
+    """.stripMargin
+  }
+
+  def includesRows(expectedRows: Seq[Row], sparkAnswer: Seq[Row]): 
Option[String] = {
+    if (!prepareAnswer(expectedRows, true).toSet.subsetOf(
+        prepareAnswer(sparkAnswer, true).toSet)) {
+      return Some(genError(expectedRows, sparkAnswer, true))
+    }
+    None
+  }
+
+  private def compare(obj1: Any, obj2: Any): Boolean = (obj1, obj2) match {
+    case (null, null) => true
+    case (null, _) => false
+    case (_, null) => false
+    case (a: Array[_], b: Array[_]) =>
+      a.length == b.length && a.zip(b).forall { case (l, r) => compare(l, r) }
+    case (a: Map[_, _], b: Map[_, _]) =>
+      a.size == b.size && a.keys.forall { aKey =>
+        b.keys.find(bKey => compare(aKey, bKey)).exists(bKey => 
compare(a(aKey), b(bKey)))
+      }
+    case (a: Iterable[_], b: Iterable[_]) =>
+      a.size == b.size && a.zip(b).forall { case (l, r) => compare(l, r) }
+    case (a: Product, b: Product) =>
+      compare(a.productIterator.toSeq, b.productIterator.toSeq)
+    case (a: Row, b: Row) =>
+      compare(a.toSeq, b.toSeq)
+    // 0.0 == -0.0, turn float/double to bits before comparison, to 
distinguish 0.0 and -0.0.
+    case (a: Double, b: Double) =>
+      if ((isNaNOrInf(a) || isNaNOrInf(b)) || (a == -0.0) || (b == -0.0)) {
+        java.lang.Double.doubleToRawLongBits(a) == 
java.lang.Double.doubleToRawLongBits(b)
+      } else {
+        Precision.equalsWithRelativeTolerance(a, b, 0.00001d)
+      }
+    case (a: Float, b: Float) =>
+      java.lang.Float.floatToRawIntBits(a) == 
java.lang.Float.floatToRawIntBits(b)
+    case (a, b) => a == b
+  }
+
+  def isNaNOrInf(num: Double): Boolean = {
+    num.isNaN || num.isInfinite || num.isNegInfinity || num.isPosInfinity
+  }
+
+  def sameRows(
+      expectedAnswer: Seq[Row],
+      sparkAnswer: Seq[Row],
+      isSorted: Boolean = false): Option[String] = {
+    // modify method 'compare'
+    if (!compare(prepareAnswer(expectedAnswer, isSorted), 
prepareAnswer(sparkAnswer, isSorted))) {
+      return Some(genError(expectedAnswer, sparkAnswer, isSorted))
+    }
+    None
+  }
+
+  /**
+   * Runs the plan and makes sure the answer is within absTol of the expected 
result.
+   *
+   * @param actualAnswer
+   *   the actual result in a [[Row]].
+   * @param expectedAnswer
+   *   the expected result in a[[Row]].

Review Comment:
   Missing space in scaladoc reference: There should be a space between "a" and 
the scaladoc link "[[Row]]" to improve readability.
   ```suggestion
      *   the expected result in a [[Row]].
   ```



##########
auron-spark-tests/common/src/test/scala/org/apache/auron/utils/SparkTestSettings.scala:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.utils
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+/**
+ * Test settings which can enable/disable some tests on demand(e.g. Auron has 
some correctness bug
+ * that not fixed yet).
+ */
+abstract class SparkTestSettings {
+  private val AURON_TEST: String = "Auron - "
+  private val enabledSuites: java.util.Map[String, SuiteSettings] = new 
util.HashMap()
+
+  protected def enableSuite[T: ClassTag]: SuiteSettings = {
+    enableSuite(implicitly[ClassTag[T]].runtimeClass.getCanonicalName)
+  }
+
+  protected def enableSuite(suiteName: String): SuiteSettings = {
+    if (enabledSuites.containsKey(suiteName)) {
+      throw new IllegalArgumentException("Duplicated suite name: " + suiteName)
+    }
+    val suiteSettings = new SuiteSettings
+    enabledSuites.put(suiteName, suiteSettings)
+    suiteSettings
+  }
+
+  private[utils] def shouldRun(suiteName: String, testName: String): Boolean = 
{
+    if (!enabledSuites.containsKey(suiteName)) {
+      return false
+    }
+
+    val suiteSettings = enabledSuites.get(suiteName)
+    suiteSettings.disableReason match {
+      case Some(_) => return false
+      case _ => // continue
+    }
+
+    val inclusion = suiteSettings.inclusion.asScala
+    val exclusion = suiteSettings.exclusion.asScala
+
+    if (inclusion.isEmpty && exclusion.isEmpty) {
+      // default to run all cases under this suite
+      return true
+    }
+
+    if (inclusion.nonEmpty && exclusion.nonEmpty) {
+      // error
+      throw new IllegalStateException(
+        s"Do not use include and exclude conditions on the same test case: 
$suiteName:$testName")
+    }
+
+    if (inclusion.nonEmpty) {
+      // include mode
+      val isIncluded = inclusion.exists(_.isIncluded(testName))
+      return isIncluded
+    }
+
+    if (exclusion.nonEmpty) {
+      // exclude mode
+      val isExcluded = exclusion.exists(_.isExcluded(testName))
+      return !isExcluded
+    }
+
+    throw new IllegalStateException("Unreachable code from shouldRun")
+  }
+
+  final protected class SuiteSettings {
+    private[utils] val inclusion: util.List[IncludeBase] = new util.ArrayList()
+    private[utils] val exclusion: util.List[ExcludeBase] = new util.ArrayList()
+
+    private[utils] var disableReason: Option[String] = None
+
+    def include(testNames: String*): SuiteSettings = {
+      inclusion.add(Include(testNames: _*))
+      this
+    }
+
+    def exclude(testNames: String*): SuiteSettings = {
+      exclusion.add(Exclude(testNames: _*))
+      this
+    }
+
+    def includeByPrefix(prefixes: String*): SuiteSettings = {
+      inclusion.add(IncludeByPrefix(prefixes: _*))
+      this
+    }
+    def excludeByPrefix(prefixes: String*): SuiteSettings = {
+      exclusion.add(ExcludeByPrefix(prefixes: _*))
+      this
+    }
+
+    def disable(reason: String): SuiteSettings = {
+      disableReason = disableReason match {
+        case Some(r) => throw new IllegalArgumentException("Disable reason 
already set: " + r)
+        case None => Some(reason)
+      }
+      this
+    }
+  }

Review Comment:
   Missing scaladoc for class: The `SuiteSettings` class is a key component of 
the test settings framework but lacks documentation explaining its purpose, how 
it should be used, and the relationship between inclusion, exclusion, and 
disable options.



##########
auron-spark-tests/common/src/test/scala/org/apache/spark/utils/DebuggableThreadUtils.scala:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.utils
+
+import scala.util.{Failure, Success, Try}
+
+import org.apache.spark.util.ThreadUtils
+
+object DebuggableThreadUtils {
+
+  /** Logs message for failure occurring during the execution of 
ThreadUtils.parmap. */

Review Comment:
   Missing parameter documentation: The scaladoc comment should document the 
parameters `in`, `prefix`, `maxThreads`, and `f`, as well as the return value, 
following standard Scala documentation conventions. This would improve code 
maintainability and help users understand what each parameter does.
   ```suggestion
     /**
      * Applies a function to each element of the input sequence in parallel, 
logging any failures.
      *
      * @param in         The input sequence of elements to process.
      * @param prefix     The prefix to use for thread names.
      * @param maxThreads The maximum number of threads to use for parallel 
processing.
      * @param f          The function to apply to each element of the input 
sequence.
      * @return           A sequence containing the results of applying the 
function to each input element.
      */
   ```



##########
auron-spark-tests/pom.xml:
##########
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.auron</groupId>
+    <artifactId>auron-parent_${scalaVersion}</artifactId>
+    <version>${project.version}</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>auron-spark-tests</artifactId>
+  <packaging>pom</packaging>
+  <name>Auron Spark Test Parent</name>
+
+  <modules>
+    <module>common</module>
+  </modules>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scalaVersion}</artifactId>
+      <version>${sparkVersion}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scalaVersion}</artifactId>
+      <version>${sparkVersion}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scalaVersion}</artifactId>
+      <version>${sparkVersion}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_${scalaVersion}</artifactId>
+      <version>${sparkVersion}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>net.alchim31.maven</groupId>
+          <artifactId>scala-maven-plugin</artifactId>
+          <configuration>
+            <displayCmd>true</displayCmd>
+            <jvmArgs>
+              <jvmArg>-Xss128m</jvmArg>
+            </jvmArgs>
+          </configuration>
+        </plugin>
+        <plugin>
+          <groupId>org.scalastyle</groupId>
+          <artifactId>scalastyle-maven-plugin</artifactId>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>spark-3.3</id>
+      <modules>
+        <module>spark33</module>
+      </modules>
+    </profile>

Review Comment:
   Profile activation pattern inconsistency: The spark33 module is only 
included when the `spark-3.3` profile is active, but the parent pom.xml uses 
profile `spark-tests` to include the entire auron-spark-tests module. This 
creates a two-level profile activation requirement: both the `spark-tests` and 
`spark-3.3` profiles must be active for the spark33 module to be built, which 
might be confusing. Consider documenting the required profile combinations or 
simplifying the activation logic.



##########
auron-spark-tests/spark33/src/test/scala/org/apache/auron/utils/AuronSparkTestSettings.scala:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.utils
+
+import org.apache.spark.sql._
+
+class AuronSparkTestSettings extends SparkTestSettings {
+  {
+    // Use Arrow's unsafe implementation.
+    System.setProperty("arrow.allocation.manager.type", "Unsafe")
+  }
+
+  enableSuite[AuronStringFunctionsSuite]
+    // See https://github.com/apache/auron/issues/1724
+    .exclude("string / binary substring function")
+
+  // Will be implemented in the future.
+  override def getSQLQueryTestSettings = new SQLQueryTestSettings {
+    override def getResourceFilePath: String = ???
+
+    override def getSupportedSQLQueryTests: Set[String] = ???
+
+    override def getOverwriteSQLQueryTests: Set[String] = ???
+  }

Review Comment:
   Unimplemented methods with ??? operator: The `getSQLQueryTestSettings` 
method returns a SQLQueryTestSettings instance with all methods throwing 
NotImplementedError (via ???). While the comment states "Will be implemented in 
the future", this creates a risk if any code tries to use these methods. 
Consider either returning None/null for optional functionality, or adding 
runtime checks to prevent accidental usage until implementation is complete.
   ```suggestion
     override def getSQLQueryTestSettings = throw new 
NotImplementedError("getSQLQueryTestSettings is not implemented yet")
   ```



##########
auron-spark-tests/common/src/test/scala/org/apache/spark/sql/SparkQueryTestsBase.scala:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.io.File
+import java.util.TimeZone
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.FileUtils
+import org.apache.commons.math3.util.Precision
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.util.sideBySide
+import org.apache.spark.sql.execution.SQLExecution
+import org.scalatest.Assertions
+
+/**
+ * Basic trait for all Spark query tests.
+ */
+trait SparkQueryTestsBase extends QueryTest with SparkTestsSharedSessionBase {
+  def prepareWorkDir(): Unit = {
+    // prepare working paths
+    val basePathDir = new File(basePath)
+    if (basePathDir.exists()) {
+      FileUtils.forceDelete(basePathDir)
+    }
+    FileUtils.forceMkdir(basePathDir)
+    FileUtils.forceMkdir(new File(warehouse))
+    FileUtils.forceMkdir(new File(metaStorePathAbsolute))
+  }

Review Comment:
   Missing scaladoc for public method: The `prepareWorkDir` method is public 
but lacks documentation explaining what it does, when it should be called, and 
any side effects (like deleting existing directories). This is especially 
important since it performs destructive operations like force-deleting 
directories.



##########
auron-spark-tests/common/src/test/scala/org/apache/spark/sql/SparkTestsBase.scala:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import scala.collection.mutable
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+import org.scalatest.funsuite.AnyFunSuiteLike
+
+import org.apache.auron.utils.SparkTestSettings
+
+/**
+ * Base trait for all Spark tests.
+ */
+trait SparkTestsBase extends AnyFunSuiteLike {
+  protected val IGNORE_ALL: String = "IGNORE_ALL"
+  protected val AURON_TEST: String = "Auron - "
+
+  protected val rootPath: String = getClass.getResource("/").getPath
+  protected val basePath: String = rootPath + "unit-tests-working-home"
+  protected val warehouse: String = basePath + "/spark-warehouse"
+  protected val metaStorePathAbsolute: String = basePath + "/meta"
+
+  // The blacklist is taken in highest priority. Tests on the
+  // list will never be run with no regard to backend test settings.

Review Comment:
   Missing documentation for blacklist feature: The `testNameBlackList` method 
lacks documentation explaining how the blacklist works, particularly the 
special behavior of the "IGNORE_ALL" value that disables all tests. This is an 
important feature that should be documented for users extending this trait.
   ```suggestion
     /**
      * Returns a sequence of test names to be blacklisted (i.e., skipped) for 
this test suite.
      *
      * Any test whose name appears in the returned sequence will never be run, 
regardless of backend
      * test settings. This method can be overridden by subclasses to specify 
which tests to skip.
      *
      * Special behavior: If the sequence contains the value of `IGNORE_ALL` 
(case-insensitive),
      * then all tests in the suite will be skipped.
      *
      * @return a sequence of test names to blacklist, or a sequence containing 
`IGNORE_ALL` to skip all tests
      */
   ```



##########
auron-spark-tests/common/src/test/scala/org/apache/spark/sql/SparkTestsSharedSessionBase.scala:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
QueryStageExec}
+import org.apache.spark.sql.test.SharedSparkSession
+
+trait SparkTestsSharedSessionBase extends SharedSparkSession with 
SparkTestsBase {
+  override def sparkConf: SparkConf = {
+    val conf = super.sparkConf
+      .setAppName("Auron-UT")
+      .set("spark.sql.warehouse.dir", warehouse)
+
+    for ((key, value) <- sparkConfList) {
+      conf.set(key, value)
+    }
+
+    conf
+  }
+
+  /**
+   * Get all the children plan of plans.
+   *
+   * @param plans
+   *   the input plans.
+   * @return
+   */
+  private def getChildrenPlan(plans: Seq[SparkPlan]): Seq[SparkPlan] = {
+    if (plans.isEmpty) {
+      return Seq()
+    }
+
+    val inputPlans: Seq[SparkPlan] = plans.map {
+      case stage: QueryStageExec => stage.plan
+      case plan => plan
+    }
+
+    var newChildren: Seq[SparkPlan] = Seq()
+    inputPlans.foreach { plan =>
+      newChildren = newChildren ++ getChildrenPlan(plan.children)
+      // To avoid duplication of WholeStageCodegenXXX and its children.
+      if (!plan.nodeName.startsWith("WholeStageCodegen")) {
+        newChildren = newChildren :+ plan
+      }
+    }
+    newChildren
+  }

Review Comment:
   Incomplete scaladoc for private method: `getChildrenPlan` already has a 
scaladoc block, but it only names the `plans` parameter, leaves `@return` 
empty, and does not explain the method's purpose, especially the special 
handling of WholeStageCodegen nodes and QueryStageExec.



##########
.github/workflows/tpcds-reusable.yml:
##########
@@ -183,6 +183,11 @@ jobs:
             CMD="$CMD --uniffle $UNIFFLE_NUMBER"
           fi
 
+          SPARK_NUMBER="${{ inputs.sparktests }}"
+          if [[ "$SPARK_TESTS" == "true" ]]; then
+            CMD="$CMD --sparktests true"
+          fi

Review Comment:
   Multiple critical issues with sparktests handling: 1) The workflow 
references `inputs.sparktests` but there is no corresponding input definition 
in the workflow_call inputs section (around line 22). 2) Line 186 reuses the 
variable name `SPARK_NUMBER` which was already assigned on line 165 with a 
different value (the Spark version number), overwriting it. This should use a 
different variable name like `SPARK_TESTS`. 3) Line 187 references the 
undefined variable `SPARK_TESTS` instead of `SPARK_NUMBER`.



##########
pom.xml:
##########
@@ -217,13 +229,24 @@
         <artifactId>scalatest_${scalaVersion}</artifactId>
         <version>${scalaTestVersion}</version>
       </dependency>
+      <dependency>
+        <groupId>org.scalatestplus</groupId>
+        <artifactId>scalatestplus-scalacheck_${scalaVersion}</artifactId>
+        <version>3.1.0.0-RC2</version>
+        <scope>test</scope>
+      </dependency>
       <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-core_${scalaVersion}</artifactId>
         <version>${sparkVersion}</version>
         <type>test-jar</type>
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-catalyst_${scalaVersion}</artifactId>
+        <version>${sparkVersion}</version>
+      </dependency>

Review Comment:
   Duplicate dependency declaration: `spark-catalyst` dependency (without 
test-jar type) is being added here but likely already exists in the 
dependencyManagement section before this addition. This creates two entries for 
the same artifact with different configurations (one without type specification 
on lines 245-249, and one with type=test-jar on lines 250-256), which could 
lead to confusion about which version/scope is actually used.



##########
pom.xml:
##########
@@ -217,13 +229,24 @@
         <artifactId>scalatest_${scalaVersion}</artifactId>
         <version>${scalaTestVersion}</version>
       </dependency>
+      <dependency>
+        <groupId>org.scalatestplus</groupId>
+        <artifactId>scalatestplus-scalacheck_${scalaVersion}</artifactId>
+        <version>3.1.0.0-RC2</version>

Review Comment:
   RC (Release Candidate) version of a test dependency: The test-scoped 
dependency `scalatestplus-scalacheck` is using version `3.1.0.0-RC2`, which is 
a release candidate version. Even for test infrastructure, it's better to use 
stable release versions rather than RC versions, as RC versions may have bugs 
or compatibility issues that were fixed in the final release.
   ```suggestion
           <version>3.1.0.0</version>
   ```



##########
auron-spark-tests/common/src/test/scala/org/apache/spark/sql/SparkExpressionTestsBase.scala:
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.commons.io.FileUtils
+import org.apache.commons.math3.util.Precision
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.analysis.ResolveTimeZone
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, 
ConvertToLocalRelation, NullPropagation}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, 
MapData, TypeUtils}
+import org.apache.spark.sql.execution.auron.plan.NativeProjectBase
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.scalactic.TripleEqualsSupport.Spread
+
+/**
+ * Base trait for all Spark expression tests.
+ */
+trait SparkExpressionTestsBase
+    extends SparkFunSuite
+    with ExpressionEvalHelper
+    with SparkTestsBase {
+  val SUPPORTED_DATA_TYPES = TypeCollection(
+    BooleanType,
+    ByteType,
+    ShortType,
+    IntegerType,
+    LongType,
+    FloatType,
+    DoubleType,
+    DecimalType,
+    StringType,
+    BinaryType,
+    DateType,
+    TimestampType,
+    ArrayType,
+    StructType,
+    MapType)
+
+  override def beforeAll(): Unit = {
+    // Prepare working paths.
+    val basePathDir = new File(basePath)
+    if (basePathDir.exists()) {
+      FileUtils.forceDelete(basePathDir)
+    }
+    FileUtils.forceMkdir(basePathDir)
+    FileUtils.forceMkdir(new File(warehouse))
+    FileUtils.forceMkdir(new File(metaStorePathAbsolute))
+
+    super.beforeAll()
+    initializeSession()
+    _spark.sparkContext.setLogLevel("WARN")
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      super.afterAll()
+    } finally {
+      try {
+        if (_spark != null) {
+          try {
+            _spark.sessionState.catalog.reset()
+          } finally {
+            _spark.stop()
+            _spark = null
+          }
+        }
+      } finally {
+        SparkSession.clearActiveSession()
+        SparkSession.clearDefaultSession()
+      }
+    }
+  }
+
+  protected def initializeSession(): Unit = {
+    if (_spark == null) {
+      val sparkBuilder = SparkSession
+        .builder()
+        .appName("Auron-UT")
+        .master(s"local[2]")
+        // Avoid static evaluation for literal input by spark catalyst.
+        .config(
+          SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
+          ConvertToLocalRelation.ruleName +
+            "," + ConstantFolding.ruleName + "," + NullPropagation.ruleName)
+
+      for ((key, value) <- sparkConfList) {
+        sparkBuilder.config(key, value)
+      }
+
+      _spark = sparkBuilder
+        .getOrCreate()
+    }
+  }
+
+  protected var _spark: SparkSession = null
+
+  override protected def checkEvaluation(
+      expression: => Expression,
+      expected: Any,
+      inputRow: InternalRow = EmptyRow): Unit = {
+
+    if (canConvertToDataFrame(inputRow)) {
+      val resolver = ResolveTimeZone
+      val expr = resolver.resolveTimeZones(expression)
+      assert(expr.resolved)
+
+      auronCheckExpression(expr, expected, inputRow)
+    } else {
+      logWarning(
+        "Skipping evaluation - Nonempty inputRow cannot be converted to 
DataFrame " +
+          "due to complex/unsupported types.\n")
+    }
+  }
+
+  def auronCheckExpression(expression: Expression, expected: Any, inputRow: 
InternalRow): Unit = {
+    val df = if (inputRow != EmptyRow && inputRow != InternalRow.empty) {
+      convertInternalRowToDataFrame(inputRow)
+    } else {
+      val schema = StructType(StructField("a", IntegerType, nullable = true) 
:: Nil)
+      val empData = Seq(Row(1))
+      _spark.createDataFrame(_spark.sparkContext.parallelize(empData), schema)
+    }
+    val resultDF = df.select(Column(expression))
+    val result = resultDF.collect()
+
+    if (checkDataTypeSupported(expression) &&
+      expression.children.forall(checkDataTypeSupported)) {
+      val projectExec = resultDF.queryExecution.executedPlan.collect {
+        case p: NativeProjectBase => p
+      }
+
+      if (projectExec.size == 1) {
+        logInfo("Offload to native backend in the test.\n")
+      } else {
+        logInfo("Not supported in Auron, fall back to vanilla spark in the 
test.\n")
+        shouldNotFallback()
+      }
+    } else {
+      logInfo("Has unsupported data type, fall back to vanilla spark.\n")
+      shouldNotFallback()
+    }
+
+    if (!(checkResult(result.head.get(0), expected, expression.dataType, 
expression.nullable)
+        || checkResult(
+          
CatalystTypeConverters.createToCatalystConverter(expression.dataType)(
+            result.head.get(0)
+          ), // decimal precision is wrong from value
+          CatalystTypeConverters.convertToCatalyst(expected),
+          expression.dataType,
+          expression.nullable))) {
+      val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
+      fail(
+        s"Incorrect evaluation: $expression, " +
+          s"actual: ${result.head.get(0)}, " +
+          s"expected: $expected$input")
+    }
+  }
+
+  /**
+   * Sort map data by key and return the sorted key array and value array.
+   *
+   * @param input
+   *   input map data.
+   * @param kt
+   *   key type.
+   * @param vt
+   *   value type.
+   * @return
+   *   the sorted key array and value array.
+   */
+  private def getSortedArrays(
+      input: MapData,
+      kt: DataType,
+      vt: DataType): (ArrayData, ArrayData) = {
+    val keyArray = input.keyArray().toArray[Any](kt)
+    val valueArray = input.valueArray().toArray[Any](vt)
+    val newMap = (keyArray.zip(valueArray)).toMap
+    val sortedMap = mutable.SortedMap(newMap.toSeq: 
_*)(TypeUtils.getInterpretedOrdering(kt))
+    (new GenericArrayData(sortedMap.keys.toArray), new 
GenericArrayData(sortedMap.values.toArray))
+  }
+
+  def isNaNOrInf(num: Double): Boolean = {
+    num.isNaN || num.isInfinite
+  }
+
+  override protected def checkResult(
+      result: Any,
+      expected: Any,
+      exprDataType: DataType,
+      exprNullable: Boolean): Boolean = {
+    val dataType = UserDefinedType.sqlType(exprDataType)
+
+    // The result is null for a non-nullable expression
+    assert(result != null || exprNullable, "exprNullable should be true if 
result is null")
+    (result, expected) match {
+      case (result: Array[Byte], expected: Array[Byte]) =>
+        java.util.Arrays.equals(result, expected)
+      case (result: Double, expected: Spread[Double @unchecked]) =>
+        expected.asInstanceOf[Spread[Double]].isWithin(result)
+      case (result: InternalRow, expected: InternalRow) =>
+        val st = dataType.asInstanceOf[StructType]
+        assert(result.numFields == st.length && expected.numFields == 
st.length)
+        st.zipWithIndex.forall { case (f, i) =>
+          checkResult(
+            result.get(i, f.dataType),
+            expected.get(i, f.dataType),
+            f.dataType,
+            f.nullable)
+        }
+      case (result: ArrayData, expected: ArrayData) =>
+        result.numElements == expected.numElements && {
+          val ArrayType(et, cn) = dataType.asInstanceOf[ArrayType]
+          var isSame = true
+          var i = 0
+          while (isSame && i < result.numElements) {
+            isSame = checkResult(result.get(i, et), expected.get(i, et), et, 
cn)
+            i += 1
+          }
+          isSame
+        }
+      case (result: MapData, expected: MapData) =>
+        val MapType(kt, vt, vcn) = dataType.asInstanceOf[MapType]
+        checkResult(
+          getSortedArrays(result, kt, vt)._1,
+          getSortedArrays(expected, kt, vt)._1,
+          ArrayType(kt, containsNull = false),
+          exprNullable = false) && checkResult(
+          getSortedArrays(result, kt, vt)._2,
+          getSortedArrays(expected, kt, vt)._2,
+          ArrayType(vt, vcn),
+          exprNullable = false)
+      case (result: Double, expected: Double) =>
+        if ((isNaNOrInf(result) || isNaNOrInf(expected))
+          || (result == -0.0) || (expected == -0.0)) {
+          java.lang.Double.doubleToRawLongBits(result) ==
+            java.lang.Double.doubleToRawLongBits(expected)
+        } else {
+          Precision.equalsWithRelativeTolerance(result, expected, 0.00001d)
+        }
+      case (result: Float, expected: Float) =>
+        if (expected.isNaN) result.isNaN else expected == result
+      case (result: Row, expected: InternalRow) => result.toSeq == 
expected.toSeq(result.schema)
+      case _ =>
+        result == expected
+    }
+  }
+
+  def checkDataTypeSupported(expr: Expression): Boolean = {
+    SUPPORTED_DATA_TYPES.acceptsType(expr.dataType)
+  }
+
+  def shouldNotFallback(): Unit = {
+    // Do nothing for now since Auron has a lot of fallbacks.
+  }
+
+  def canConvertToDataFrame(inputRow: InternalRow): Boolean = {
+    if (inputRow == EmptyRow || inputRow == InternalRow.empty) {
+      return true
+    }
+
+    if (!inputRow.isInstanceOf[GenericInternalRow]) {
+      return false
+    }
+
+    val values = inputRow.asInstanceOf[GenericInternalRow].values
+    for (value <- values) {
+      value match {
+        case _: MapData => return false
+        case _: ArrayData => return false
+        case _: InternalRow => return false
+        case _ =>
+      }
+    }
+    true
+  }

Review Comment:
   Missing documentation for public method: The `canConvertToDataFrame` method 
is public but lacks scaladoc explaining what types of InternalRow can be 
converted to DataFrame and why certain types (MapData, ArrayData, InternalRow) 
cannot be converted.



##########
auron-spark-tests/common/src/test/scala/org/apache/spark/sql/SparkExpressionTestsBase.scala:
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.commons.io.FileUtils
+import org.apache.commons.math3.util.Precision
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.analysis.ResolveTimeZone
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, 
ConvertToLocalRelation, NullPropagation}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, 
MapData, TypeUtils}
+import org.apache.spark.sql.execution.auron.plan.NativeProjectBase
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.scalactic.TripleEqualsSupport.Spread
+
+/**
+ * Base trait for all Spark expression tests.
+ */
+trait SparkExpressionTestsBase
+    extends SparkFunSuite
+    with ExpressionEvalHelper
+    with SparkTestsBase {
+  val SUPPORTED_DATA_TYPES = TypeCollection(
+    BooleanType,
+    ByteType,
+    ShortType,
+    IntegerType,
+    LongType,
+    FloatType,
+    DoubleType,
+    DecimalType,
+    StringType,
+    BinaryType,
+    DateType,
+    TimestampType,
+    ArrayType,
+    StructType,
+    MapType)
+
+  override def beforeAll(): Unit = {
+    // Prepare working paths.
+    val basePathDir = new File(basePath)
+    if (basePathDir.exists()) {
+      FileUtils.forceDelete(basePathDir)
+    }
+    FileUtils.forceMkdir(basePathDir)
+    FileUtils.forceMkdir(new File(warehouse))
+    FileUtils.forceMkdir(new File(metaStorePathAbsolute))
+
+    super.beforeAll()
+    initializeSession()
+    _spark.sparkContext.setLogLevel("WARN")
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      super.afterAll()
+    } finally {
+      try {
+        if (_spark != null) {
+          try {
+            _spark.sessionState.catalog.reset()
+          } finally {
+            _spark.stop()
+            _spark = null
+          }
+        }
+      } finally {
+        SparkSession.clearActiveSession()
+        SparkSession.clearDefaultSession()
+      }
+    }
+  }
+
+  protected def initializeSession(): Unit = {
+    if (_spark == null) {
+      val sparkBuilder = SparkSession
+        .builder()
+        .appName("Auron-UT")
+        .master(s"local[2]")
+        // Avoid static evaluation for literal input by spark catalyst.
+        .config(
+          SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
+          ConvertToLocalRelation.ruleName +
+            "," + ConstantFolding.ruleName + "," + NullPropagation.ruleName)
+
+      for ((key, value) <- sparkConfList) {
+        sparkBuilder.config(key, value)
+      }
+
+      _spark = sparkBuilder
+        .getOrCreate()
+    }
+  }
+
+  protected var _spark: SparkSession = null
+
+  override protected def checkEvaluation(
+      expression: => Expression,
+      expected: Any,
+      inputRow: InternalRow = EmptyRow): Unit = {
+
+    if (canConvertToDataFrame(inputRow)) {
+      val resolver = ResolveTimeZone
+      val expr = resolver.resolveTimeZones(expression)
+      assert(expr.resolved)
+
+      auronCheckExpression(expr, expected, inputRow)
+    } else {
+      logWarning(
+        "Skipping evaluation - Nonempty inputRow cannot be converted to 
DataFrame " +
+          "due to complex/unsupported types.\n")
+    }
+  }
+
+  def auronCheckExpression(expression: Expression, expected: Any, inputRow: 
InternalRow): Unit = {
+    val df = if (inputRow != EmptyRow && inputRow != InternalRow.empty) {
+      convertInternalRowToDataFrame(inputRow)
+    } else {
+      val schema = StructType(StructField("a", IntegerType, nullable = true) 
:: Nil)
+      val empData = Seq(Row(1))
+      _spark.createDataFrame(_spark.sparkContext.parallelize(empData), schema)
+    }
+    val resultDF = df.select(Column(expression))
+    val result = resultDF.collect()
+
+    if (checkDataTypeSupported(expression) &&
+      expression.children.forall(checkDataTypeSupported)) {
+      val projectExec = resultDF.queryExecution.executedPlan.collect {
+        case p: NativeProjectBase => p
+      }
+
+      if (projectExec.size == 1) {
+        logInfo("Offload to native backend in the test.\n")
+      } else {
+        logInfo("Not supported in Auron, fall back to vanilla spark in the 
test.\n")
+        shouldNotFallback()
+      }
+    } else {
+      logInfo("Has unsupported data type, fall back to vanilla spark.\n")
+      shouldNotFallback()
+    }
+
+    if (!(checkResult(result.head.get(0), expected, expression.dataType, 
expression.nullable)
+        || checkResult(
+          
CatalystTypeConverters.createToCatalystConverter(expression.dataType)(
+            result.head.get(0)
+          ), // decimal precision is wrong from value
+          CatalystTypeConverters.convertToCatalyst(expected),
+          expression.dataType,
+          expression.nullable))) {
+      val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
+      fail(
+        s"Incorrect evaluation: $expression, " +
+          s"actual: ${result.head.get(0)}, " +
+          s"expected: $expected$input")
+    }
+  }
+
+  /**
+   * Sort map data by key and return the sorted key array and value array.
+   *
+   * @param input
+   *   input map data.
+   * @param kt
+   *   key type.
+   * @param vt
+   *   value type.
+   * @return
+   *   the sorted key array and value array.
+   */
+  private def getSortedArrays(
+      input: MapData,
+      kt: DataType,
+      vt: DataType): (ArrayData, ArrayData) = {
+    val keyArray = input.keyArray().toArray[Any](kt)
+    val valueArray = input.valueArray().toArray[Any](vt)
+    val newMap = (keyArray.zip(valueArray)).toMap
+    val sortedMap = mutable.SortedMap(newMap.toSeq: 
_*)(TypeUtils.getInterpretedOrdering(kt))
+    (new GenericArrayData(sortedMap.keys.toArray), new 
GenericArrayData(sortedMap.values.toArray))
+  }
+
+  def isNaNOrInf(num: Double): Boolean = {
+    num.isNaN || num.isInfinite
+  }
+
+  override protected def checkResult(
+      result: Any,
+      expected: Any,
+      exprDataType: DataType,
+      exprNullable: Boolean): Boolean = {
+    val dataType = UserDefinedType.sqlType(exprDataType)
+
+    // The result is null for a non-nullable expression
+    assert(result != null || exprNullable, "exprNullable should be true if 
result is null")
+    (result, expected) match {
+      case (result: Array[Byte], expected: Array[Byte]) =>
+        java.util.Arrays.equals(result, expected)
+      case (result: Double, expected: Spread[Double @unchecked]) =>
+        expected.asInstanceOf[Spread[Double]].isWithin(result)
+      case (result: InternalRow, expected: InternalRow) =>
+        val st = dataType.asInstanceOf[StructType]
+        assert(result.numFields == st.length && expected.numFields == 
st.length)
+        st.zipWithIndex.forall { case (f, i) =>
+          checkResult(
+            result.get(i, f.dataType),
+            expected.get(i, f.dataType),
+            f.dataType,
+            f.nullable)
+        }
+      case (result: ArrayData, expected: ArrayData) =>
+        result.numElements == expected.numElements && {
+          val ArrayType(et, cn) = dataType.asInstanceOf[ArrayType]
+          var isSame = true
+          var i = 0
+          while (isSame && i < result.numElements) {
+            isSame = checkResult(result.get(i, et), expected.get(i, et), et, 
cn)
+            i += 1
+          }
+          isSame
+        }
+      case (result: MapData, expected: MapData) =>
+        val MapType(kt, vt, vcn) = dataType.asInstanceOf[MapType]
+        checkResult(
+          getSortedArrays(result, kt, vt)._1,
+          getSortedArrays(expected, kt, vt)._1,
+          ArrayType(kt, containsNull = false),
+          exprNullable = false) && checkResult(
+          getSortedArrays(result, kt, vt)._2,
+          getSortedArrays(expected, kt, vt)._2,
+          ArrayType(vt, vcn),
+          exprNullable = false)
+      case (result: Double, expected: Double) =>
+        if ((isNaNOrInf(result) || isNaNOrInf(expected))
+          || (result == -0.0) || (expected == -0.0)) {
+          java.lang.Double.doubleToRawLongBits(result) ==
+            java.lang.Double.doubleToRawLongBits(expected)
+        } else {
+          Precision.equalsWithRelativeTolerance(result, expected, 0.00001d)
+        }
+      case (result: Float, expected: Float) =>
+        if (expected.isNaN) result.isNaN else expected == result
+      case (result: Row, expected: InternalRow) => result.toSeq == 
expected.toSeq(result.schema)
+      case _ =>
+        result == expected
+    }
+  }
+
+  def checkDataTypeSupported(expr: Expression): Boolean = {
+    SUPPORTED_DATA_TYPES.acceptsType(expr.dataType)
+  }
+
+  def shouldNotFallback(): Unit = {
+    // Do nothing for now since Auron has a lot of fallbacks.

Review Comment:
   Placeholder method needs documentation: The `shouldNotFallback` method is 
intentionally empty ("Do nothing for now since Auron has a lot of fallbacks"), 
but it's unclear what the intended behavior should be in the future or when 
this should be implemented. Consider adding a TODO comment or more detailed 
explanation about the future implementation plan.
   ```suggestion
     /**
      * Placeholder for future fallback checks.
      *
      * TODO: Implement logic to verify that no unexpected fallbacks occur 
during expression evaluation.
      * Currently, this method is intentionally left empty because the Auron 
engine has many
      * legitimate fallback cases that are not yet fully handled. Once fallback 
handling is
      * stabilized and the expected cases are well defined, implement 
assertions or checks here
      * to ensure that only allowed fallbacks occur.
      */
     def shouldNotFallback(): Unit = {
       // Intentionally left blank for now.
   ```



##########
auron-spark-tests/common/src/test/scala/org/apache/spark/sql/SparkExpressionTestsBase.scala:
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.commons.io.FileUtils
+import org.apache.commons.math3.util.Precision
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.analysis.ResolveTimeZone
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, 
ConvertToLocalRelation, NullPropagation}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, 
MapData, TypeUtils}
+import org.apache.spark.sql.execution.auron.plan.NativeProjectBase
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.scalactic.TripleEqualsSupport.Spread
+
+/**
+ * Base trait for all Spark expression tests.
+ */
+trait SparkExpressionTestsBase
+    extends SparkFunSuite
+    with ExpressionEvalHelper
+    with SparkTestsBase {
+  val SUPPORTED_DATA_TYPES = TypeCollection(
+    BooleanType,
+    ByteType,
+    ShortType,
+    IntegerType,
+    LongType,
+    FloatType,
+    DoubleType,
+    DecimalType,
+    StringType,
+    BinaryType,
+    DateType,
+    TimestampType,
+    ArrayType,
+    StructType,
+    MapType)
+
+  override def beforeAll(): Unit = {
+    // Prepare working paths.
+    val basePathDir = new File(basePath)
+    if (basePathDir.exists()) {
+      FileUtils.forceDelete(basePathDir)
+    }
+    FileUtils.forceMkdir(basePathDir)
+    FileUtils.forceMkdir(new File(warehouse))
+    FileUtils.forceMkdir(new File(metaStorePathAbsolute))
+
+    super.beforeAll()
+    initializeSession()
+    _spark.sparkContext.setLogLevel("WARN")
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      super.afterAll()
+    } finally {
+      try {
+        if (_spark != null) {
+          try {
+            _spark.sessionState.catalog.reset()
+          } finally {
+            _spark.stop()
+            _spark = null
+          }
+        }
+      } finally {
+        SparkSession.clearActiveSession()
+        SparkSession.clearDefaultSession()
+      }
+    }
+  }
+
+  protected def initializeSession(): Unit = {
+    if (_spark == null) {
+      val sparkBuilder = SparkSession
+        .builder()
+        .appName("Auron-UT")
+        .master(s"local[2]")
+        // Avoid static evaluation for literal input by spark catalyst.
+        .config(
+          SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
+          ConvertToLocalRelation.ruleName +
+            "," + ConstantFolding.ruleName + "," + NullPropagation.ruleName)
+
+      for ((key, value) <- sparkConfList) {
+        sparkBuilder.config(key, value)
+      }
+
+      _spark = sparkBuilder
+        .getOrCreate()
+    }
+  }
+
+  protected var _spark: SparkSession = null
+
+  override protected def checkEvaluation(
+      expression: => Expression,
+      expected: Any,
+      inputRow: InternalRow = EmptyRow): Unit = {
+
+    if (canConvertToDataFrame(inputRow)) {
+      val resolver = ResolveTimeZone
+      val expr = resolver.resolveTimeZones(expression)
+      assert(expr.resolved)
+
+      auronCheckExpression(expr, expected, inputRow)
+    } else {
+      logWarning(
+        "Skipping evaluation - Nonempty inputRow cannot be converted to 
DataFrame " +
+          "due to complex/unsupported types.\n")
+    }
+  }
+
+  def auronCheckExpression(expression: Expression, expected: Any, inputRow: 
InternalRow): Unit = {
+    val df = if (inputRow != EmptyRow && inputRow != InternalRow.empty) {
+      convertInternalRowToDataFrame(inputRow)
+    } else {
+      val schema = StructType(StructField("a", IntegerType, nullable = true) 
:: Nil)
+      val empData = Seq(Row(1))
+      _spark.createDataFrame(_spark.sparkContext.parallelize(empData), schema)
+    }
+    val resultDF = df.select(Column(expression))
+    val result = resultDF.collect()
+
+    if (checkDataTypeSupported(expression) &&
+      expression.children.forall(checkDataTypeSupported)) {
+      val projectExec = resultDF.queryExecution.executedPlan.collect {
+        case p: NativeProjectBase => p
+      }
+
+      if (projectExec.size == 1) {
+        logInfo("Offload to native backend in the test.\n")
+      } else {
+        logInfo("Not supported in Auron, fall back to vanilla spark in the 
test.\n")
+        shouldNotFallback()
+      }
+    } else {
+      logInfo("Has unsupported data type, fall back to vanilla spark.\n")
+      shouldNotFallback()
+    }
+
+    if (!(checkResult(result.head.get(0), expected, expression.dataType, 
expression.nullable)
+        || checkResult(
+          
CatalystTypeConverters.createToCatalystConverter(expression.dataType)(
+            result.head.get(0)
+          ), // decimal precision is wrong from value
+          CatalystTypeConverters.convertToCatalyst(expected),
+          expression.dataType,
+          expression.nullable))) {
+      val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
+      fail(
+        s"Incorrect evaluation: $expression, " +
+          s"actual: ${result.head.get(0)}, " +
+          s"expected: $expected$input")
+    }
+  }
+
+  /**
+   * Sort map data by key and return the sorted key array and value array.
+   *
+   * @param input
+   *   input map data.
+   * @param kt
+   *   key type.
+   * @param vt
+   *   value type.
+   * @return
+   *   the sorted key array and value array.
+   */
+  private def getSortedArrays(
+      input: MapData,
+      kt: DataType,
+      vt: DataType): (ArrayData, ArrayData) = {
+    val keyArray = input.keyArray().toArray[Any](kt)
+    val valueArray = input.valueArray().toArray[Any](vt)
+    val newMap = (keyArray.zip(valueArray)).toMap
+    val sortedMap = mutable.SortedMap(newMap.toSeq: 
_*)(TypeUtils.getInterpretedOrdering(kt))
+    (new GenericArrayData(sortedMap.keys.toArray), new 
GenericArrayData(sortedMap.values.toArray))
+  }
+
+  def isNaNOrInf(num: Double): Boolean = {
+    num.isNaN || num.isInfinite
+  }
+
+  override protected def checkResult(
+      result: Any,
+      expected: Any,
+      exprDataType: DataType,
+      exprNullable: Boolean): Boolean = {
+    val dataType = UserDefinedType.sqlType(exprDataType)
+
+    // The result is null for a non-nullable expression
+    assert(result != null || exprNullable, "exprNullable should be true if 
result is null")
+    (result, expected) match {
+      case (result: Array[Byte], expected: Array[Byte]) =>
+        java.util.Arrays.equals(result, expected)
+      case (result: Double, expected: Spread[Double @unchecked]) =>
+        expected.asInstanceOf[Spread[Double]].isWithin(result)
+      case (result: InternalRow, expected: InternalRow) =>
+        val st = dataType.asInstanceOf[StructType]
+        assert(result.numFields == st.length && expected.numFields == 
st.length)
+        st.zipWithIndex.forall { case (f, i) =>
+          checkResult(
+            result.get(i, f.dataType),
+            expected.get(i, f.dataType),
+            f.dataType,
+            f.nullable)
+        }
+      case (result: ArrayData, expected: ArrayData) =>
+        result.numElements == expected.numElements && {
+          val ArrayType(et, cn) = dataType.asInstanceOf[ArrayType]
+          var isSame = true
+          var i = 0
+          while (isSame && i < result.numElements) {
+            isSame = checkResult(result.get(i, et), expected.get(i, et), et, 
cn)
+            i += 1
+          }
+          isSame
+        }
+      case (result: MapData, expected: MapData) =>
+        val MapType(kt, vt, vcn) = dataType.asInstanceOf[MapType]
+        checkResult(
+          getSortedArrays(result, kt, vt)._1,
+          getSortedArrays(expected, kt, vt)._1,
+          ArrayType(kt, containsNull = false),
+          exprNullable = false) && checkResult(
+          getSortedArrays(result, kt, vt)._2,
+          getSortedArrays(expected, kt, vt)._2,
+          ArrayType(vt, vcn),
+          exprNullable = false)
+      case (result: Double, expected: Double) =>
+        if ((isNaNOrInf(result) || isNaNOrInf(expected))
+          || (result == -0.0) || (expected == -0.0)) {
+          java.lang.Double.doubleToRawLongBits(result) ==
+            java.lang.Double.doubleToRawLongBits(expected)
+        } else {
+          Precision.equalsWithRelativeTolerance(result, expected, 0.00001d)
+        }
+      case (result: Float, expected: Float) =>
+        if (expected.isNaN) result.isNaN else expected == result
+      case (result: Row, expected: InternalRow) => result.toSeq == 
expected.toSeq(result.schema)
+      case _ =>
+        result == expected
+    }
+  }
+
+  def checkDataTypeSupported(expr: Expression): Boolean = {
+    SUPPORTED_DATA_TYPES.acceptsType(expr.dataType)
+  }
+
+  def shouldNotFallback(): Unit = {
+    // Do nothing for now since Auron has a lot of fallbacks.
+  }
+
+  def canConvertToDataFrame(inputRow: InternalRow): Boolean = {
+    if (inputRow == EmptyRow || inputRow == InternalRow.empty) {
+      return true
+    }
+
+    if (!inputRow.isInstanceOf[GenericInternalRow]) {
+      return false
+    }
+
+    val values = inputRow.asInstanceOf[GenericInternalRow].values
+    for (value <- values) {
+      value match {
+        case _: MapData => return false
+        case _: ArrayData => return false
+        case _: InternalRow => return false
+        case _ =>
+      }
+    }
+    true
+  }
+
+  def convertInternalRowToDataFrame(inputRow: InternalRow): DataFrame = {
+    val structFieldSeq = new ArrayBuffer[StructField]()
+    val values = inputRow match {
+      case genericInternalRow: GenericInternalRow =>
+        genericInternalRow.values
+      case _ => throw new UnsupportedOperationException("Unsupported 
InternalRow.")
+    }
+
+    values.foreach {
+      case boolean: java.lang.Boolean =>
+        structFieldSeq.append(StructField("bool", BooleanType, boolean == 
null))
+      case byte: java.lang.Byte =>
+        structFieldSeq.append(StructField("i8", ByteType, byte == null))
+      case short: java.lang.Short =>
+        structFieldSeq.append(StructField("i16", ShortType, short == null))
+      case integer: java.lang.Integer =>
+        structFieldSeq.append(StructField("i32", IntegerType, integer == null))
+      case long: java.lang.Long =>
+        structFieldSeq.append(StructField("i64", LongType, long == null))
+      case float: java.lang.Float =>
+        structFieldSeq.append(StructField("fp32", FloatType, float == null))
+      case double: java.lang.Double =>
+        structFieldSeq.append(StructField("fp64", DoubleType, double == null))
+      case utf8String: UTF8String =>
+        structFieldSeq.append(StructField("str", StringType, utf8String == 
null))
+      case byteArr: Array[Byte] =>
+        structFieldSeq.append(StructField("vbin", BinaryType, byteArr == null))
+      case decimal: Decimal =>
+        structFieldSeq.append(
+          StructField("dec", DecimalType(decimal.precision, decimal.scale), 
decimal == null))
+      case _ =>
+        // for null
+        structFieldSeq.append(StructField("n", IntegerType, nullable = true))
+    }
+
+    _spark.internalCreateDataFrame(
+      _spark.sparkContext.parallelize(Seq(inputRow)),
+      StructType(structFieldSeq.toSeq))
+  }

Review Comment:
   Missing documentation for public method: The `convertInternalRowToDataFrame` 
method is public but lacks scaladoc explaining its purpose, the structure of 
the resulting DataFrame, and the limitations (as indicated by the 
UnsupportedOperationException for non-GenericInternalRow types).



##########
auron-spark-tests/spark33/pom.xml:
##########
@@ -0,0 +1,148 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.auron</groupId>
+    <artifactId>auron-spark-tests</artifactId>
+    <version>${project.version}</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>auron-spark-tests-spark33</artifactId>
+  <packaging>jar</packaging>
+  <name>Auron Spark Test for Spark 3.3</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.auron</groupId>
+      <artifactId>spark-extension_${scalaVersion}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.auron</groupId>
+      <artifactId>spark-extension-shims-spark_${scalaVersion}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.auron</groupId>
+      <artifactId>auron-spark-tests-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>compile</scope>

Review Comment:
   Unusual scope for test-jar dependency: The `auron-spark-tests-common` 
dependency uses `type=test-jar` with `scope=compile` instead of the typical 
`scope=test`. Test-jars are normally consumed in test scope. If compile scope 
is intentional (for example, to make the test utilities available at compile 
time), please add a comment documenting why this non-standard configuration is 
needed; otherwise, switch the dependency to test scope as suggested below.
   ```suggestion
         <scope>test</scope>
   ```



##########
auron-spark-tests/spark33/pom.xml:
##########
@@ -0,0 +1,148 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.auron</groupId>
+    <artifactId>auron-spark-tests</artifactId>
+    <version>${project.version}</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>auron-spark-tests-spark33</artifactId>
+  <packaging>jar</packaging>
+  <name>Auron Spark Test for Spark 3.3</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.auron</groupId>
+      <artifactId>spark-extension_${scalaVersion}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.auron</groupId>
+      <artifactId>spark-extension-shims-spark_${scalaVersion}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.auron</groupId>
+      <artifactId>auron-spark-tests-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.bytebuddy</groupId>
+      <artifactId>byte-buddy</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>net.bytebuddy</groupId>
+      <artifactId>byte-buddy-agent</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-unsafe</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scalaVersion}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scalaVersion}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scalaVersion}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scalaVersion}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatestplus</groupId>
+      <artifactId>scalatestplus-scalacheck_${scalaVersion}</artifactId>
+      <version>3.1.0.0-RC2</version>

Review Comment:
   Hardcoded version instead of dependency management: The 
`scalatestplus-scalacheck` dependency version `3.1.0.0-RC2` is hardcoded here, 
which duplicates the version already defined in the root pom.xml (lines 
232-237). Remove the `<version>` element so that the dependency inherits its 
version from dependencyManagement, which ensures consistency and easier version 
updates across the project. (The empty suggestion below deletes this line.)
   ```suggestion
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to