Copilot commented on code in PR #12367:
URL: https://github.com/apache/gluten/pull/12367#discussion_r3481993381


##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala:
##########
@@ -199,7 +228,16 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       iter => new ColumnarBatchInIterator(BackendsApiManager.getBackendName, 
iter.asJava)
     }
 
-    val extraConf = Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> 
enableCudf.toString).asJava
+    // Merge the fs.* keys captured on the driver (stored in 
GlutenPartition.fsConf)
+    // into the extraConf passed to NativePlanEvaluator / VeloxRuntime.
+    // Runtimes.contextInstance() will call 
GlutenConfig.getNativeSessionConf() which
+    // merges extraConf on top of SQLConf.get.getAllConfs.  Because the 
executor-side
+    // SQLConf never receives "fs.*" keys (Spark only propagates "spark.*" 
keys via
+    // task local properties), this is the only path these credentials can 
take to
+    // reach the native session config and ultimately the Velox ABFS connector.
+    val partitionFsConf = inputPartition.asInstanceOf[GlutenPartition].fsConf
+    val extraConf = (partitionFsConf +
+      (GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString)).asJava
     val transKernel = 
NativePlanEvaluator.create(BackendsApiManager.getBackendName, extraConf)
 

Review Comment:
   `extraConf` is passed into `NativePlanEvaluator.create`, which calls 
`Runtimes.contextInstance`. Today `Runtimes.contextInstance` uses 
`s"$backendName:$name:$extraConf"` as the TaskResources key 
(gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala:25-35). 
With this change, `extraConf` can contain fs credential values (e.g. 
`fs.s3a.secret.key`), so those secrets may be retained in heap and may be 
exposed via debugging/logging of resource keys; additionally, map `toString` 
order can be nondeterministic, causing unnecessary distinct runtime 
registrations. Consider changing the runtime keying strategy to use a stable 
hash and/or redact sensitive values (or avoid including values entirely).



