Copilot commented on code in PR #12367:
URL: https://github.com/apache/gluten/pull/12367#discussion_r3476389963


##########
backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.velox
+
+import org.apache.gluten.execution.{GlutenPartition, WholeStageTransformContext}
+import org.apache.gluten.substrait.plan.PlanBuilder
+
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests that [[VeloxIteratorApi.genPartitions]] captures fs.azure.*, fs.s3a.*, and fs.gs.* keys
+ * from the driver-side Hadoop configuration and embeds them in [[GlutenPartition.fsConf]], so they
+ * are available on executors where Spark's SQLConf propagation does not reach "fs.*" keys.
+ *
+ * Keys must be set on `sparkContext.hadoopConfiguration` (the mutable base configuration) because
+ * `sessionState.newHadoopConf()` creates a fresh copy each time - mutations to its return value are
+ * discarded before the next call.
+ */
+class VeloxIteratorApiFsConfSuite extends SharedSparkSession {
+
+  private val api = new VeloxIteratorApi
+
+  /**
+   * Build a minimal WholeStageTransformContext backed by an empty Substrait plan. genPartitions
+   * only calls wsCtx.root.toProtobuf.toByteArray, so a plan with no relations is sufficient for the
+   * purpose of this test.
+   */
+  private def emptyWsCtx: WholeStageTransformContext =
+    WholeStageTransformContext(PlanBuilder.empty())
+
+  /**
+   * Set Hadoop conf keys on the underlying mutable configuration and restore their previous values
+   * (or unset them) after the block. `sessionState.newHadoopConf()` copies from
+   * `sparkContext.hadoopConfiguration`, so this is the correct mutation point.
+   */
+  private def withHadoopConf(pairs: (String, String)*)(body: => Unit): Unit = {
+    // scalastyle:off hadoopconfiguration
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    // scalastyle:off hadoopconfiguration

Review Comment:
   The `// scalastyle:off hadoopconfiguration` directive appears twice and is
never followed by `// scalastyle:on hadoopconfiguration`, so the check stays
disabled for the rest of the file. This should be a bounded off/on block around
the single line that needs it.
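
   A minimal sketch of the bounded form, assuming the project's scalastyle
configuration defines the `hadoopconfiguration` rule id that the existing
directive already refers to. `spark` is the session provided by the suite, as
in the diff above; nothing else is introduced.

```scala
// Disable the rule only for the one line that reads the base Hadoop configuration.
// scalastyle:off hadoopconfiguration
val hadoopConf = spark.sparkContext.hadoopConfiguration
// scalastyle:on hadoopconfiguration
// From here on the check is active again for the rest of the file.
```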



##########
backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.velox
+
+import org.apache.gluten.execution.{GlutenPartition, WholeStageTransformContext}
+import org.apache.gluten.substrait.plan.PlanBuilder
+
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests that [[VeloxIteratorApi.genPartitions]] captures fs.azure.*, fs.s3a.*, and fs.gs.* keys
+ * from the driver-side Hadoop configuration and embeds them in [[GlutenPartition.fsConf]], so they
+ * are available on executors where Spark's SQLConf propagation does not reach "fs.*" keys.
+ *
+ * Keys must be set on `sparkContext.hadoopConfiguration` (the mutable base configuration) because
+ * `sessionState.newHadoopConf()` creates a fresh copy each time - mutations to its return value are
+ * discarded before the next call.
+ */
+class VeloxIteratorApiFsConfSuite extends SharedSparkSession {
+
+  private val api = new VeloxIteratorApi
+
+  /**
+   * Build a minimal WholeStageTransformContext backed by an empty Substrait plan. genPartitions
+   * only calls wsCtx.root.toProtobuf.toByteArray, so a plan with no relations is sufficient for the
+   * purpose of this test.
+   */
+  private def emptyWsCtx: WholeStageTransformContext =
+    WholeStageTransformContext(PlanBuilder.empty())
+
+  /**
+   * Set Hadoop conf keys on the underlying mutable configuration and restore their previous values
+   * (or unset them) after the block. `sessionState.newHadoopConf()` copies from
+   * `sparkContext.hadoopConfiguration`, so this is the correct mutation point.
+   */
+  private def withHadoopConf(pairs: (String, String)*)(body: => Unit): Unit = {
+    // scalastyle:off hadoopconfiguration
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    // scalastyle:off hadoopconfiguration
+    val prev: Seq[(String, Option[String])] = pairs.map {
+      case (k, _) => k -> Option(hadoopConf.get(k))
+    }
+    pairs.foreach { case (k, v) => hadoopConf.set(k, v) }
+    try body
+    finally prev.foreach {
+        case (k, Some(old)) => hadoopConf.set(k, old)
+        case (k, None) => hadoopConf.unset(k)
+      }
+  }
+
+  test("genPartitions embeds fs.azure.* keys from Hadoop conf into GlutenPartition.fsConf") {
+    withHadoopConf(
+      "fs.azure.account.auth.type.myaccount.dfs.core.windows.net" -> "OAuth",
+      "fs.azure.account.oauth.provider.type" -> "ClientCredentials"
+    ) {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)

Review Comment:
   All tests call `genPartitions(..., leaves = Seq.empty)`, which only 
exercises the fallback path (`SparkContext.hadoopConfiguration`). The 
production path when `leaves.nonEmpty` (and `sessionState.newHadoopConf()` is 
used to pick up session-level `spark.conf` / `spark.hadoop.*` settings) is not 
covered, so regressions in that path (e.g., SparkSession selection) wouldn’t be 
caught by this suite.



##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala:
##########
@@ -199,7 +228,16 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       iter => new ColumnarBatchInIterator(BackendsApiManager.getBackendName, iter.asJava)
     }
 
-    val extraConf = Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString).asJava
+    // Merge the fs.* keys captured on the driver (stored in GlutenPartition.fsConf)
+    // into the extraConf passed to NativePlanEvaluator / VeloxRuntime.
+    // Runtimes.contextInstance() will call GlutenConfig.getNativeSessionConf() which
+    // merges extraConf on top of SQLConf.get.getAllConfs.  Because the executor-side
+    // SQLConf never receives "fs.*" keys (Spark only propagates "spark.*" keys via
+    // task local properties), this is the only path these credentials can take to
+    // reach the native session config and ultimately the Velox ABFS connector.
+    val partitionFsConf = inputPartition.asInstanceOf[GlutenPartition].fsConf
+    val extraConf = (partitionFsConf +
+      (GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString)).asJava
     val transKernel = NativePlanEvaluator.create(BackendsApiManager.getBackendName, extraConf)

Review Comment:
   `partitionFsConf` may contain sensitive credentials (e.g., 
`fs.s3a.secret.key`, ABFS OAuth secrets). Passing them via `extraConf` is risky 
because `Runtimes.contextInstance` includes `extraConf.toString` in the 
TaskResources registration key (`s"$backendName:$name:$extraConf"`), which can 
surface in logs/debug output and leak secrets. Consider changing the runtime 
keying/logging to avoid embedding config values (e.g., use a stable hash or 
exclude/redact sensitive keys) before shipping this broadly.
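
   One possible shape for this, sketched with the JDK only. `ConfKeying`,
`sensitiveMarkers`, `redact`, `fingerprint` and `registrationKey` are
hypothetical names for illustration, not existing Gluten APIs, and the marker
list is an assumption that would need to match the keys actually in use. The
idea is to key on a digest of the configuration instead of its `toString`, and
to redact values before anything is logged.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// Hypothetical helper, not part of Gluten.
object ConfKeying {
  // Assumed substrings that mark a key as sensitive; adjust to the real key set.
  private val sensitiveMarkers = Seq("secret", "password", "token", "key")

  def isSensitive(key: String): Boolean = {
    val lower = key.toLowerCase
    sensitiveMarkers.exists(m => lower.contains(m))
  }

  // Copy of the conf that is safe to print: sensitive values are masked.
  def redact(conf: Map[String, String]): Map[String, String] =
    conf.map { case (k, v) => if (isSensitive(k)) k -> "<redacted>" else k -> v }

  // Order-independent SHA-256 digest over the sorted key/value pairs.
  def fingerprint(conf: Map[String, String]): String = {
    val digest = MessageDigest.getInstance("SHA-256")
    conf.toSeq.sortBy(_._1).foreach { case (k, v) =>
      digest.update(k.getBytes(StandardCharsets.UTF_8))
      digest.update(0.toByte) // separator so "ab" + "c" differs from "a" + "bc"
      digest.update(v.getBytes(StandardCharsets.UTF_8))
      digest.update(0.toByte)
    }
    digest.digest().map(b => "%02x".format(b)).mkString
  }

  // Same layout as the existing key, but with a digest in place of the raw conf.
  def registrationKey(backendName: String, name: String, conf: Map[String, String]): String =
    s"$backendName:$name:${fingerprint(conf)}"
}
```

   A digest keeps distinct configurations distinct without exposing values in
the key. It is not a substitute for redaction in log statements, and a digest
of a short, guessable secret can still be brute-forced, so excluding sensitive
keys from the digest input may be preferable where that is acceptable.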



##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala:
##########
@@ -125,12 +125,41 @@ class VeloxIteratorApi extends IteratorApi with Logging {
     // Only serialize plan once, save lots time when plan is complex.
     val planByteArray = wsCtx.root.toProtobuf.toByteArray
 
+    // Capture fs.azure.* / fs.s3a.* / fs.gs.* keys from the driver-side
+    // Hadoop configuration NOW, while we are still on the driver, and embed
+    // them in every GlutenPartition.  These keys are set by the user via
+    //   spark.conf.set("fs.azure.account.auth.type", ...)   or
+    //   sparkContext.hadoopConfiguration.set(...)
+    // Spark's withSQLConfPropagated only forwards keys starting with "spark"
+    // as task-local-properties, so "fs.*" keys never reach the executor's
+    // SQLConf.  Serialising them inside the partition is the only safe way
+    // to make them available to the native runtime on the executor.
+    // Capture fs.azure.* / fs.s3a.* / fs.gs.* keys while on the driver.
+    // SparkPlan.sqlContext is available on the driver -- using the first leaf
+    // gives us access to sessionState.newHadoopConf() which includes all keys
+    // set via spark.conf.set(), sparkContext.hadoopConfiguration, and
+    // DataFrameReader.option().  These are NOT propagated to executors by
+    // Spark's withSQLConfPropagated (it only forwards keys starting with
+    // "spark"), so embedding them in the serialised GlutenPartition is the
+    // only reliable transport mechanism.
+    val fsPrefixes = Seq("fs.azure.", "fs.s3a.", "fs.gs.")
+    val hadoopConf = leaves.headOption
+      .map(_ => org.apache.spark.sql.SparkSession.active.sessionState.newHadoopConf())
+      .getOrElse(org.apache.spark.SparkContext.getOrCreate().hadoopConfiguration)

Review Comment:
   `genPartitions` uses `SparkSession.active.sessionState.newHadoopConf()` 
whenever `leaves` is non-empty, but it ignores the actual leaf/session and 
depends on a global active session being set. This can pick up the wrong 
session’s Hadoop conf (multiple SparkSessions) or throw if no active session is 
set. Prefer deriving the SparkSession from the provided `LeafTransformSupport` 
instance.
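
   The selection logic being asked for can be modelled without Spark on the
classpath. In the sketch below, `SessionModel` and `LeafModel` are stand-in
types invented for illustration (they are not Spark or Gluten classes), and
`selectFsConf` is a hypothetical name. It shows the intended precedence: take
the configuration from the session attached to a leaf when there is one, fall
back to the base configuration otherwise, and never consult a global active
session. The prefix list is the one from the diff.

```scala
// Stand-in types for illustration only; not Spark or Gluten classes.
final case class SessionModel(name: String, hadoopConf: Map[String, String])
final case class LeafModel(session: Option[SessionModel])

object FsConfSelection {
  // Prefixes taken from the diff under review.
  val fsPrefixes: Seq[String] = Seq("fs.azure.", "fs.s3a.", "fs.gs.")

  // Pick the conf of the first leaf that carries a session; otherwise use the
  // fallback. Only keys with one of the fs prefixes are kept.
  def selectFsConf(
      leaves: Seq[LeafModel],
      fallback: Map[String, String]): Map[String, String] = {
    val source = leaves.flatMap(_.session).headOption.map(_.hadoopConf).getOrElse(fallback)
    source.filter { case (k, _) => fsPrefixes.exists(p => k.startsWith(p)) }
  }
}
```

   With this shape, a leaf whose session is missing degrades to the fallback
instead of throwing, and two sessions with different settings cannot be
confused because the session comes from the plan node itself.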



##########
backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.velox
+
+import org.apache.gluten.execution.{GlutenPartition, WholeStageTransformContext}
+import org.apache.gluten.substrait.plan.PlanBuilder
+
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests that [[VeloxIteratorApi.genPartitions]] captures fs.azure.*, fs.s3a.*, and fs.gs.* keys
+ * from the driver-side Hadoop configuration and embeds them in [[GlutenPartition.fsConf]], so they
+ * are available on executors where Spark's SQLConf propagation does not reach "fs.*" keys.
+ *
+ * Keys must be set on `sparkContext.hadoopConfiguration` (the mutable base configuration) because
+ * `sessionState.newHadoopConf()` creates a fresh copy each time - mutations to its return value are
+ * discarded before the next call.
+ */
+class VeloxIteratorApiFsConfSuite extends SharedSparkSession {
+
+  private val api = new VeloxIteratorApi
+
+  /**
+   * Build a minimal WholeStageTransformContext backed by an empty Substrait plan. genPartitions
+   * only calls wsCtx.root.toProtobuf.toByteArray, so a plan with no relations is sufficient for the
+   * purpose of this test.
+   */
+  private def emptyWsCtx: WholeStageTransformContext =
+    WholeStageTransformContext(PlanBuilder.empty())
+
+  /**
+   * Set Hadoop conf keys on the underlying mutable configuration and restore their previous values
+   * (or unset them) after the block. `sessionState.newHadoopConf()` copies from
+   * `sparkContext.hadoopConfiguration`, so this is the correct mutation point.
+   */
+  private def withHadoopConf(pairs: (String, String)*)(body: => Unit): Unit = {
+    // scalastyle:off hadoopconfiguration
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    // scalastyle:off hadoopconfiguration
+    val prev: Seq[(String, Option[String])] = pairs.map {
+      case (k, _) => k -> Option(hadoopConf.get(k))
+    }
+    pairs.foreach { case (k, v) => hadoopConf.set(k, v) }
+    try body
+    finally prev.foreach {
+        case (k, Some(old)) => hadoopConf.set(k, old)
+        case (k, None) => hadoopConf.unset(k)
+      }
+  }
+
+  test("genPartitions embeds fs.azure.* keys from Hadoop conf into GlutenPartition.fsConf") {
+    withHadoopConf(
+      "fs.azure.account.auth.type.myaccount.dfs.core.windows.net" -> "OAuth",
+      "fs.azure.account.oauth.provider.type" -> "ClientCredentials"
+    ) {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      assert(partitions.size == 1)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(
+        fsConf.contains("fs.azure.account.auth.type.myaccount.dfs.core.windows.net"),
+        s"Expected fs.azure key not found; got: ${fsConf.keys.mkString(", ")}")
+      assert(fsConf("fs.azure.account.auth.type.myaccount.dfs.core.windows.net") == "OAuth")
+      assert(
+        fsConf.contains("fs.azure.account.oauth.provider.type"),
+        s"Expected fs.azure key not found; got: ${fsConf.keys.mkString(", ")}")
+      assert(fsConf("fs.azure.account.oauth.provider.type") == "ClientCredentials")
+    }
+  }
+
+  test("genPartitions embeds fs.s3a.* keys from Hadoop conf into GlutenPartition.fsConf") {
+    withHadoopConf(
+      "fs.s3a.access.key" -> "AKIAIOSFODNN7EXAMPLE",
+      "fs.s3a.secret.key" -> "wJalrXUtnFEMI"

Review Comment:
   The test data uses strings that look like real AWS access keys/secret keys 
(e.g., `AKIA...`). Even if they are placeholders, they can trigger 
secret-scanning or create confusion. Use clearly dummy values that do not match 
common credential patterns.
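
   A small sketch of what clearly dummy values could look like. `DummyCreds`
and `looksLikeAwsAccessKey` are hypothetical names, and the regular expression
is an assumption about the shape many secret scanners match for AWS access key
IDs (`AKIA` followed by 16 uppercase letters or digits); it is not taken from
any specific scanner.

```scala
// Hypothetical test fixture; values are deliberately not credential-shaped.
object DummyCreds {
  val accessKey: String = "dummy-access-key"
  val secretKey: String = "dummy-secret-key"

  // Assumed scanner-style pattern for AWS access key IDs.
  private val awsAccessKeyId = "AKIA[0-9A-Z]{16}".r

  def looksLikeAwsAccessKey(s: String): Boolean =
    awsAccessKeyId.findFirstIn(s).isDefined
}
```

   In the suite, `"fs.s3a.access.key" -> DummyCreds.accessKey` and
`"fs.s3a.secret.key" -> DummyCreds.secretKey` would exercise the same code path
as the current values, since the test only checks that the strings round-trip.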



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

