Copilot commented on code in PR #12367:
URL: https://github.com/apache/gluten/pull/12367#discussion_r3474681435


##########
backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.velox
+
+import org.apache.gluten.execution.{GlutenPartition, WholeStageTransformContext}
+import org.apache.gluten.substrait.plan.PlanBuilder
+
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests that [[VeloxIteratorApi.genPartitions]] captures fs.azure.*, fs.s3a.*, and fs.gs.* keys
+ * from the driver-side Hadoop configuration and embeds them in [[GlutenPartition.fsConf]], so they
+ * are available on executors where Spark's SQLConf propagation does not reach "fs.*" keys.
+ */
+class VeloxIteratorApiFsConfSuite extends SharedSparkSession {
+
+  private val api = new VeloxIteratorApi
+
+  /**
+   * Build a minimal WholeStageTransformContext backed by an empty Substrait plan.
+   * genPartitions only calls wsCtx.root.toProtobuf.toByteArray, so a plan with no relations is
+   * sufficient for the purpose of this test.
+   */
+  private def emptyWsCtx: WholeStageTransformContext =
+    WholeStageTransformContext(PlanBuilder.empty())
+
+  test("genPartitions embeds fs.azure.* keys from Hadoop conf into GlutenPartition.fsConf") {
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    hadoopConf.set("fs.azure.account.auth.type.myaccount.dfs.core.windows.net", "OAuth")
+    hadoopConf.set("fs.azure.account.oauth.provider.type", "ClientCredentials")
+    try {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      assert(partitions.size == 1)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(
+        fsConf.contains("fs.azure.account.auth.type.myaccount.dfs.core.windows.net"),
+        s"Expected fs.azure key not found; got: $fsConf")
+      assert(
+        fsConf("fs.azure.account.auth.type.myaccount.dfs.core.windows.net") == "OAuth")
+      assert(
+        fsConf.contains("fs.azure.account.oauth.provider.type"),
+        s"Expected fs.azure key not found; got: $fsConf")
+    } finally {
+      hadoopConf.unset("fs.azure.account.auth.type.myaccount.dfs.core.windows.net")
+      hadoopConf.unset("fs.azure.account.oauth.provider.type")
+    }
+  }

Review Comment:
   This test mutates the shared `SparkContext.hadoopConfiguration` and 
unconditionally calls `unset` in `finally`. If a key was already set (e.g., in 
a developer or CI environment), its original value is lost for all subsequent 
tests. Capture the previous value and restore it instead. Also, avoid printing 
the full `fsConf` in assertion messages (values may contain credentials); print 
the keys only.
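
A minimal sketch of the save-and-restore pattern, under stated assumptions: `ConfRestore` and `withOverrides` are hypothetical names, not part of Gluten, Spark, or Hadoop. The store is passed in as three functions so the sketch compiles without Hadoop on the classpath. With a real Hadoop `Configuration`, `get` returns `null` for a missing key, so the lookup would be wrapped as `k => Option(hadoopConf.get(k))`.

```scala
// Hypothetical helper: temporarily applies overrides to a mutable key/value
// store, then restores whatever each key held before the body ran.
object ConfRestore {
  def withOverrides[T](
      get: String => Option[String],
      set: (String, String) => Unit,
      unset: String => Unit)(overrides: (String, String)*)(body: => T): T = {
    // Record the prior state of every key before touching any of them.
    val previous = overrides.map { case (k, _) => k -> get(k) }
    overrides.foreach { case (k, v) => set(k, v) }
    try body
    finally previous.foreach {
      case (k, Some(old)) => set(k, old) // key existed before: put the old value back
      case (k, None) => unset(k)         // key was absent before: remove it again
    }
  }
}

// Assumed wiring for a Hadoop Configuration named hadoopConf (not compiled here):
//   ConfRestore.withOverrides(
//     k => Option(hadoopConf.get(k)),
//     (k, v) => hadoopConf.set(k, v),
//     k => hadoopConf.unset(k))("fs.gs.auth.service.account.json.keyfile" -> "/tmp/sa.json") {
//     // assertions go here
//   }
```

Because the restore runs in `finally`, the prior state comes back even when an assertion inside the body fails.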



##########
backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala:
##########
@@ -0,0 +1,126 @@
+  test("genPartitions embeds fs.s3a.* keys from Hadoop conf into GlutenPartition.fsConf") {
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    hadoopConf.set("fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE")
+    hadoopConf.set("fs.s3a.secret.key", "wJalrXUtnFEMI")
+    try {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      assert(partitions.size == 1)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(fsConf.contains("fs.s3a.access.key"), s"Expected fs.s3a key not found; got: $fsConf")
+      assert(fsConf("fs.s3a.access.key") == "AKIAIOSFODNN7EXAMPLE")
+      assert(fsConf.contains("fs.s3a.secret.key"))
+    } finally {
+      hadoopConf.unset("fs.s3a.access.key")
+      hadoopConf.unset("fs.s3a.secret.key")
+    }
+  }
+
+  test("genPartitions embeds fs.gs.* keys from Hadoop conf into GlutenPartition.fsConf") {
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    hadoopConf.set("fs.gs.auth.service.account.json.keyfile", "/tmp/sa.json")
+    try {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      assert(partitions.size == 1)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(
+        fsConf.contains("fs.gs.auth.service.account.json.keyfile"),
+        s"Expected fs.gs key not found; got: $fsConf")
+      assert(fsConf("fs.gs.auth.service.account.json.keyfile") == "/tmp/sa.json")
+    } finally {
+      hadoopConf.unset("fs.gs.auth.service.account.json.keyfile")
+    }
+  }

Review Comment:
   Like the other tests, this one mutates the shared Hadoop conf and always 
`unset`s the key in `finally`. Restore the previous value (if any) to avoid 
cross-test pollution, and print only the keys of `fsConf` in failure messages, 
since the values may contain credentials.
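
One way to keep values out of failure messages is to build the message from the key set alone. This is only a sketch: `FsConfMessages.missingKeyMessage` is a hypothetical helper, not an existing Gluten or ScalaTest API, and the sample values in the usage comment are neutral placeholders.

```scala
// Hypothetical helper: describes a missing key without echoing any values.
object FsConfMessages {
  def missingKeyMessage(expectedKey: String, fsConf: Map[String, String]): String = {
    // Sort the keys so the message is stable across runs.
    val presentKeys = fsConf.keys.toSeq.sorted.mkString(", ")
    s"Expected key '$expectedKey' not found; present keys: $presentKeys"
  }
}

// Assumed usage inside a test:
//   val key = "fs.gs.auth.service.account.json.keyfile"
//   assert(fsConf.contains(key), FsConfMessages.missingKeyMessage(key, fsConf))
```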



##########
gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala:
##########
@@ -37,8 +37,13 @@ case class GlutenPartition(
     index: Int,
     plan: Array[Byte],
     splitInfos: Array[SplitInfo] = Array.empty[SplitInfo],
-    files: Array[String] =
-      Array.empty[String] // touched files, for implementing UDF input_file_name
+    files: Array[String] = Array.empty[String], // touched files, for UDF input_file_name
+    // fs.azure.* / fs.s3a.* / fs.gs.* keys captured on the driver from
+    // sessionState.newHadoopConf() and serialised here so they survive the
+    // RDD partition boundary.  Spark's withSQLConfPropagated only propagates
+    // keys that start with "spark", so these keys are otherwise invisible on
+    // the executor side (the executor's SQLConf never sees them).
+    fsConf: Map[String, String] = Map.empty

Review Comment:
   `GlutenPartition` is a case class, so its default `toString` will include 
`fsConf` values (which can contain secrets like `fs.s3a.secret.key` / OAuth 
client secrets). This can leak credentials into Spark logs and exception 
messages (e.g., when `$split` is interpolated). Consider overriding `toString` 
to only print fsConf *keys* (or a redacted summary) rather than values.
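
A sketch of such an override, using a simplified stand-in rather than the real class: `PartitionSketch` is hypothetical and omits `splitInfos` and `files`. A `toString` defined in the case class body replaces the compiler-generated one.

```scala
// Simplified stand-in for GlutenPartition (assumption: the real class has more fields).
final case class PartitionSketch(
    index: Int,
    plan: Array[Byte],
    fsConf: Map[String, String] = Map.empty) {

  // Print the plan size and the sorted fsConf keys only; values never appear.
  override def toString: String = {
    val keys = fsConf.keys.toSeq.sorted.mkString("[", ", ", "]")
    s"PartitionSketch(index=$index, planBytes=${plan.length}, fsConfKeys=$keys)"
  }
}
```

With this in place, interpolating a partition into a log line or exception message shows which keys were captured without exposing what they hold.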



##########
backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala:
##########
@@ -0,0 +1,126 @@
+  test("genPartitions embeds fs.s3a.* keys from Hadoop conf into GlutenPartition.fsConf") {
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    hadoopConf.set("fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE")
+    hadoopConf.set("fs.s3a.secret.key", "wJalrXUtnFEMI")
+    try {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      assert(partitions.size == 1)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(fsConf.contains("fs.s3a.access.key"), s"Expected fs.s3a key not found; got: $fsConf")
+      assert(fsConf("fs.s3a.access.key") == "AKIAIOSFODNN7EXAMPLE")
+      assert(fsConf.contains("fs.s3a.secret.key"))
+    } finally {
+      hadoopConf.unset("fs.s3a.access.key")
+      hadoopConf.unset("fs.s3a.secret.key")
+    }
+  }

Review Comment:
   This test uses strings that look like real AWS access/secret keys and 
unconditionally `unset`s the keys in `finally`. Use neutral dummy values to 
avoid secret-scanning false positives, and restore any pre-existing Hadoop conf 
values instead of always unsetting.



##########
backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala:
##########
@@ -0,0 +1,126 @@
+  test("genPartitions does not include non-fs.* keys in GlutenPartition.fsConf") {
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    hadoopConf.set("fs.s3a.access.key", "KEY")
+    hadoopConf.set("spark.some.conf", "value")
+    hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize", "128000000")
+    try {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(!fsConf.contains("spark.some.conf"), "Non-fs key must not appear in fsConf")
+      assert(
+        !fsConf.contains("mapreduce.input.fileinputformat.split.maxsize"),
+        "Non-fs key must not appear in fsConf")
+      assert(fsConf.contains("fs.s3a.access.key"))
+    } finally {
+      hadoopConf.unset("fs.s3a.access.key")
+      hadoopConf.unset("spark.some.conf")
+      hadoopConf.unset("mapreduce.input.fileinputformat.split.maxsize")
+    }
+  }

Review Comment:
   This test also unconditionally `unset`s keys it sets on the shared Hadoop 
configuration. If any of these were already set, the original value is lost for 
subsequent tests. Please capture previous values and restore them in `finally` 
(set back if present, otherwise unset).



##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala:
##########
@@ -125,12 +125,41 @@ class VeloxIteratorApi extends IteratorApi with Logging {
     // Only serialize plan once, save lots time when plan is complex.
     val planByteArray = wsCtx.root.toProtobuf.toByteArray
 
+    // Capture fs.azure.* / fs.s3a.* / fs.gs.* keys from the driver-side
+    // Hadoop configuration NOW, while we are still on the driver, and embed
+    // them in every GlutenPartition.  These keys are set by the user via
+    //   spark.conf.set("fs.azure.account.auth.type", ...)   or
+    //   sparkContext.hadoopConfiguration.set(...)
+    // Spark's withSQLConfPropagated only forwards keys starting with "spark"
+    // as task-local-properties, so "fs.*" keys never reach the executor's
+    // SQLConf.  Serialising them inside the partition is the only safe way
+    // to make them available to the native runtime on the executor.
+    // Capture fs.azure.* / fs.s3a.* / fs.gs.* keys while on the driver.
+    // SparkPlan.sqlContext is available on the driver -- using the first leaf
+    // gives us access to sessionState.newHadoopConf() which includes all keys
+    // set via spark.conf.set(), sparkContext.hadoopConfiguration, and
+    // DataFrameReader.option().  These are NOT propagated to executors by
+    // Spark's withSQLConfPropagated (it only forwards keys starting with
+    // "spark"), so embedding them in the serialised GlutenPartition is the
+    // only reliable transport mechanism.
+    val fsPrefixes = Seq("fs.azure.", "fs.s3a.", "fs.gs.")
+    val hadoopConf = leaves.headOption
+      .map(_ => org.apache.spark.sql.SparkSession.active.sessionState.newHadoopConf())
+      .getOrElse(org.apache.spark.SparkContext.getOrCreate().hadoopConfiguration)

Review Comment:
   `genPartitions` uses `SparkSession.active.sessionState.newHadoopConf()`. 
`SparkSession.active` throws when there is no active or default session, and 
it may resolve to the wrong session when multiple sessions exist. Since you 
already have a `LeafTransformSupport` (the current `map(_ => ...)` ignores it), 
prefer using the leaf’s `sqlContext.sparkSession` to obtain the correct 
session-scoped Hadoop conf.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

