This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git

commit ed5b65e183765135b9f25435b57e9689caba986a
Author: Chang chen <[email protected]>
AuthorDate: Thu Jan 8 09:51:41 2026 +0800

    Spark: Initial support for 4.1.0 UT
    
    ## Changes
    
    | Cause | Type | Category | Description | Affected Files |
    |-------|------|----------|-------------|----------------|
    | N/A | Feat | Build | Update build configuration to support Spark 4.1 UT | `.github/workflows/velox_backend_x86.yml`, `gluten-ut/pom.xml`, `gluten-ut/spark41/pom.xml`, `tools/gluten-it/pom.xml` |
    | [#52165](https://github.com/apache/spark/pull/52165) | Fix | Dependency | Update the Parquet dependency to 1.16.0 to avoid a NoSuchMethodError | `gluten-ut/spark41/pom.xml` |
    | [#51477](https://github.com/apache/spark/pull/51477) | Fix | Compatibility | Update imports to reflect the streaming runtime package refactoring in Apache Spark | `gluten-ut/spark41/.../GlutenDynamicPartitionPruningSuite.scala`, `gluten-ut/spark41/.../GlutenStreamingQuerySuite.scala` |
    | [#50674](https://github.com/apache/spark/pull/50674) | Fix | Compatibility | Fix a compatibility issue introduced by `TypedConfigBuilder` | `gluten-substrait/.../ExpressionConverter.scala`, `gluten-ut/spark41/.../GlutenCSVSuite.scala`, `gluten-ut/spark41/.../GlutenJsonSuite.scala` |
    | [#49766](https://github.com/apache/spark/pull/49766) | Fix | Compatibility | Disable V2 bucketing in GlutenDynamicPartitionPruningSuite, since `spark.sql.sources.v2.bucketing.enabled` is now enabled by default | `gluten-ut/spark41/.../GlutenDynamicPartitionPruningSuite.scala` |
    | [#42414](https://github.com/apache/spark/pull/42414), [#53038](https://github.com/apache/spark/pull/53038) | Fix | Bug Fix | Resolve an issue introduced by SPARK-42414, as identified in SPARK-53038 | `backends-velox/.../VeloxBloomFilterAggregate.scala` |
    | N/A | Fix | Bug Fix | Enforce row-based fallback for unsupported cached batches; keep columnar execution only when schema validation succeeds | `backends-velox/.../ColumnarCachedBatchSerializer.scala` |
    | [#53132](https://github.com/apache/spark/pull/53132), [#53142](https://github.com/apache/spark/pull/53142) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 KeyGroupedPartitioningSuite tests. Excluded tests: `SPARK-53322*`, `SPARK-54439*` | `gluten-ut/spark41/.../VeloxTestSettings.scala` |
    | [SPARK-53535](https://issues.apache.org/jira/browse/SPARK-53535), [SPARK-54220](https://issues.apache.org/jira/browse/SPARK-54220) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 GlutenParquetIOSuite tests. Excluded tests: `SPARK-53535*`, `vectorized reader: missing all struct fields*`, `SPARK-54220*` | `gluten-ut/spark41/.../VeloxTestSettings.scala` |
    | [#52645](https://github.com/apache/spark/pull/52645) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 GlutenStreamingQuerySuite tests. Excluded tests: `SPARK-53942: changing the number of stateless shuffle partitions via config`, `SPARK-53942: stateful shuffle partitions are retained from old checkpoint` | `gluten-ut/spark41/.../VeloxTestSettings.scala` |
    | [#47856](https://github.com/apache/spark/pull/47856) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 GlutenDataFrameWindowFunctionsSuite and GlutenJoinSuite tests. Excluded tests: `SPARK-49386: Window spill with more than the inMemoryThreshold and spillSizeThreshold`, `SPARK-49386: test SortMergeJoin (with spill by size threshold)` | `gluten-ut/spark41/.../VeloxTestSettings.scala` |
    | [#52157](https://github.com/apache/spark/pull/52157) | 4.1.0 | Test Exclusion | Exclude an additional Spark 4.1 GlutenQueryExecutionSuite test. Excluded test: `SPARK-53413: Cleanup shuffle dependencies for commands` | `gluten-ut/spark41/.../VeloxTestSettings.scala` |
    | [#48470](https://github.com/apache/spark/pull/48470) | 4.1.0 | Test Exclusion | Exclude the SPLIT test in GlutenRegexpExpressionsSuite. Excluded test: `GlutenRegexpExpressionsSuite.SPLIT` | `gluten-ut/spark41/.../VeloxTestSettings.scala` |
    | [#51623](https://github.com/apache/spark/pull/51623) | 4.1.0 | Test Exclusion | Add `spark.sql.unionOutputPartitioning=false` to the Maven test args. Excluded tests: `GlutenBroadcastExchangeSuite.SPARK-52962`, `GlutenDataFrameSetOperationsSuite.SPARK-52921*` | `.github/workflows/velox_backend_x86.yml`, `gluten-ut/spark41/.../VeloxTestSettings.scala`, `tools/gluten-it/common/.../Suite.scala` |
    | N/A | 4.1.0 | Test Exclusion | Exclude failing SQL tests that need to be fixed for Spark 4.1 compatibility. Excluded tests: `decimalArithmeticOperations.sql`, `identifier-clause.sql`, `keywords.sql`, `literals.sql`, `operators.sql`, `exists-orderby-limit.sql`, `postgreSQL/date.sql`, `nonansi/keywords.sql`, `nonansi/literals.sql`, `datetime-legacy.sql`, `datetime-parsing-invalid.sql`, `misc-functions.sql` | `gluten-ut/spark41/.../VeloxSQLQueryTestSettings.scala` |
    | [#11252](https://github.com/apache/incubator-gluten/pull/11252) | 4.1.0 | Test Exclusion | Exclude the Gluten test for `SPARK-47939: Explain should work with parameterized queries` | `gluten-ut/spark41/.../VeloxTestSettings.scala` |
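    
    The `TypedConfigBuilder` row above is the least obvious change: starting with Spark 4.1, `SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY)` returns an enum value instead of a `String`, so the old string comparison silently becomes always-false (see the comment added in `ExpressionConverter.scala` below). The standalone sketch that follows is not Gluten code; `MapKeyDedupPolicy` here is a hypothetical stand-in for the Spark enum, used only to illustrate the pitfall and the version-agnostic `.toString` comparison the fix adopts.
    
    ```scala
    object DedupPolicySketch {
      // Hypothetical stand-in for Spark 4.1's enum-typed MAP_KEY_DEDUP_POLICY value.
      object MapKeyDedupPolicy extends Enumeration {
        val EXCEPTION, LAST_WIN = Value
      }
    
      def main(args: Array[String]): Unit = {
        // On Spark 4.1 the config lookup yields the enum value, not a String.
        val mapKeyDedupPolicy: Any = MapKeyDedupPolicy.LAST_WIN
    
        // Pre-4.1 style comparison: compiles, but an enum value never equals a String,
        // so this check is silently always false.
        println(mapKeyDedupPolicy == "LAST_WIN") // false
    
        // Version-agnostic comparison used by the fix: normalize both sides to String.
        println(mapKeyDedupPolicy.toString == MapKeyDedupPolicy.LAST_WIN.toString) // true
      }
    }
    ```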
---
 .github/workflows/velox_backend_x86.yml            |  42 ++++--
 .../aggregate/VeloxBloomFilterAggregate.scala      |  45 ++++++-
 .../execution/ColumnarCachedBatchSerializer.scala  | 146 +++++++++++----------
 .../gluten/expression/ExpressionConverter.scala    |   9 +-
 gluten-ut/pom.xml                                  |   6 +
 gluten-ut/spark41/pom.xml                          |  12 +-
 .../utils/velox/VeloxSQLQueryTestSettings.scala    |  24 ++--
 .../gluten/utils/velox/VeloxTestSettings.scala     |  26 +++-
 .../sql/GlutenDynamicPartitionPruningSuite.scala   |   6 +-
 .../sql/execution/GlutenStreamingQuerySuite.scala  |   2 +-
 .../execution/datasources/csv/GlutenCSVSuite.scala |   4 +-
 .../datasources/json/GlutenJsonSuite.scala         |   4 +-
 .../org/apache/gluten/integration/Suite.scala      |   1 +
 tools/gluten-it/pom.xml                            |  10 ++
 14 files changed, 229 insertions(+), 108 deletions(-)

diff --git a/.github/workflows/velox_backend_x86.yml 
b/.github/workflows/velox_backend_x86.yml
index 5ef70a79bc..51f71e829c 100644
--- a/.github/workflows/velox_backend_x86.yml
+++ b/.github/workflows/velox_backend_x86.yml
@@ -107,7 +107,7 @@ jobs:
       fail-fast: false
       matrix:
         os: [ "ubuntu:20.04", "ubuntu:22.04" ]
-        spark: [ "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5", 
"spark-4.0"  ]
+        spark: [ "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5", 
"spark-4.0", "spark-4.1"  ]
         java: [ "java-8", "java-11", "java-17", "java-21" ]
         # Spark supports JDK17 since 3.3.
         exclude:
@@ -141,6 +141,10 @@ jobs:
             java: java-8
           - spark: spark-4.0
             java: java-11
+          - spark: spark-4.1
+            java: java-8
+          - spark: spark-4.1
+            java: java-11
 
     runs-on: ubuntu-22.04
     container: ${{ matrix.os }}
@@ -182,11 +186,14 @@ jobs:
           cd $GITHUB_WORKSPACE/
           export JAVA_HOME=/usr/lib/jvm/${{ matrix.java }}-openjdk-amd64
           echo "JAVA_HOME: $JAVA_HOME"
-          if [ "${{ matrix.spark }}" = "spark-4.0" ]; then
-            $MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} 
-Pscala-2.13 -Pbackends-velox -DskipTests
-          else
-            $MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} 
-Pbackends-velox -DskipTests
-          fi
+          case "${{ matrix.spark }}" in
+            spark-4.0|spark-4.1)
+              $MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java 
}} -Pscala-2.13 -Pbackends-velox -DskipTests
+              ;;
+            *)
+              $MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java 
}} -Pbackends-velox -DskipTests
+              ;;
+          esac
           cd $GITHUB_WORKSPACE/tools/gluten-it
           $GITHUB_WORKSPACE/$MVN_CMD clean install -P${{ matrix.spark }} -P${{ 
matrix.java }}
           GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
@@ -200,7 +207,7 @@ jobs:
       fail-fast: false
       matrix:
         os: [ "centos:8" ]
-        spark: [ "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5", 
"spark-4.0" ]
+        spark: [ "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5", 
"spark-4.0", "spark-4.1" ]
         java: [ "java-8", "java-11", "java-17" ]
         # Spark supports JDK17 since 3.3.
         exclude:
@@ -220,6 +227,10 @@ jobs:
             java: java-8
           - spark: spark-4.0
             java: java-11
+          - spark: spark-4.1
+            java: java-8
+          - spark: spark-4.1
+            java: java-11
 
     runs-on: ubuntu-22.04
     container: ${{ matrix.os }}
@@ -263,11 +274,14 @@ jobs:
         run: |
           echo "JAVA_HOME: $JAVA_HOME"
           cd $GITHUB_WORKSPACE/
-          if [ "${{ matrix.spark }}" = "spark-4.0" ]; then
-            $MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} 
-Pscala-2.13 -Pbackends-velox -DskipTests
-          else
-            $MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} 
-Pbackends-velox -DskipTests
-          fi
+          case "${{ matrix.spark }}" in
+            spark-4.0|spark-4.1)
+              $MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java 
}} -Pscala-2.13 -Pbackends-velox -DskipTests
+              ;;
+            *)
+              $MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java 
}} -Pbackends-velox -DskipTests
+              ;;
+          esac
           cd $GITHUB_WORKSPACE/tools/gluten-it
           $GITHUB_WORKSPACE/build/mvn clean install -P${{ matrix.spark }} 
-P${{ matrix.java }}
       - name: Run TPC-H / TPC-DS
@@ -1521,7 +1535,7 @@ jobs:
           export PATH=$JAVA_HOME/bin:$PATH
           java -version
           $MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 
-Pbackends-velox \
-          -Pspark-ut 
-DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
+          -Pspark-ut 
-DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/ 
-Dspark.sql.unionOutputPartitioning=false" \
           
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
       - name: Upload test report
         if: always()
@@ -1570,7 +1584,7 @@ jobs:
           export PATH=$JAVA_HOME/bin:$PATH
           java -version
           $MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 
-Pbackends-velox -Pspark-ut \
-          -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
+          -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/ 
-Dspark.sql.unionOutputPartitioning=false" \
           -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
       - name: Upload test report
         if: always()
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxBloomFilterAggregate.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxBloomFilterAggregate.scala
index 976abb9e21..a3d6f738a2 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxBloomFilterAggregate.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxBloomFilterAggregate.scala
@@ -25,10 +25,14 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
 import org.apache.spark.sql.catalyst.trees.TernaryLike
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.task.TaskResources
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.sketch.BloomFilter
 
+import java.io.Serializable
+
 /**
  * Velox's bloom-filter implementation uses different algorithms internally 
comparing to vanilla
  * Spark so produces different intermediate aggregate data. Thus we use 
different filter function /
@@ -61,6 +65,15 @@ case class VeloxBloomFilterAggregate(
         .toLong
     )
 
+  // Mark as lazy so that `updater` is not evaluated during tree 
transformation.
+  private lazy val updater: BloomFilterUpdater = child.dataType match {
+    case LongType => LongUpdater
+    case IntegerType => IntUpdater
+    case ShortType => ShortUpdater
+    case ByteType => ByteUpdater
+    case _: StringType => BinaryUpdater
+  }
+
   override def first: Expression = child
 
   override def second: Expression = estimatedNumItemsExpression
@@ -97,7 +110,7 @@ case class VeloxBloomFilterAggregate(
     if (value == null) {
       return buffer
     }
-    buffer.putLong(value.asInstanceOf[Long])
+    updater.update(buffer, value)
     buffer
   }
 
@@ -128,3 +141,33 @@ case class VeloxBloomFilterAggregate(
     copy(inputAggBufferOffset = newOffset)
 
 }
+
+// see https://github.com/apache/spark/pull/42414
+private trait BloomFilterUpdater {
+  def update(bf: BloomFilter, v: Any): Boolean
+}
+
+private object LongUpdater extends BloomFilterUpdater with Serializable {
+  override def update(bf: BloomFilter, v: Any): Boolean =
+    bf.putLong(v.asInstanceOf[Long])
+}
+
+private object IntUpdater extends BloomFilterUpdater with Serializable {
+  override def update(bf: BloomFilter, v: Any): Boolean =
+    bf.putLong(v.asInstanceOf[Int])
+}
+
+private object ShortUpdater extends BloomFilterUpdater with Serializable {
+  override def update(bf: BloomFilter, v: Any): Boolean =
+    bf.putLong(v.asInstanceOf[Short])
+}
+
+private object ByteUpdater extends BloomFilterUpdater with Serializable {
+  override def update(bf: BloomFilter, v: Any): Boolean =
+    bf.putLong(v.asInstanceOf[Byte])
+}
+
+private object BinaryUpdater extends BloomFilterUpdater with Serializable {
+  override def update(bf: BloomFilter, v: Any): Boolean =
+    bf.putBinary(v.asInstanceOf[UTF8String].getBytes)
+}
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index a04e7d68fb..c05eb4a2fa 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -115,24 +115,24 @@ class ColumnarCachedBatchSerializer extends 
CachedBatchSerializer with Logging {
       conf: SQLConf): RDD[CachedBatch] = {
     val localSchema = toStructType(schema)
     if (!validateSchema(localSchema)) {
-      // we can not use columnar cache here, as the `RowToColumnar` does not 
support this schema
-      return rowBasedCachedBatchSerializer.convertInternalRowToCachedBatch(
+      // we cannot use columnar cache here, as the `RowToColumnar` does not 
support this schema
+      rowBasedCachedBatchSerializer.convertInternalRowToCachedBatch(
         input,
         schema,
         storageLevel,
         conf)
+    } else {
+      val numRows = conf.columnBatchSize
+      val rddColumnarBatch = input.mapPartitions {
+        it =>
+          RowToVeloxColumnarExec.toColumnarBatchIterator(
+            it,
+            localSchema,
+            numRows,
+            VeloxConfig.get.veloxPreferredBatchBytes)
+      }
+      convertColumnarBatchToCachedBatch(rddColumnarBatch, schema, 
storageLevel, conf)
     }
-
-    val numRows = conf.columnBatchSize
-    val rddColumnarBatch = input.mapPartitions {
-      it =>
-        RowToVeloxColumnarExec.toColumnarBatchIterator(
-          it,
-          localSchema,
-          numRows,
-          VeloxConfig.get.veloxPreferredBatchBytes)
-    }
-    convertColumnarBatchToCachedBatch(rddColumnarBatch, schema, storageLevel, 
conf)
   }
 
   override def convertCachedBatchToInternalRow(
@@ -141,18 +141,18 @@ class ColumnarCachedBatchSerializer extends 
CachedBatchSerializer with Logging {
       selectedAttributes: Seq[Attribute],
       conf: SQLConf): RDD[InternalRow] = {
     if (!validateSchema(cacheAttributes)) {
-      // if we do not support this schema that means we are using row-based 
serializer,
+      // if we do not support this schema, that means we are using row-based 
serializer,
       // see `convertInternalRowToCachedBatch`, so fallback to vanilla Spark 
serializer
-      return rowBasedCachedBatchSerializer.convertCachedBatchToInternalRow(
+      rowBasedCachedBatchSerializer.convertCachedBatchToInternalRow(
         input,
         cacheAttributes,
         selectedAttributes,
         conf)
+    } else {
+      val rddColumnarBatch =
+        convertCachedBatchToColumnarBatch(input, cacheAttributes, 
selectedAttributes, conf)
+      rddColumnarBatch.mapPartitions(it => 
VeloxColumnarToRowExec.toRowIterator(it))
     }
-
-    val rddColumnarBatch =
-      convertCachedBatchToColumnarBatch(input, cacheAttributes, 
selectedAttributes, conf)
-    rddColumnarBatch.mapPartitions(it => 
VeloxColumnarToRowExec.toRowIterator(it))
   }
 
   override def convertColumnarBatchToCachedBatch(
@@ -190,58 +190,68 @@ class ColumnarCachedBatchSerializer extends 
CachedBatchSerializer with Logging {
       cacheAttributes: Seq[Attribute],
       selectedAttributes: Seq[Attribute],
       conf: SQLConf): RDD[ColumnarBatch] = {
-    // Find the ordinals and data types of the requested columns.
-    val requestedColumnIndices = selectedAttributes.map {
-      a => cacheAttributes.map(_.exprId).indexOf(a.exprId)
-    }
-    val shouldSelectAttributes = cacheAttributes != selectedAttributes
-    val localSchema = toStructType(cacheAttributes)
-    val timezoneId = SQLConf.get.sessionLocalTimeZone
-    input.mapPartitions {
-      it =>
-        val runtime = Runtimes.contextInstance(
-          BackendsApiManager.getBackendName,
-          "ColumnarCachedBatchSerializer#read")
-        val jniWrapper = ColumnarBatchSerializerJniWrapper
-          .create(runtime)
-        val schema = SparkArrowUtil.toArrowSchema(localSchema, timezoneId)
-        val arrowAlloc = ArrowBufferAllocators.contextInstance()
-        val cSchema = ArrowSchema.allocateNew(arrowAlloc)
-        ArrowAbiUtil.exportSchema(arrowAlloc, schema, cSchema)
-        val deserializerHandle = jniWrapper
-          .init(cSchema.memoryAddress())
-        cSchema.close()
-
-        Iterators
-          .wrap(new Iterator[ColumnarBatch] {
-            override def hasNext: Boolean = it.hasNext
-
-            override def next(): ColumnarBatch = {
-              val cachedBatch = it.next().asInstanceOf[CachedColumnarBatch]
-              val batchHandle =
-                jniWrapper
-                  .deserialize(deserializerHandle, cachedBatch.bytes)
-              val batch = ColumnarBatches.create(batchHandle)
-              if (shouldSelectAttributes) {
-                try {
-                  ColumnarBatches.select(
-                    BackendsApiManager.getBackendName,
-                    batch,
-                    requestedColumnIndices.toArray)
-                } finally {
-                  batch.close()
+    if (!validateSchema(cacheAttributes)) {
+      // if we do not support this schema, that means we are using row-based 
serializer,
+      // see `convertInternalRowToCachedBatch`, so fallback to vanilla Spark 
serializer
+      rowBasedCachedBatchSerializer.convertCachedBatchToColumnarBatch(
+        input,
+        cacheAttributes,
+        selectedAttributes,
+        conf)
+    } else {
+      // Find the ordinals and data types of the requested columns.
+      val requestedColumnIndices = selectedAttributes.map {
+        a => cacheAttributes.map(_.exprId).indexOf(a.exprId)
+      }
+      val shouldSelectAttributes = cacheAttributes != selectedAttributes
+      val localSchema = toStructType(cacheAttributes)
+      val timezoneId = SQLConf.get.sessionLocalTimeZone
+      input.mapPartitions {
+        it =>
+          val runtime = Runtimes.contextInstance(
+            BackendsApiManager.getBackendName,
+            "ColumnarCachedBatchSerializer#read")
+          val jniWrapper = ColumnarBatchSerializerJniWrapper
+            .create(runtime)
+          val schema = SparkArrowUtil.toArrowSchema(localSchema, timezoneId)
+          val arrowAlloc = ArrowBufferAllocators.contextInstance()
+          val cSchema = ArrowSchema.allocateNew(arrowAlloc)
+          ArrowAbiUtil.exportSchema(arrowAlloc, schema, cSchema)
+          val deserializerHandle = jniWrapper
+            .init(cSchema.memoryAddress())
+          cSchema.close()
+
+          Iterators
+            .wrap(new Iterator[ColumnarBatch] {
+              override def hasNext: Boolean = it.hasNext
+
+              override def next(): ColumnarBatch = {
+                val cachedBatch = it.next().asInstanceOf[CachedColumnarBatch]
+                val batchHandle =
+                  jniWrapper
+                    .deserialize(deserializerHandle, cachedBatch.bytes)
+                val batch = ColumnarBatches.create(batchHandle)
+                if (shouldSelectAttributes) {
+                  try {
+                    ColumnarBatches.select(
+                      BackendsApiManager.getBackendName,
+                      batch,
+                      requestedColumnIndices.toArray)
+                  } finally {
+                    batch.close()
+                  }
+                } else {
+                  batch
                 }
-              } else {
-                batch
               }
+            })
+            .protectInvocationFlow()
+            .recycleIterator {
+              jniWrapper.close(deserializerHandle)
             }
-          })
-          .protectInvocationFlow()
-          .recycleIterator {
-            jniWrapper.close(deserializerHandle)
-          }
-          .recyclePayload(_.close())
-          .create()
+            .recyclePayload(_.close())
+            .create()
+      }
     }
   }
 
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
index 418de8578f..a810a4ef1d 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
@@ -831,7 +831,14 @@ object ExpressionConverter extends SQLConfHelper with 
Logging {
       case t: TransformKeys =>
         // default is `EXCEPTION`
         val mapKeyDedupPolicy = 
SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY)
-        if (mapKeyDedupPolicy == SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
+
+        // Calling `.toString` on both sides ensures compatibility across all 
Spark versions.
+        // Starting from Spark 4.1, 
`SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY)` returns
+        // an enum instead of a String. Without `.toString`, the comparison
+        // `mapKeyDedupPolicy == SQLConf.MapKeyDedupPolicy.LAST_WIN.toString` 
would silently fail
+        // in tests, producing only a "Comparing unrelated types" warning in 
IntelliJ IDEA,
+        // but no compile-time error.
+        if (mapKeyDedupPolicy.toString == 
SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
           // TODO: Remove after fix ready for
           //  https://github.com/facebookincubator/velox/issues/10219
           throw new GlutenNotSupportException(
diff --git a/gluten-ut/pom.xml b/gluten-ut/pom.xml
index ec0158c41e..58b4e6d658 100644
--- a/gluten-ut/pom.xml
+++ b/gluten-ut/pom.xml
@@ -230,5 +230,11 @@
         <module>spark40</module>
       </modules>
     </profile>
+    <profile>
+      <id>spark-4.1</id>
+      <modules>
+        <module>spark41</module>
+      </modules>
+    </profile>
   </profiles>
 </project>
diff --git a/gluten-ut/spark41/pom.xml b/gluten-ut/spark41/pom.xml
index d169abb888..838649a21d 100644
--- a/gluten-ut/spark41/pom.xml
+++ b/gluten-ut/spark41/pom.xml
@@ -8,9 +8,13 @@
     <relativePath>../pom.xml</relativePath>
   </parent>
 
-  <artifactId>gluten-ut-spark40</artifactId>
+  <artifactId>gluten-ut-spark41</artifactId>
   <packaging>jar</packaging>
-  <name>Gluten Unit Test Spark40</name>
+  <name>Gluten Unit Test Spark41</name>
+
+  <properties>
+    <parquet.version>1.16.0</parquet.version>
+  </properties>
 
   <dependencies>
     <dependency>
@@ -23,14 +27,14 @@
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-column</artifactId>
-      <version>1.15.2</version>
+      <version>${parquet.version}</version>
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-common</artifactId>
-      <version>1.15.2</version>
+      <version>${parquet.version}</version>
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
diff --git 
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxSQLQueryTestSettings.scala
 
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxSQLQueryTestSettings.scala
index b085257f70..92c62a4ed3 100644
--- 
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxSQLQueryTestSettings.scala
+++ 
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxSQLQueryTestSettings.scala
@@ -56,7 +56,7 @@ object VeloxSQLQueryTestSettings extends SQLQueryTestSettings 
{
     "current_database_catalog.sql",
     // "datetime-formatting-invalid.sql",
     "datetime-special.sql",
-    "decimalArithmeticOperations.sql",
+    // TODO: fix on Spark-4.1 "decimalArithmeticOperations.sql",
     "describe.sql",
     "describe-part-after-analyze.sql",
     "describe-table-after-alter-table.sql",
@@ -75,7 +75,7 @@ object VeloxSQLQueryTestSettings extends SQLQueryTestSettings 
{
     "grouping_set.sql",
     "having.sql",
     "higher-order-functions.sql",
-    "identifier-clause.sql",
+    // TODO: fix on Spark-4.1 "identifier-clause.sql",
     "ignored.sql",
     "ilike.sql",
     "ilike-all.sql",
@@ -86,11 +86,11 @@ object VeloxSQLQueryTestSettings extends 
SQLQueryTestSettings {
     "join-empty-relation.sql",
     "join-lateral.sql",
     "json-functions.sql",
-    "keywords.sql",
+    // TODO: fix on Spark-4.1 "keywords.sql",
     "like-all.sql",
     "like-any.sql",
     // "limit.sql",
-    "literals.sql",
+    // TODO: fix on Spark-4.1 "literals.sql",
     "map.sql",
     "mask-functions.sql",
     "math.sql",
@@ -99,7 +99,7 @@ object VeloxSQLQueryTestSettings extends SQLQueryTestSettings 
{
     "non-excludable-rule.sql",
     "null-handling.sql",
     "null-propagation.sql",
-    "operators.sql",
+    // TODO: fix on Spark-4.1 "operators.sql",
     "order-by-all.sql",
     // "order-by-nulls-ordering.sql",
     "order-by-ordinal.sql",
@@ -127,7 +127,7 @@ object VeloxSQLQueryTestSettings extends 
SQLQueryTestSettings {
     "subquery/exists-subquery/exists-cte.sql",
     "subquery/exists-subquery/exists-having.sql",
     "subquery/exists-subquery/exists-joins-and-set-ops.sql",
-    "subquery/exists-subquery/exists-orderby-limit.sql",
+    // TODO: fix on Spark-4.1 
"subquery/exists-subquery/exists-orderby-limit.sql",
     "subquery/exists-subquery/exists-outside-filter.sql",
     "subquery/exists-subquery/exists-within-and-or.sql",
     "subquery/in-subquery/in-basic.sql",
@@ -163,7 +163,7 @@ object VeloxSQLQueryTestSettings extends 
SQLQueryTestSettings {
     "postgreSQL/case.sql",
     "postgreSQL/comments.sql",
     "postgreSQL/create_view.sql",
-    "postgreSQL/date.sql",
+    // TODO: fix on Spark-4.1 "postgreSQL/date.sql",
     "postgreSQL/float4.sql",
     "postgreSQL/float8.sql",
     "postgreSQL/groupingsets.sql",
@@ -237,8 +237,8 @@ object VeloxSQLQueryTestSettings extends 
SQLQueryTestSettings {
     "nonansi/double-quoted-identifiers.sql",
     "nonansi/higher-order-functions.sql",
     // "nonansi/interval.sql",
-    "nonansi/keywords.sql",
-    "nonansi/literals.sql",
+    // TODO: fix on Spark-4.1 "nonansi/keywords.sql",
+    // TODO: fix on Spark-4.1 "nonansi/literals.sql",
     "nonansi/map.sql",
     "nonansi/math.sql",
     "nonansi/parse-schema-string.sql",
@@ -273,20 +273,20 @@ object VeloxSQLQueryTestSettings extends 
SQLQueryTestSettings {
     // Enable ConstantFolding rule for "typeof(...)".
     "cte.sql",
     // Removed some result mismatch cases.
-    "datetime-legacy.sql",
+    // TODO: fix on Spark-4.1 "datetime-legacy.sql",
     // Removed some result mismatch cases.
     "datetime-parsing.sql",
     // Removed some result mismatch cases.
     "datetime-parsing-legacy.sql",
     // Removed some result mismatch cases.
-    "datetime-parsing-invalid.sql",
+    // TODO: fix on Spark-4.1 "datetime-parsing-invalid.sql",
     // Overwrite exception message. See Spark-46550.
     "hll.sql",
     // Overwrite exception message.
     // TODO: Disable due to schema & ANSI gap
     // "interval.sql",
     // Enable ConstantFolding rule for "typeof(...)".
-    "misc-functions.sql",
+    // TODO: fix on Spark-4.1 "misc-functions.sql",
     // Removed some result mismatch cases.
     "regexp-functions.sql",
     // Removed some result mismatch cases.
diff --git 
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 24b84d58ae..540479a11f 100644
--- 
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -75,6 +75,9 @@ class VeloxTestSettings extends BackendTestSettings {
     .excludeByPrefix("SPARK-41413: partitioned join:")
     .excludeByPrefix("SPARK-42038: partially clustered:")
     .exclude("SPARK-44641: duplicated records when SPJ is not triggered")
+    // TODO: fix on Spark-4.1
+    .excludeByPrefix("SPARK-53322") // see 
https://github.com/apache/spark/pull/53132
+    .excludeByPrefix("SPARK-54439") // see 
https://github.com/apache/spark/pull/53142
   enableSuite[GlutenLocalScanSuite]
   enableSuite[GlutenMetadataColumnSuite]
   enableSuite[GlutenSupportsCatalogOptionsSuite]
@@ -196,6 +199,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("random")
     .exclude("SPARK-9127 codegen with long seed")
   enableSuite[GlutenRegexpExpressionsSuite]
+    // TODO: fix on Spark-4.1 introduced by 
https://github.com/apache/spark/pull/48470
+    .exclude("SPLIT")
   enableSuite[GlutenSortShuffleSuite]
   enableSuite[GlutenSortOrderExpressionsSuite]
   enableSuite[GlutenStringExpressionsSuite]
@@ -471,6 +476,10 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings")
     // TODO: fix in Spark-4.0
     .exclude("explode nested lists crossing a rowgroup boundary")
+    // TODO: fix on Spark-4.1
+    .excludeByPrefix("SPARK-53535") // see 
https://issues.apache.org/jira/browse/SPARK-53535
+    .excludeByPrefix("vectorized reader: missing all struct fields")
+    .excludeByPrefix("SPARK-54220") // 
https://issues.apache.org/jira/browse/SPARK-54220
   enableSuite[GlutenParquetV1PartitionDiscoverySuite]
   enableSuite[GlutenParquetV2PartitionDiscoverySuite]
   enableSuite[GlutenParquetProtobufCompatibilitySuite]
@@ -582,6 +591,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenOuterJoinSuiteForceShjOff]
   enableSuite[GlutenFallbackStrategiesSuite]
   enableSuite[GlutenBroadcastExchangeSuite]
+    // TODO: fix on Spark-4.1 introduced by see 
https://github.com/apache/spark/pull/51623
+    .exclude("SPARK-52962: broadcast exchange should not reset metrics")
   enableSuite[GlutenLocalBroadcastExchangeSuite]
   enableSuite[GlutenCoalesceShufflePartitionsSuite]
     // Rewrite for Gluten. Change details are in the inline comments in 
individual tests.
@@ -722,6 +733,8 @@ class VeloxTestSettings extends BackendTestSettings {
     // Result depends on the implementation for nondeterministic expression 
rand.
     // Not really an issue.
     .exclude("SPARK-10740: handle nondeterministic expressions correctly for 
set operations")
+    // TODO: fix on Spark-4.1
+    .excludeByPrefix("SPARK-52921") // see 
https://github.com/apache/spark/pull/51623
   enableSuite[GlutenDataFrameStatSuite]
   enableSuite[GlutenDataFrameSuite]
     // Rewrite these tests because it checks Spark's physical operators.
@@ -755,6 +768,9 @@ class VeloxTestSettings extends BackendTestSettings {
     // rewrite `WindowExec -> WindowExecTransformer`
     .exclude(
       "SPARK-38237: require all cluster keys for child required distribution 
for window query")
+    // TODO: fix on Spark-4.1 introduced by 
https://github.com/apache/spark/pull/47856
+    .exclude(
+      "SPARK-49386: Window spill with more than the inMemoryThreshold and 
spillSizeThreshold")
   enableSuite[GlutenDataFrameWindowFramesSuite]
   enableSuite[GlutenDataFrameWriterV2Suite]
   enableSuite[GlutenDatasetAggregatorSuite]
@@ -823,6 +839,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenJoinSuite]
     // exclude as it check spark plan
     .exclude("SPARK-36794: Ignore duplicated key when building relation for 
semi/anti hash join")
+    // TODO: fix on Spark-4.1 introduced by 
https://github.com/apache/spark/pull/47856
+    .exclude("SPARK-49386: test SortMergeJoin (with spill by size threshold)")
   enableSuite[GlutenMathFunctionsSuite]
   enableSuite[GlutenMetadataCacheSuite]
     .exclude("SPARK-16336,SPARK-27961 Suggest fixing FileNotFoundException")
@@ -860,6 +878,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-38173: Quoted column cannot be recognized correctly when 
quotedRegexColumnNames is true")
     // Rewrite with Gluten's explained result.
     .exclude("SPARK-47939: Explain should work with parameterized queries")
+    // TODO: fix on Spark-4.1 based on 
https://github.com/apache/incubator-gluten/pull/11252
+    .excludeGlutenTest("SPARK-47939: Explain should work with parameterized 
queries")
   enableSuite[GlutenSQLQueryTestSuite]
   enableSuite[GlutenStatisticsCollectionSuite]
     // The output byte size of Velox is different
@@ -942,6 +962,9 @@ class VeloxTestSettings extends BackendTestSettings {
     .excludeByPrefix("SPARK-51187")
     // Rewrite for the query plan check
     .excludeByPrefix("SPARK-49905")
+    // TODO: fix on Spark-4.1 introduced by 
https://github.com/apache/spark/pull/52645
+    .exclude("SPARK-53942: changing the number of stateless shuffle partitions 
via config")
+    .exclude("SPARK-53942: stateful shuffle partitions are retained from old 
checkpoint")
   enableSuite[GlutenQueryExecutionSuite]
     // Rewritten to set root logger level to INFO so that logs can be parsed
     .exclude("Logging plan changes for execution")
@@ -949,7 +972,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("dumping query execution info to a file - explainMode=formatted")
     // TODO: fix in Spark-4.0
     .exclude("SPARK-47289: extended explain info")
-
+    // TODO: fix on Spark-4.1 introduced by 
https://github.com/apache/spark/pull/52157
+    .exclude("SPARK-53413: Cleanup shuffle dependencies for commands")
   override def getSQLQueryTestSettings: SQLQueryTestSettings = 
VeloxSQLQueryTestSettings
 }
 // scalastyle:on line.size.limit
diff --git 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
index dc96c09bc2..d2222aa8a6 100644
--- 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
+++ 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
@@ -29,7 +29,7 @@ import 
org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, 
ReusedExchangeExec}
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.execution.streaming.{MemoryStream, 
StreamingQueryWrapper}
+import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream, 
StreamingQueryWrapper}
 import org.apache.spark.sql.internal.SQLConf
 
 abstract class GlutenDynamicPartitionPruningSuiteBase
@@ -155,7 +155,9 @@ abstract class GlutenDynamicPartitionPruningSuiteBase
     "SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
       "canonicalization and exchange reuse") {
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
-      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      withSQLConf(
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+        SQLConf.V2_BUCKETING_ENABLED.key -> "false") {
         val df = sql(""" WITH view1 as (
                        |   SELECT f.store_id FROM fact_stats f WHERE 
f.units_sold = 70
                        | )
diff --git 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala
 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala
index bda9c97eb5..d59c046937 100644
--- 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala
+++ 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.GlutenSQLTestsTrait
 import org.apache.spark.sql.execution.exchange.REQUIRED_BY_STATEFUL_OPERATOR
-import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
 import org.apache.spark.sql.streaming._
 
 class GlutenStreamingQuerySuite extends StreamingQuerySuite with 
GlutenSQLTestsTrait {
diff --git 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
index 6cfa9f2028..63f0327e07 100644
--- 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
+++ 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.exception.GlutenException
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, Row}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.types.{DateType, IntegerType, StructType, 
TimestampType}
 
 import org.scalatest.exceptions.TestFailedException
@@ -131,5 +131,5 @@ class GlutenCSVv2Suite extends GlutenCSVSuite {
 class GlutenCSVLegacyTimeParserSuite extends GlutenCSVSuite {
   override def sparkConf: SparkConf =
     super.sparkConf
-      .set(SQLConf.LEGACY_TIME_PARSER_POLICY, "legacy")
+      .set(SQLConf.LEGACY_TIME_PARSER_POLICY, LegacyBehaviorPolicy.LEGACY)
 }
diff --git 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/json/GlutenJsonSuite.scala
 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/json/GlutenJsonSuite.scala
index 45e8e3e9e7..3fefc9bdd4 100644
--- 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/json/GlutenJsonSuite.scala
+++ 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/json/GlutenJsonSuite.scala
@@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.{sources, GlutenSQLTestsBaseTrait, Row}
 import org.apache.spark.sql.execution.datasources.{InMemoryFileIndex, 
NoopCache}
 import org.apache.spark.sql.execution.datasources.v2.json.JsonScanBuilder
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.types.{DateType, IntegerType, StructType, 
TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -136,5 +136,5 @@ class GlutenJsonLegacyTimeParserSuite extends 
GlutenJsonSuite with GlutenSQLTest
 
   override def sparkConf: SparkConf =
     super.sparkConf
-      .set(SQLConf.LEGACY_TIME_PARSER_POLICY, "legacy")
+      .set(SQLConf.LEGACY_TIME_PARSER_POLICY, LegacyBehaviorPolicy.LEGACY)
 }
diff --git 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
index 2ea814df27..53280a72cb 100644
--- 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
+++ 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
@@ -69,6 +69,7 @@ abstract class Suite(
   sessionSwitcher.addDefaultConf("spark.sql.broadcastTimeout", "1800")
   sessionSwitcher.addDefaultConf("spark.network.io.preferDirectBufs", "false")
   sessionSwitcher.addDefaultConf("spark.unsafe.exceptionOnMemoryLeak", 
s"$errorOnMemLeak")
+  sessionSwitcher.addDefaultConf("spark.sql.unionOutputPartitioning", "false")
 
   if (dataSource() == "delta") {
     sessionSwitcher.addDefaultConf(
diff --git a/tools/gluten-it/pom.xml b/tools/gluten-it/pom.xml
index 27be4a1b5b..77e4eab9f6 100644
--- a/tools/gluten-it/pom.xml
+++ b/tools/gluten-it/pom.xml
@@ -327,6 +327,16 @@
         <delta.version>4.0.0</delta.version>
       </properties>
     </profile>
+    <profile>
+      <id>spark-4.1</id>
+      <properties>
+        <spark.version>4.1.0</spark.version>
+        <scala.version>2.13.17</scala.version>
+        <scala.binary.version>2.13</scala.binary.version>
+        <delta.package.name>delta-spark</delta.package.name>
+        <delta.version>4.0.0</delta.version>
+      </properties>
+    </profile>
     <profile>
       <id>celeborn-0.5</id>
       <properties>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to