##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala:
##########
@@ -125,12 +125,41 @@ class VeloxIteratorApi extends IteratorApi with Logging {
     // Only serialize plan once, save lots time when plan is complex.
     val planByteArray = wsCtx.root.toProtobuf.toByteArray
 
+    // Capture fs.azure.* / fs.s3a.* / fs.gs.* keys from the driver-side
+    // Hadoop configuration NOW, while we are still on the driver, and embed
+    // them in every GlutenPartition.  These keys are set by the user via
+    //   spark.conf.set("fs.azure.account.auth.type", ...)   or
+    //   sparkContext.hadoopConfiguration.set(...)
+    // Spark's withSQLConfPropagated only forwards keys starting with "spark"
+    // as task-local-properties, so "fs.*" keys never reach the executor's
+    // SQLConf.  Serialising them inside the partition is the only safe way
+    // to make them available to the native runtime on the executor.
+    // Capture fs.azure.* / fs.s3a.* / fs.gs.* keys while on the driver.
+    // SparkPlan.sqlContext is available on the driver -- using the first leaf
+    // gives us access to sessionState.newHadoopConf() which includes all keys
+    // set via spark.conf.set(), sparkContext.hadoopConfiguration, and
+    // DataFrameReader.option().  These are NOT propagated to executors by
+    // Spark's withSQLConfPropagated (it only forwards keys starting with
+    // "spark"), so embedding them in the serialised GlutenPartition is the
+    // only reliable transport mechanism.
+    val fsPrefixes = Seq("fs.azure.", "fs.s3a.", "fs.gs.")
+    val hadoopConf = leaves.headOption
+      .map(_ => 
org.apache.spark.sql.SparkSession.active.sessionState.newHadoopConf())
+      
.getOrElse(org.apache.spark.SparkContext.getOrCreate().hadoopConfiguration)

Review Comment:
   `SparkSession.active` may throw if there is no active session bound to the 
current thread, and it can also pick up the wrong session when multiple 
SparkSessions are in play. Since `leaves` are SparkPlans created under the 
correct SQLContext, prefer using 
`leaves.head.sqlContext.sessionState.newHadoopConf()` rather than the global 
active-session lookup.



##########
backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.velox
+
+import org.apache.gluten.execution.{GlutenPartition, 
WholeStageTransformContext}
+import org.apache.gluten.substrait.plan.PlanBuilder
+
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests that [[VeloxIteratorApi.genPartitions]] captures fs.azure.*, 
fs.s3a.*, and fs.gs.* keys
+ * from the driver-side Hadoop configuration and embeds them in 
[[GlutenPartition.fsConf]], so they
+ * are available on executors where Spark's SQLConf propagation does not reach 
"fs.*" keys.
+ *
+ * Keys must be set on `sparkContext.hadoopConfiguration` (the mutable base 
configuration) because
+ * `sessionState.newHadoopConf()` creates a fresh copy each time - mutations 
to its return value are
+ * discarded before the next call.
+ */
+class VeloxIteratorApiFsConfSuite extends SharedSparkSession {
+
+  private val api = new VeloxIteratorApi
+
+  /**
+   * Build a minimal WholeStageTransformContext backed by an empty Substrait 
plan. genPartitions
+   * only calls wsCtx.root.toProtobuf.toByteArray, so a plan with no relations 
is sufficient for the
+   * purpose of this test.
+   */
+  private def emptyWsCtx: WholeStageTransformContext =
+    WholeStageTransformContext(PlanBuilder.empty())
+
+  /**
+   * Set Hadoop conf keys on the underlying mutable configuration and restore 
their previous values
+   * (or unset them) after the block. `sessionState.newHadoopConf()` copies 
from
+   * `sparkContext.hadoopConfiguration`, so this is the correct mutation point.
+   */
+  private def withHadoopConf(pairs: (String, String)*)(body: => Unit): Unit = {
+    // scalastyle:off hadoopconfiguration
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    // scalastyle:on hadoopconfiguration
+    val prev: Seq[(String, Option[String])] = pairs.map {
+      case (k, _) => k -> Option(hadoopConf.get(k))
+    }
+    pairs.foreach { case (k, v) => hadoopConf.set(k, v) }
+    try body
+    finally prev.foreach {
+        case (k, Some(old)) => hadoopConf.set(k, old)
+        case (k, None) => hadoopConf.unset(k)
+      }
+  }
+
+  test("genPartitions embeds fs.azure.* keys from Hadoop conf into 
GlutenPartition.fsConf") {
+    withHadoopConf(
+      "fs.azure.account.auth.type.myaccount.dfs.core.windows.net" -> "OAuth",
+      "fs.azure.account.oauth.provider.type" -> "ClientCredentials"
+    ) {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      assert(partitions.size == 1)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(
+        
fsConf.contains("fs.azure.account.auth.type.myaccount.dfs.core.windows.net"),
+        s"Expected fs.azure key not found; got: ${fsConf.keys.mkString(", ")}")
+      
assert(fsConf("fs.azure.account.auth.type.myaccount.dfs.core.windows.net") == 
"OAuth")
+      assert(
+        fsConf.contains("fs.azure.account.oauth.provider.type"),
+        s"Expected fs.azure key not found; got: ${fsConf.keys.mkString(", ")}")
+      assert(fsConf("fs.azure.account.oauth.provider.type") == 
"ClientCredentials")
+    }
+  }
+
+  test("genPartitions embeds fs.s3a.* keys from Hadoop conf into 
GlutenPartition.fsConf") {
+    withHadoopConf(
+      "fs.s3a.access.key" -> "AKIAIOSFODNN7EXAMPLE",
+      "fs.s3a.secret.key" -> "wJalrXUtnFEMI"
+    ) {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      assert(partitions.size == 1)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(
+        fsConf.contains("fs.s3a.access.key"),
+        s"Expected fs.s3a.access.key not found; got: ${fsConf.keys.mkString(", 
")}")
+      assert(fsConf("fs.s3a.access.key") == "AKIAIOSFODNN7EXAMPLE")
+      assert(fsConf.contains("fs.s3a.secret.key"))
+      assert(fsConf("fs.s3a.secret.key") == "wJalrXUtnFEMI")
+    }
+  }

Review Comment:
   This test uses AWS-looking access key / secret key strings. Even though they 
are dummy values, patterns like `AKIA...` are commonly flagged by secret 
scanners and can block merges. Prefer clearly fake placeholders that do not 
match real credential formats.



##########
backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.velox
+
+import org.apache.gluten.execution.{GlutenPartition, 
WholeStageTransformContext}
+import org.apache.gluten.substrait.plan.PlanBuilder
+
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests that [[VeloxIteratorApi.genPartitions]] captures fs.azure.*, 
fs.s3a.*, and fs.gs.* keys
+ * from the driver-side Hadoop configuration and embeds them in 
[[GlutenPartition.fsConf]], so they
+ * are available on executors where Spark's SQLConf propagation does not reach 
"fs.*" keys.
+ *
+ * Keys must be set on `sparkContext.hadoopConfiguration` (the mutable base 
configuration) because
+ * `sessionState.newHadoopConf()` creates a fresh copy each time - mutations 
to its return value are
+ * discarded before the next call.
+ */
+class VeloxIteratorApiFsConfSuite extends SharedSparkSession {
+
+  private val api = new VeloxIteratorApi
+
+  /**
+   * Build a minimal WholeStageTransformContext backed by an empty Substrait 
plan. genPartitions
+   * only calls wsCtx.root.toProtobuf.toByteArray, so a plan with no relations 
is sufficient for the
+   * purpose of this test.
+   */
+  private def emptyWsCtx: WholeStageTransformContext =
+    WholeStageTransformContext(PlanBuilder.empty())
+
+  /**
+   * Set Hadoop conf keys on the underlying mutable configuration and restore 
their previous values
+   * (or unset them) after the block. `sessionState.newHadoopConf()` copies 
from
+   * `sparkContext.hadoopConfiguration`, so this is the correct mutation point.
+   */
+  private def withHadoopConf(pairs: (String, String)*)(body: => Unit): Unit = {
+    // scalastyle:off hadoopconfiguration
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    // scalastyle:on hadoopconfiguration
+    val prev: Seq[(String, Option[String])] = pairs.map {
+      case (k, _) => k -> Option(hadoopConf.get(k))
+    }
+    pairs.foreach { case (k, v) => hadoopConf.set(k, v) }
+    try body
+    finally prev.foreach {
+        case (k, Some(old)) => hadoopConf.set(k, old)
+        case (k, None) => hadoopConf.unset(k)
+      }
+  }
+
+  test("genPartitions embeds fs.azure.* keys from Hadoop conf into 
GlutenPartition.fsConf") {
+    withHadoopConf(
+      "fs.azure.account.auth.type.myaccount.dfs.core.windows.net" -> "OAuth",
+      "fs.azure.account.oauth.provider.type" -> "ClientCredentials"
+    ) {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)

Review Comment:
   Issue #10113 and the new code comments mention configs set via 
`spark.conf.set("fs.*", ...)` / SQLConf, but this suite only mutates 
`sparkContext.hadoopConfiguration`. That means the tests don’t currently 
validate the primary regression scenario (session-level `spark.conf.set` keys 
making it into `GlutenPartition.fsConf`). Consider adding at least one test 
that sets an `fs.*` key via `spark.conf.set` and asserts it is captured.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to