This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c8c12e7c4303 [SPARK-51527][SQL] Make codegen log level configurable via SQLConf
c8c12e7c4303 is described below

commit c8c12e7c43034bd09d62fb53063c7527e7f8c5fc
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Thu Mar 20 12:45:17 2025 +0800

[SPARK-51527][SQL] Make codegen log level configurable via SQLConf

### What changes were proposed in this pull request?

Introduce a new conf `spark.sql.codegen.logLevel` to allow users to adjust the generated code log level via SQL, e.g. `SET spark.sql.codegen.logLevel=INFO`, with some code refactoring to reduce duplicated code.

### Why are the changes needed?

Simplify the codegen debugging process. Previously the log level was hardcoded to DEBUG, so developers had to tune the log4j configuration or change the log level globally to diagnose codegen issues.

### Does this PR introduce _any_ user-facing change?

It introduces a new configuration; the default value keeps the existing behavior.

### How was this patch tested?

Added a new UT. Also tested manually with `spark-sql`:

```
$ apache-spark git:(SPARK-51527) SPARK_PREPEND_CLASSES=true bin/spark-sql
NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of assembly.
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/03/19 15:41:05 WARN Utils: Your hostname, H27212-MAC-01.local, resolves to a loopback address: 127.0.0.1; using 10.243.76.75 instead (on interface en0)
25/03/19 15:41:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/19 15:41:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/19 15:41:09 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
25/03/19 15:41:09 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore chengpan@127.0.0.1
25/03/19 15:41:09 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
Spark Web UI available at http://10.243.76.75:4040
Spark master: local[*], Application Id: local-1742370066718
spark-sql (default)> SET spark.sql.codegen.logLevel=WARN;
spark.sql.codegen.logLevel	WARN
Time taken: 0.661 seconds, Fetched 1 row(s)
spark-sql (default)> select 1+1;
25/03/19 15:41:28 WARN CodeGenerator:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator rdd_input_0;
...
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50294 from pan3793/SPARK-51527.
Authored-by: Cheng Pan <cheng...@apache.org>
Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../scala/org/apache/spark/internal/LogKey.scala   |  1 +
 .../scala/org/apache/spark/internal/Logging.scala  | 11 +++
 .../catalyst/analysis/resolver/PlanLogger.scala    | 79 ++++++++--------------
 .../expressions/codegen/CodeGenerator.scala        |  6 +-
 .../spark/sql/catalyst/rules/RuleExecutor.scala    | 17 +----
 .../org/apache/spark/sql/internal/SQLConf.scala    | 54 ++++++++++-----
 .../catalyst/expressions/CodeGenerationSuite.scala | 16 +++++
 7 files changed, 101 insertions(+), 83 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 318f32c52b90..9da5f4bae6d0 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -139,6 +139,7 @@ private[spark] object LogKeys {
   case object CLUSTER_LABEL extends LogKey
   case object CLUSTER_LEVEL extends LogKey
   case object CLUSTER_WEIGHT extends LogKey
+  case object CODE extends LogKey
   case object CODEC_LEVEL extends LogKey
   case object CODEC_NAME extends LogKey
   case object CODEGEN_STAGE_ID extends LogKey
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
index 1901b366074f..cc5d0281829d 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -26,6 +26,7 @@ import org.apache.logging.log4j.core.appender.ConsoleAppender
 import org.apache.logging.log4j.core.config.DefaultConfiguration
 import org.apache.logging.log4j.core.filter.AbstractFilter
 import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.event.{Level => Slf4jLevel}
 
 import org.apache.spark.internal.Logging.SparkShellLoggingFilter
 import org.apache.spark.internal.LogKeys
@@ -307,6 +308,16 @@ trait Logging {
     log.isTraceEnabled
   }
 
+  protected def logBasedOnLevel(level: Slf4jLevel)(f: => MessageWithContext): Unit = {
+    level match {
+      case Slf4jLevel.TRACE => logTrace(f.message)
+      case Slf4jLevel.DEBUG => logDebug(f.message)
+      case Slf4jLevel.INFO => logInfo(f)
+      case Slf4jLevel.WARN => logWarning(f)
+      case Slf4jLevel.ERROR => logError(f)
+    }
+  }
+
   protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit = {
     initializeLogIfNecessary(isInterpreter, silent = false)
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/PlanLogger.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/PlanLogger.scala
index a0d67893484c..f9bdfa06ae54 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/PlanLogger.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/PlanLogger.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis.resolver
 
-import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
+import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.{MESSAGE, QUERY_PLAN}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -32,65 +32,44 @@ class PlanLogger extends Logging {
   private val expressionTreeChangeLogLevel = SQLConf.get.expressionTreeChangeLogLevel
 
   def logPlanResolutionEvent(plan: LogicalPlan, event: String): Unit = {
-    log(() => log"""
-       |=== Plan resolution: ${MDC(MESSAGE, event)} ===
-       |${MDC(QUERY_PLAN, plan.treeString)}
-     """.stripMargin, planChangeLogLevel)
+    logBasedOnLevel(planChangeLogLevel) {
+      log"""
+         |=== Plan resolution: ${MDC(MESSAGE, event)} ===
+         |${MDC(QUERY_PLAN, plan.treeString)}
+       """.stripMargin
+    }
   }
 
   def logPlanResolution(unresolvedPlan: LogicalPlan, resolvedPlan: LogicalPlan): Unit = {
-    log(
-      () =>
-        log"""
-           |=== Unresolved plan -> Resolved plan ===
-           |${MDC(
-            QUERY_PLAN,
-            sideBySide(
-              unresolvedPlan.treeString,
-              resolvedPlan.treeString
-            ).mkString("\n")
-          )}
-         """.stripMargin,
-      planChangeLogLevel
-    )
+    logBasedOnLevel(planChangeLogLevel) {
+      val unresolved = unresolvedPlan.treeString
+      val resolved = resolvedPlan.treeString
+      log"""
+         |=== Unresolved plan -> Resolved plan ===
+         |${MDC(QUERY_PLAN, sideBySide(unresolved, resolved).mkString("\n"))}
+       """.stripMargin
+    }
   }
 
   def logExpressionTreeResolutionEvent(expressionTree: Expression, event: String): Unit = {
-    log(
-      () => log"""
-         |=== Expression tree resolution: ${MDC(MESSAGE, event)} ===
-         |${MDC(QUERY_PLAN, expressionTree.treeString)}
-       """.stripMargin,
-      expressionTreeChangeLogLevel
-    )
+    logBasedOnLevel(expressionTreeChangeLogLevel) {
+      log"""
+         |=== Expression tree resolution: ${MDC(MESSAGE, event)} ===
+         |${MDC(QUERY_PLAN, expressionTree.treeString)}
+       """.stripMargin
+    }
   }
 
   def logExpressionTreeResolution(
       unresolvedExpressionTree: Expression,
       resolvedExpressionTree: Expression): Unit = {
-    log(
-      () =>
-        log"""
-           |=== Unresolved expression tree -> Resolved expression tree ===
-           |${MDC(
-            QUERY_PLAN,
-            sideBySide(
-              unresolvedExpressionTree.treeString,
-              resolvedExpressionTree.treeString
-            ).mkString("\n")
-          )}
-         """.stripMargin,
-      expressionTreeChangeLogLevel
-    )
-  }
-
-  private def log(createMessage: () => MessageWithContext, logLevel: String): Unit =
-    logLevel match {
-      case "TRACE" => logTrace(createMessage().message)
-      case "DEBUG" => logDebug(createMessage().message)
-      case "INFO" => logInfo(createMessage())
-      case "WARN" => logWarning(createMessage())
-      case "ERROR" => logError(createMessage())
-      case _ => logTrace(createMessage().message)
+    logBasedOnLevel(expressionTreeChangeLogLevel) {
+      val unresolved = unresolvedExpressionTree.treeString
+      val resolved = resolvedExpressionTree.treeString
+      log"""
+         |=== Unresolved expression tree -> Resolved expression tree ===
+         |${MDC(QUERY_PLAN, sideBySide(unresolved, resolved).mkString("\n"))}
+       """.stripMargin
     }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index de74bb2f8cd2..2564d4eab9bd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -1537,11 +1537,11 @@ object CodeGenerator extends Logging {
     )
     evaluator.setExtendedClass(classOf[GeneratedClass])
 
-    logDebug({
+    logBasedOnLevel(SQLConf.get.codegenLogLevel) {
       // Only add extra debugging info to byte code when we are going to print the source code.
       evaluator.setDebuggingInformation(true, true, false)
-      s"\n${CodeFormatter.format(code)}"
-    })
+      log"\n${MDC(LogKeys.CODE, CodeFormatter.format(code))}"
+    }
 
     val codeStats = try {
       evaluator.cook("generated.java", code.body)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 76d36fab2096..b46d20eec5de 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -65,7 +65,7 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
            """.stripMargin
         }
 
-        logBasedOnLevel(message())
+        logBasedOnLevel(logLevel)(message())
       }
     }
   }
@@ -83,7 +83,7 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
         }
       }
 
-      logBasedOnLevel(message())
+      logBasedOnLevel(logLevel)(message())
     }
   }
 
@@ -101,18 +101,7 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
       """.stripMargin
     // scalastyle:on line.size.limit
 
-    logBasedOnLevel(message)
-  }
-
-  private def logBasedOnLevel(f: => MessageWithContext): Unit = {
-    logLevel match {
-      case "TRACE" => logTrace(f.message)
-      case "DEBUG" => logDebug(f.message)
-      case "INFO" => logInfo(f)
-      case "WARN" => logWarning(f)
-      case "ERROR" => logError(f)
-      case _ => logTrace(f.message)
-    }
+    logBasedOnLevel(logLevel)(message)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5ef83df3f7bb..765da9fc04f2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -31,6 +31,7 @@ import scala.util.matching.Regex
 import org.apache.avro.file.CodecFactory
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.OutputCommitter
+import org.slf4j.event.Level
 
 import org.apache.spark.{ErrorMessageFormat, SparkConf, SparkContext, SparkException, TaskContext}
 import org.apache.spark.internal.Logging
@@ -362,17 +363,22 @@ object SQLConf {
         "for using switch statements in InSet must be non-negative and less than or equal to 600")
     .createWithDefault(400)
 
+  private val VALID_LOG_LEVELS: Array[String] = Level.values.map(_.toString)
+
+  private def isValidLogLevel(level: String): Boolean =
+    VALID_LOG_LEVELS.contains(level.toUpperCase(Locale.ROOT))
+
   val PLAN_CHANGE_LOG_LEVEL = buildConf("spark.sql.planChangeLog.level")
     .internal()
     .doc("Configures the log level for logging the change from the original plan to the new " +
-      "plan after a rule or batch is applied. The value can be 'trace', 'debug', 'info', " +
-      "'warn', or 'error'. The default log level is 'trace'.")
+      s"plan after a rule or batch is applied. The value can be " +
+      s"${VALID_LOG_LEVELS.mkString(", ")}.")
     .version("3.1.0")
     .stringConf
    .transform(_.toUpperCase(Locale.ROOT))
-    .checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
+    .checkValue(isValidLogLevel,
      "Invalid value for 'spark.sql.planChangeLog.level'. Valid values are " +
-        "'trace', 'debug', 'info', 'warn' and 'error'.")
+        s"${VALID_LOG_LEVELS.mkString(", ")}.")
     .createWithDefault("trace")
 
   val PLAN_CHANGE_LOG_RULES = buildConf("spark.sql.planChangeLog.rules")
@@ -403,13 +409,13 @@ object SQLConf {
     .internal()
     .doc("Configures the log level for logging the change from the unresolved expression tree to " +
       "the resolved expression tree in the single-pass bottom-up Resolver. The value can be " +
-      "'trace', 'debug', 'info', 'warn', or 'error'. The default log level is 'trace'.")
+      s"${VALID_LOG_LEVELS.mkString(", ")}.")
     .version("4.0.0")
     .stringConf
     .transform(_.toUpperCase(Locale.ROOT))
-    .checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
+    .checkValue(isValidLogLevel,
      "Invalid value for 'spark.sql.expressionTreeChangeLog.level'. Valid values are " +
-        "'trace', 'debug', 'info', 'warn' and 'error'.")
+        s"${VALID_LOG_LEVELS.mkString(", ")}.")
     .createWithDefault("trace")
 
   val LIGHTWEIGHT_PLAN_CHANGE_VALIDATION = buildConf("spark.sql.lightweightPlanChangeValidation")
@@ -780,11 +786,13 @@ object SQLConf {
   val ADAPTIVE_EXECUTION_LOG_LEVEL = buildConf("spark.sql.adaptive.logLevel")
     .internal()
     .doc("Configures the log level for adaptive execution logging of plan changes. The value " +
-      "can be 'trace', 'debug', 'info', 'warn', or 'error'. The default log level is 'debug'.")
+      s"can be ${VALID_LOG_LEVELS.mkString(", ")}.")
     .version("3.0.0")
     .stringConf
     .transform(_.toUpperCase(Locale.ROOT))
-    .checkValues(Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR"))
+    .checkValue(isValidLogLevel,
+      "Invalid value for 'spark.sql.adaptive.logLevel'. Valid values are " +
+        s"${VALID_LOG_LEVELS.mkString(", ")}.")
     .createWithDefault("debug")
 
   val ADVISORY_PARTITION_SIZE_IN_BYTES =
@@ -1812,15 +1820,15 @@ object SQLConf {
   val DATAFRAME_CACHE_LOG_LEVEL = buildConf("spark.sql.dataframeCache.logLevel")
     .internal()
     .doc("Configures the log level of Dataframe cache operations, including adding and removing " +
-      "entries from Dataframe cache, hit and miss on cache application. The default log " +
-      "level is 'trace'. This log should only be used for debugging purposes and not in the " +
-      "production environment, since it generates a large amount of logs.")
+      "entries from Dataframe cache, hit and miss on cache application. This log should only be " +
+      "used for debugging purposes and not in the production environment, since it generates a " +
+      "large amount of logs.")
     .version("4.0.0")
     .stringConf
     .transform(_.toUpperCase(Locale.ROOT))
-    .checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
+    .checkValue(isValidLogLevel,
      "Invalid value for 'spark.sql.dataframeCache.logLevel'. Valid values are " +
-        "'trace', 'debug', 'info', 'warn' and 'error'.")
+        s"${VALID_LOG_LEVELS.mkString(", ")}.")
     .createWithDefault("trace")
 
   val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
@@ -2020,6 +2028,18 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val CODEGEN_LOG_LEVEL = buildConf("spark.sql.codegen.logLevel")
+    .internal()
+    .doc("Configures the log level for logging of codegen. " +
+      s"The value can be ${VALID_LOG_LEVELS.mkString(", ")}.")
+    .version("4.1.0")
+    .stringConf
+    .transform(_.toUpperCase(Locale.ROOT))
+    .checkValue(isValidLogLevel,
+      "Invalid value for 'spark.sql.codegen.logLevel'. Valid values are " +
+        s"${VALID_LOG_LEVELS.mkString(", ")}.")
+    .createWithDefault("DEBUG")
+
   val CODEGEN_LOGGING_MAX_LINES = buildConf("spark.sql.codegen.logging.maxLines")
     .internal()
     .doc("The maximum number of codegen lines to log when errors occur. Use -1 for unlimited.")
@@ -5784,13 +5804,13 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def optimizerInSetSwitchThreshold: Int = getConf(OPTIMIZER_INSET_SWITCH_THRESHOLD)
 
-  def planChangeLogLevel: String = getConf(PLAN_CHANGE_LOG_LEVEL)
+  def planChangeLogLevel: Level = Level.valueOf(getConf(PLAN_CHANGE_LOG_LEVEL))
 
   def planChangeRules: Option[String] = getConf(PLAN_CHANGE_LOG_RULES)
 
   def planChangeBatches: Option[String] = getConf(PLAN_CHANGE_LOG_BATCHES)
 
-  def expressionTreeChangeLogLevel: String = getConf(EXPRESSION_TREE_CHANGE_LOG_LEVEL)
+  def expressionTreeChangeLogLevel: Level = Level.valueOf(getConf(EXPRESSION_TREE_CHANGE_LOG_LEVEL))
 
   def dynamicPartitionPruningEnabled: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_ENABLED)
@@ -6021,6 +6041,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def codegenComments: Boolean = getConf(StaticSQLConf.CODEGEN_COMMENTS)
 
+  def codegenLogLevel: Level = Level.valueOf(getConf(CODEGEN_LOG_LEVEL))
+
   def loggingMaxLinesForCodegen: Int = getConf(CODEGEN_LOGGING_MAX_LINES)
 
   def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 4c045f9fda73..7ce14bcedf4b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.expressions.objects._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.LA
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.ThreadUtils
@@ -534,6 +535,21 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
       .exists(_.getMessage().getFormattedMessage.contains("Generated method too long")))
   }
 
+  test("SPARK-51527: spark.sql.codegen.logLevel") {
+    withSQLConf(SQLConf.CODEGEN_LOG_LEVEL.key -> "INFO") {
+      val appender = new LogAppender("codegen log level")
+      withLogAppender(appender, loggerNames = Seq(classOf[CodeGenerator[_, _]].getName),
+        Some(Level.INFO)) {
+        GenerateUnsafeProjection.generate(Seq(Literal.TrueLiteral))
+      }
+      assert(appender.loggingEvents.exists { event =>
+        event.getLevel === Level.INFO &&
+          event.getMessage.getFormattedMessage.contains(
+            "public java.lang.Object generate(Object[] references)")
+      })
+    }
+  }
+
   test("SPARK-28916: subexpression elimination can cause 64kb code limit on UnsafeProjection") {
     val numOfExprs = 10000
     val exprs = (0 to numOfExprs).flatMap(colIndex =>
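The session in the commit message already shows the user-facing side (`SET spark.sql.codegen.logLevel=WARN;`). As a rough, self-contained sketch of the level-dispatch pattern that this commit centralizes in `Logging.logBasedOnLevel` and wires into `CodeGenerator`, the standalone Scala program below routes a lazily built message to the slf4j call matching a configured level. The object name, the plain `String` message, and the hard-coded `"INFO"` level are illustrative stand-ins only; Spark's real helper takes a `MessageWithContext` and reads the level from `SQLConf.get.codegenLogLevel`.

```scala
import org.slf4j.{Logger, LoggerFactory}
import org.slf4j.event.{Level => Slf4jLevel}

// Stand-in illustration of the level-dispatch pattern: route a lazily built
// message to the slf4j call that matches the configured level.
object CodegenLogLevelSketch {
  private val logger: Logger = LoggerFactory.getLogger(getClass)

  // The message is a by-name parameter; for TRACE/DEBUG it is only built when that
  // level is enabled, which matters when the message is a large blob of generated code.
  def logBasedOnLevel(level: Slf4jLevel)(msg: => String): Unit = level match {
    case Slf4jLevel.TRACE => if (logger.isTraceEnabled) logger.trace(msg)
    case Slf4jLevel.DEBUG => if (logger.isDebugEnabled) logger.debug(msg)
    case Slf4jLevel.INFO  => logger.info(msg)
    case Slf4jLevel.WARN  => logger.warn(msg)
    case Slf4jLevel.ERROR => logger.error(msg)
  }

  def main(args: Array[String]): Unit = {
    // In Spark the level would come from SQLConf.get.codegenLogLevel, backed by
    // spark.sql.codegen.logLevel (default DEBUG); "INFO" is a hard-coded stand-in here.
    val configuredLevel = Slf4jLevel.valueOf("INFO")
    logBasedOnLevel(configuredLevel)("\n/* formatted generated code would be logged here */")
  }
}
```

Because the message is passed by name, a large formatted dump of generated classes is never materialized for TRACE or DEBUG unless that level is actually enabled, which is the same reason the previous hardcoded `logDebug` was cheap when codegen logging was off.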