This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 3f5adada5e71 [SPARK-50558][SQL] Introduce simpleString for
ExpressionSet
3f5adada5e71 is described below
commit 3f5adada5e7118e7fbf8135d4cb5f81df66f060b
Author: Ole Sasse <[email protected]>
AuthorDate: Mon Jan 27 10:29:52 2025 +0300
[SPARK-50558][SQL] Introduce simpleString for ExpressionSet
### What changes were proposed in this pull request?
* Introduce a simpleString method equal to the one for Expression and add
it to ExpressionSet
* Use it for push down filter logging in DataSourceStrategy
* Use it for after scan filter logging in FileSourceStrategy
### Why are the changes needed?
Filter expressions can be arbitrarily large and should not be logged
completely in these cases
### Does this PR introduce _any_ user-facing change?
No, logging is not user facing
### How was this patch tested?
Added new tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #49650 from olaky/spark-50558-add-simple-string-for-expression-set.
Authored-by: Ole Sasse <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
(cherry picked from commit 771d81ab48a6c990653028a3167a7ad0911ba573)
Signed-off-by: Max Gekk <[email protected]>
---
.../org/apache/spark/sql/catalyst/util/StringUtils.scala | 15 ++++++++++++---
.../spark/sql/catalyst/expressions/ExpressionSet.scala | 9 +++++++++
.../sql/catalyst/expressions/ExpressionSetSuite.scala | 14 ++++++++++++++
.../sql/execution/datasources/DataSourceStrategy.scala | 5 ++++-
.../sql/execution/datasources/FileSourceStrategy.scala | 6 +++++-
5 files changed, 44 insertions(+), 5 deletions(-)
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index e8c50be9f551..486093225f06 100644
---
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -83,7 +83,8 @@ object SparkStringUtils extends Logging {
start: String,
sep: String,
end: String,
- maxFields: Int): String = {
+ maxFields: Int,
+ customToString: Option[T => String] = None): String = {
if (seq.length > maxFields) {
if (truncationWarningPrinted.compareAndSet(false, true)) {
logWarning(
@@ -94,9 +95,17 @@ object SparkStringUtils extends Logging {
val restNum = seq.length - numFields
val ending = (if (numFields == 0) "" else sep) +
(if (restNum == 0) "" else s"... $restNum more fields") + end
- seq.take(numFields).mkString(start, sep, ending)
+ if (customToString.isDefined) {
+ seq.take(numFields).map(customToString.get).mkString(start, sep,
ending)
+ } else {
+ seq.take(numFields).mkString(start, sep, ending)
+ }
} else {
- seq.mkString(start, sep, end)
+ if (customToString.isDefined) {
+ seq.map(customToString.get).mkString(start, sep, end)
+ } else {
+ seq.mkString(start, sep, end)
+ }
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
index 1aa9f006463c..cc6fea2f1b7f 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.expressions
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.sql.catalyst.util.SparkStringUtils
+
object ExpressionSet {
/**
* Constructs a new [[ExpressionSet]] by applying
[[Expression#canonicalized]] to `expressions`.
@@ -178,5 +180,12 @@ class ExpressionSet protected(
|baseSet: ${baseSet.mkString(", ")}
|originals: ${originals.mkString(", ")}
""".stripMargin
+
+ /** Returns a length limited string that must be used for logging only. */
+ def simpleString(maxFields: Int): String = {
+ val customToString = { e: Expression => e.simpleString(maxFields) }
+ SparkStringUtils.truncatedString(
+ seq = originals.toSeq, start = "Set(", sep = ", ", end = ")", maxFields,
Some(customToString))
+ }
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala
index 4d5938b2e760..810c3f9ccf2e 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala
@@ -240,4 +240,18 @@ class ExpressionSetSuite extends SparkFunSuite {
assert((initialSet -- setToRemoveWithOutSameExpression).size == 2)
}
+ test("simpleString limits the number of expressions recursively") {
+ val expressionSet =
+ ExpressionSet(InSet(aUpper, Set(0, 1)) :: Rand(1) :: Rand(2) :: Rand(3)
:: Nil)
+ assert(expressionSet.simpleString(1) ==
+ "Set(A#1 INSET 0, ... 1 more fields, ... 3 more fields)")
+ assert(expressionSet.simpleString(2) == "Set(A#1 INSET 0, 1, rand(1), ...
2 more fields)")
+ assert(expressionSet.simpleString(3) ==
+ "Set(A#1 INSET 0, 1, rand(1), rand(2), ... 1 more fields)")
+ assert(expressionSet.simpleString(4) == expressionSet.toString)
+
+ // Only one expression, but the simple string for this expression must be
truncated.
+ val expressionSetTwo = ExpressionSet(InSet(aUpper, Set(0, 1, 2, 3, 4)) ::
Nil)
+ assert(expressionSetTwo.simpleString(1) == "Set(A#1 INSET 0, ... 4 more
fields)")
+ }
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e2e72a9e3695..a1bcf575ce58 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -52,6 +52,7 @@ import org.apache.spark.sql.execution.{RowDataSourceScanExec,
SparkPlan}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation,
PushedDownOperators}
import org.apache.spark.sql.execution.streaming.StreamingRelation
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -519,9 +520,11 @@ object DataSourceStrategy
ExpressionSet(Nil)
} else {
val partitionSet = AttributeSet(partitionColumns)
+ val maxToStringFields = SQLConf.get.getConf(SQLConf.MAX_TO_STRING_FIELDS)
val predicates = ExpressionSet(normalizedFilters
.flatMap(extractPredicatesWithinOutputSet(_, partitionSet)))
- logInfo(log"Pruning directories with: ${MDC(PREDICATES,
predicates.mkString(","))}")
+ logInfo(log"Pruning directories with: ${MDC(PREDICATES,
+ predicates.simpleString(maxToStringFields))}")
predicates
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 02235ffb1976..0f22c23791a1 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreePattern.{PLAN_EXPRESSION,
SCALAR_SUBQUERY}
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DoubleType, FloatType, StructType}
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.collection.BitSet
@@ -199,6 +200,7 @@ object FileSourceStrategy extends Strategy with
PredicateHelper with Logging {
Some(f)
}
}
+
val supportNestedPredicatePushdown =
DataSourceUtils.supportNestedPredicatePushdown(fsRelation)
val pushedFilters = dataFilters
@@ -207,7 +209,9 @@ object FileSourceStrategy extends Strategy with
PredicateHelper with Logging {
// Predicates with both partition keys and attributes need to be
evaluated after the scan.
val afterScanFilters = filterSet --
partitionKeyFilters.filter(_.references.nonEmpty)
- logInfo(log"Post-Scan Filters: ${MDC(POST_SCAN_FILTERS,
afterScanFilters.mkString(","))}")
+ val maxToStringFields =
fsRelation.sparkSession.conf.get(SQLConf.MAX_TO_STRING_FIELDS)
+ logInfo(log"Post-Scan Filters: ${MDC(POST_SCAN_FILTERS,
+ afterScanFilters.simpleString(maxToStringFields))}")
val filterAttributes = AttributeSet(afterScanFilters ++ stayUpFilters)
val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq
++ projects
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]