Copilot commented on code in PR #12367:
URL: https://github.com/apache/gluten/pull/12367#discussion_r3490729997
##########
gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala:
##########
@@ -18,10 +18,48 @@ package org.apache.gluten.runtime
import org.apache.spark.task.TaskResources
+import java.security.MessageDigest
import java.util
object Runtimes {
+ /**
+ * Produce a stable, value-free cache key for a (backendName, name, extraConf) triple.
+ *
+ * Two problems with the old `s"$backendName:$name:$extraConf"` key:
+ *
+ * 1. **Credential leakage** – `Map.toString` embeds secret values (e.g. `fs.s3a.secret.key`,
+ * `fs.azure.account.oauth2.client.secret`) in a plain heap string that can appear in logs,
+ * thread dumps, and heap snapshots.
+ * 2. **Nondeterminism** – Scala `Map.toString` does not guarantee insertion order, so two maps
+ * with identical entries can produce different strings, causing spurious duplicate
+ * `VeloxRuntime` registrations within a task.
+ *
+ * Fix: sort keys, hash them with SHA-256, and use only the hex digest as the key. Values are
+ * intentionally excluded from the digest – distinct configs (different credentials for the same
+ * key set) that need separate runtimes are already separated at the task level through
+ * `GlutenPartition.fsConf`. Within a single task the key set is stable, so the digest is stable.
+ */
+ private def stableKey(
+ backendName: String,
+ name: String,
+ extraConf: util.Map[String, String]): String = {
+ val digest = MessageDigest.getInstance("SHA-256")
+ digest.update(backendName.getBytes("UTF-8"))
+ digest.update(0.toByte) // field separator
+ digest.update(name.getBytes("UTF-8"))
+ digest.update(0.toByte)
+ // Sort keys for determinism; hash only keys, not values, to avoid leaking secrets.
+ val sortedKeys = new java.util.ArrayList(extraConf.keySet)
+ java.util.Collections.sort(sortedKeys)
+ sortedKeys.forEach {
+ k =>
+ digest.update(k.getBytes("UTF-8"))
+ digest.update(0.toByte)
+ }
+ digest.digest().map("%02x".format(_)).mkString
+ }
Review Comment:
stableKey() is currently incorrect:
`digest.digest().map("%02x".format(_)).mkString` can emit variable-length hex
for negative bytes (e.g., `ffffffff...`) and therefore produces
unstable/non-64-char digests. Also hashing only keys can cause collisions when
the same (backendName, name) is registered with different extraConf values
within the same task (e.g., IcebergWrite#write). Hashing key+value pairs avoids
leaking secrets via Map.toString without breaking cache key uniqueness.
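A minimal sketch of a key that addresses both points, assuming values may be fed into the digest but must never be rendered as a string. `StableKeySketch` and `updateField` are hypothetical names, and the 4-byte length prefix per field is a swapped-in framing choice in place of the NUL separators used in the diff, so that a field containing a NUL byte cannot blur field boundaries. Masking each byte with `& 0xff` makes the two-character hex width explicit regardless of how the byte is boxed before formatting.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.util

object StableKeySketch {
  // Feed one UTF-8 field into the digest, prefixed with its byte length, so
  // field boundaries stay unambiguous even if a field contains a NUL byte.
  private def updateField(digest: MessageDigest, s: String): Unit = {
    val bytes = s.getBytes(StandardCharsets.UTF_8)
    digest.update(ByteBuffer.allocate(4).putInt(bytes.length).array())
    digest.update(bytes)
  }

  // Assumption: extraConf contains no null keys or values.
  def stableKey(
      backendName: String,
      name: String,
      extraConf: util.Map[String, String]): String = {
    val digest = MessageDigest.getInstance("SHA-256")
    updateField(digest, backendName)
    updateField(digest, name)
    // Sort keys so the digest does not depend on map iteration order.
    val sortedKeys = new util.ArrayList[String](extraConf.keySet)
    util.Collections.sort(sortedKeys)
    val it = sortedKeys.iterator()
    while (it.hasNext) {
      val k = it.next()
      updateField(digest, k)
      // Values are hashed too, so different credentials for the same key set
      // give different cache keys; only the one-way digest becomes a string.
      updateField(digest, extraConf.get(k))
    }
    // Mask to 0..255 so every byte renders as exactly two hex characters.
    digest.digest().map(b => "%02x".format(b & 0xff)).mkString
  }
}
```

Because only the SHA-256 digest is materialised, distinct credential sets still map to distinct runtimes while the key string itself contains no configuration value.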
##########
gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala:
##########
@@ -37,8 +37,13 @@ case class GlutenPartition(
index: Int,
plan: Array[Byte],
splitInfos: Array[SplitInfo] = Array.empty[SplitInfo],
- files: Array[String] =
- Array.empty[String] // touched files, for implementing UDF input_file_name
+ files: Array[String] = Array.empty[String], // touched files, for UDF input_file_name
+ // fs.azure.* / fs.s3a.* / fs.gs.* keys captured on the driver from
+ // sessionState.newHadoopConf() and serialised here so they survive the
+ // RDD partition boundary. Spark's withSQLConfPropagated only propagates
+ // keys that start with "spark", so these keys are otherwise invisible on
+ // the executor side (the executor's SQLConf never sees them).
Review Comment:
GlutenPartition is a case class; adding `fsConf: Map[String, String]` means
the auto-generated `toString` will include *values* (potentially credentials
like `fs.s3a.secret.key`). Partition objects can end up in exception messages
or debug logs, so this can leak secrets even if the runtime cache key is
redacted. Consider overriding `toString` (or storing a redacted wrapper) so
only keys (or counts) are logged, not values.
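A minimal sketch of the suggested `toString` override. `PartitionSketch` is a hypothetical, trimmed-down stand-in for `GlutenPartition` that keeps only the fields needed to show the idea; the real class has more fields (`plan`, `splitInfos`, ...) that would need the same treatment if they can carry sensitive data.

```scala
// Hypothetical stand-in for GlutenPartition, reduced to three fields.
case class PartitionSketch(
    index: Int,
    files: Array[String] = Array.empty[String],
    fsConf: Map[String, String] = Map.empty) {

  // The compiler-generated case-class toString would print every fsConf value.
  // This override reports the file count and the sorted fsConf keys only.
  override def toString: String =
    s"PartitionSketch(index=$index, files=${files.length}, " +
      s"fsConfKeys=[${fsConf.keys.toSeq.sorted.mkString(", ")}])"
}
```

Overriding `toString` does not change Java serialization of the partition, so the values still reach executors; only their textual rendering is redacted.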
##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala:
##########
@@ -125,12 +125,41 @@ class VeloxIteratorApi extends IteratorApi with Logging {
// Only serialize plan once, save lots time when plan is complex.
val planByteArray = wsCtx.root.toProtobuf.toByteArray
+ // Capture fs.azure.* / fs.s3a.* / fs.gs.* keys from the driver-side
+ // Hadoop configuration NOW, while we are still on the driver, and embed
+ // them in every GlutenPartition. These keys are set by the user via
+ // spark.conf.set("fs.azure.account.auth.type", ...) or
+ // sparkContext.hadoopConfiguration.set(...)
+ // Spark's withSQLConfPropagated only forwards keys starting with "spark"
+ // as task-local-properties, so "fs.*" keys never reach the executor's
+ // SQLConf. Serialising them inside the partition is the only safe way
+ // to make them available to the native runtime on the executor.
+ // Capture fs.azure.* / fs.s3a.* / fs.gs.* keys while on the driver.
+ // SparkPlan.sqlContext is available on the driver -- using the first leaf
+ // gives us access to sessionState.newHadoopConf() which includes all keys
+ // set via spark.conf.set(), sparkContext.hadoopConfiguration, and
+ // DataFrameReader.option(). These are NOT propagated to executors by
+ // Spark's withSQLConfPropagated (it only forwards keys starting with
+ // "spark"), so embedding them in the serialised GlutenPartition is the
+ // only reliable transport mechanism.
+ val fsPrefixes = Seq("fs.azure.", "fs.s3a.", "fs.gs.")
+ val hadoopConf = leaves.headOption
+ .map(_ => org.apache.spark.sql.SparkSession.active.sessionState.newHadoopConf())
+ .getOrElse(org.apache.spark.SparkContext.getOrCreate().hadoopConfiguration)
Review Comment:
Using `SparkSession.active` here can throw if there is no active session on
the current thread, and it can also pick up the wrong session if multiple
sessions exist. Since you already have `leaves`, prefer the session attached to
the plan (`leaves.head.sqlContext.sparkSession`) and only fall back to an
active/default session before touching `SparkContext.getOrCreate()`.
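A dependency-free sketch of the suggested lookup order, so it can run without Spark. `Session` and `SessionResolution` are hypothetical. In the real code the three sources would presumably be the plan's own session (the comment suggests `leaves.head.sqlContext.sparkSession`), `SparkSession.getActiveSession`, and `SparkSession.getDefaultSession` (both return `Option[SparkSession]` instead of throwing), with `SparkContext.getOrCreate()` reserved for the case where all three are empty.

```scala
// Hypothetical stand-in for SparkSession.
final case class Session(id: String)

object SessionResolution {
  // By-name parameters: a later source is consulted only when every earlier
  // one is empty, matching Option.orElse semantics.
  def resolve(
      fromPlan: => Option[Session],
      active: => Option[Session],
      defaultSession: => Option[Session]): Option[Session] =
    fromPlan.orElse(active).orElse(defaultSession)
}
```

Keeping the result as an `Option` lets the caller map it to `sessionState.newHadoopConf()` and keep the existing `getOrElse` fallback to the `SparkContext` configuration.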
##########
backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.velox
+
+import org.apache.gluten.execution.{GlutenPartition, WholeStageTransformContext}
+import org.apache.gluten.substrait.plan.PlanBuilder
+
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests that [[VeloxIteratorApi.genPartitions]] captures fs.azure.*, fs.s3a.*, and fs.gs.* keys
+ * from the driver-side Hadoop configuration and embeds them in [[GlutenPartition.fsConf]], so they
+ * are available on executors where Spark's SQLConf propagation does not reach "fs.*" keys.
+ *
+ * Keys must be set on `sparkContext.hadoopConfiguration` (the mutable base configuration) because
+ * `sessionState.newHadoopConf()` creates a fresh copy each time - mutations to its return value are
+ * discarded before the next call.
+ */
+class VeloxIteratorApiFsConfSuite extends SharedSparkSession {
+
+ private val api = new VeloxIteratorApi
+
+ /**
+ * Build a minimal WholeStageTransformContext backed by an empty Substrait plan. genPartitions
+ * only calls wsCtx.root.toProtobuf.toByteArray, so a plan with no relations is sufficient for the
+ * purpose of this test.
+ */
+ private def emptyWsCtx: WholeStageTransformContext =
+ WholeStageTransformContext(PlanBuilder.empty())
+
+ /**
+ * Set Hadoop conf keys on the underlying mutable configuration and restore their previous values
+ * (or unset them) after the block. `sessionState.newHadoopConf()` copies from
+ * `sparkContext.hadoopConfiguration`, so this is the correct mutation point.
+ */
+ private def withHadoopConf(pairs: (String, String)*)(body: => Unit): Unit = {
+ // scalastyle:off hadoopconfiguration
+ val hadoopConf = spark.sparkContext.hadoopConfiguration
+ // scalastyle:on hadoopconfiguration
+ val prev: Seq[(String, Option[String])] = pairs.map {
+ case (k, _) => k -> Option(hadoopConf.get(k))
+ }
+ pairs.foreach { case (k, v) => hadoopConf.set(k, v) }
+ try body
+ finally prev.foreach {
+ case (k, Some(old)) => hadoopConf.set(k, old)
+ case (k, None) => hadoopConf.unset(k)
+ }
+ }
+
+ test("genPartitions embeds fs.azure.* keys from Hadoop conf into GlutenPartition.fsConf") {
+ withHadoopConf(
+ "fs.azure.account.auth.type.myaccount.dfs.core.windows.net" -> "OAuth",
+ "fs.azure.account.oauth.provider.type" -> "ClientCredentials"
+ ) {
+ val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+ assert(partitions.size == 1)
+ val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+ assert(
+ fsConf.contains("fs.azure.account.auth.type.myaccount.dfs.core.windows.net"),
+ s"Expected fs.azure key not found; got: ${fsConf.keys.mkString(", ")}")
+ assert(fsConf("fs.azure.account.auth.type.myaccount.dfs.core.windows.net") == "OAuth")
+ assert(
+ fsConf.contains("fs.azure.account.oauth.provider.type"),
+ s"Expected fs.azure key not found; got: ${fsConf.keys.mkString(", ")}")
+ assert(fsConf("fs.azure.account.oauth.provider.type") == "ClientCredentials")
+ }
+ }
+
+ test("genPartitions embeds fs.s3a.* keys from Hadoop conf into GlutenPartition.fsConf") {
+ withHadoopConf(
+ "fs.s3a.access.key" -> "AKIAIOSFODNN7EXAMPLE",
+ "fs.s3a.secret.key" -> "wJalrXUtnFEMI"
+ ) {
Review Comment:
The test uses AWS-shaped example credentials (e.g. `AKIA...`), which can
trigger secret-scanning / false-positive credential alerts in CI and downstream
forks. Prefer obviously fake strings that do not match common key patterns,
while still asserting value propagation.
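A minimal sketch of the suggestion: self-describing fake values, plus a check that they do not match a common AWS access-key-ID shape. `FakeCredentials` is hypothetical, and the regex `AKIA[0-9A-Z]{16}` is a simplified approximation; real secret scanners use broader rule sets.

```scala
object FakeCredentials {
  // Obviously fake values: lowercase, hyphenated, and self-describing.
  val AccessKey = "test-access-key-not-real"
  val SecretKey = "test-secret-key-not-real"

  // Simplified approximation of the AWS access key ID shape scanners flag.
  private val AwsAccessKeyIdLike = "AKIA[0-9A-Z]{16}".r

  def looksLikeAwsAccessKeyId(s: String): Boolean =
    AwsAccessKeyIdLike.findFirstIn(s).isDefined
}
```

In the suite, `withHadoopConf("fs.s3a.access.key" -> FakeCredentials.AccessKey, "fs.s3a.secret.key" -> FakeCredentials.SecretKey)` would still let the test assert that the exact values propagate into `fsConf`.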
##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala:
##########
@@ -125,12 +125,41 @@ class VeloxIteratorApi extends IteratorApi with Logging {
// Only serialize plan once, save lots time when plan is complex.
val planByteArray = wsCtx.root.toProtobuf.toByteArray
+ // Capture fs.azure.* / fs.s3a.* / fs.gs.* keys from the driver-side
+ // Hadoop configuration NOW, while we are still on the driver, and embed
+ // them in every GlutenPartition. These keys are set by the user via
+ // spark.conf.set("fs.azure.account.auth.type", ...) or
+ // sparkContext.hadoopConfiguration.set(...)
Review Comment:
The comment suggests `spark.conf.set("fs.azure...", ...)` populates the
Hadoop Configuration. In Spark, Hadoop configuration typically picks up
`spark.hadoop.*` keys (with the prefix stripped) and
`sparkContext.hadoopConfiguration` mutations; plain `fs.*` keys set in
SQLConf/SparkConf generally do not flow into `newHadoopConf()` automatically.
Updating the comment would avoid misleading users.
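A minimal model of the `spark.hadoop.*` convention the comment refers to: for SparkConf entries, a key `spark.hadoop.X` is copied into the Hadoop configuration as `X`, and keys without that prefix are not copied by this rule. `HadoopKeyMapping` is a hypothetical helper over plain `Map`s, not Spark's own implementation; it also reproduces the `fs.azure.` / `fs.s3a.` / `fs.gs.` filter from the diff so both steps can be seen together.

```scala
object HadoopKeyMapping {
  private val SparkHadoopPrefix = "spark.hadoop."
  val FsPrefixes: Seq[String] = Seq("fs.azure.", "fs.s3a.", "fs.gs.")

  // "spark.hadoop.X" becomes Hadoop key "X"; all other entries are dropped.
  def hadoopKeysFromSparkConf(sparkConf: Map[String, String]): Map[String, String] =
    sparkConf.collect {
      case (k, v) if k.startsWith(SparkHadoopPrefix) =>
        k.stripPrefix(SparkHadoopPrefix) -> v
    }

  // Keep only the object-store keys that the diff embeds in GlutenPartition.
  def fsKeysOnly(hadoopConf: Map[String, String]): Map[String, String] =
    hadoopConf.filter { case (k, _) => FsPrefixes.exists(p => k.startsWith(p)) }
}
```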
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]