This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 296d0f9bed00 [SPARK-53004][CORE] Support `abbreviate` in `SparkStringUtils`
296d0f9bed00 is described below

commit 296d0f9bed000eb5db56a58890c2a5872da70202
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Tue Jul 29 20:33:40 2025 -0700

    [SPARK-53004][CORE] Support `abbreviate` in `SparkStringUtils`

    ### What changes were proposed in this pull request?

    This PR aims to support `abbreviate` in `SparkStringUtils`. In addition, this PR adds a new Scalastyle rule to ban `StringUtils.abbreviate` in favor of the built-in implementation.

    ### Why are the changes needed?

    To improve Spark's string utility and reduce the 3rd party library dependency.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Pass the CIs.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #51711 from dongjoon-hyun/SPARK-53004.

    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../main/scala/org/apache/spark/util/SparkStringUtils.scala  | 12 ++++++++++++
 .../scala/org/apache/spark/deploy/k8s/KubernetesConf.scala   |  3 +--
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala     |  3 +--
 scalastyle-config.xml                                        |  5 +++++
 .../org/apache/spark/sql/catalyst/catalog/interface.scala    |  4 ++--
 .../spark/sql/connect/execution/ExecuteThreadRunner.scala    |  9 +++------
 .../spark/sql/connect/service/SparkConnectService.scala      |  3 +--
 .../org/apache/spark/sql/connect/utils/ErrorUtils.scala      |  6 +++---
 .../org/apache/spark/sql/execution/DataSourceScanExec.scala  |  3 +--
 .../spark/sql/execution/columnar/InMemoryRelation.scala      |  3 +--
 .../apache/spark/sql/execution/datasources/v2/FileScan.scala |  3 +--
 11 files changed, 31 insertions(+), 23 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkStringUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkStringUtils.scala
index bec633e051d0..9cf4b7ff6a91 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/SparkStringUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkStringUtils.scala
@@ -38,6 +38,18 @@ private[spark] trait SparkStringUtils {
     SPACE_DELIMITED_UPPERCASE_HEX.parseHex(hex.stripPrefix("[").stripSuffix("]"))
   }
 
+  def abbreviate(str: String, abbrevMarker: String, len: Int): String = {
+    if (str == null || abbrevMarker == null) {
+      null
+    } else if (str.length() <= len || str.length() <= abbrevMarker.length()) {
+      str
+    } else {
+      str.substring(0, len - abbrevMarker.length()) + abbrevMarker
+    }
+  }
+
+  def abbreviate(str: String, len: Int): String = abbreviate(str, "...", len)
+
   def sideBySide(left: String, right: String): Seq[String] = {
     sideBySide(left.split("\n").toImmutableArraySeq, right.split("\n").toImmutableArraySeq)
   }
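For reference (illustrative only, not part of the commit): a minimal standalone sketch of the
semantics added above. The object name and the sample strings are invented for illustration;
the logic simply restates the two new `abbreviate` overloads and shows the expected results.

    // Sketch of the behavior of the new abbreviate helpers (assumption: mirrors the hunk above).
    object AbbreviateSketch {
      def abbreviate(str: String, abbrevMarker: String, len: Int): String = {
        if (str == null || abbrevMarker == null) {
          null                        // a null string or marker passes through as null
        } else if (str.length <= len || str.length <= abbrevMarker.length) {
          str                         // already short enough: returned unchanged
        } else {
          // keep the first (len - marker.length) characters, then append the marker
          str.substring(0, len - abbrevMarker.length) + abbrevMarker
        }
      }

      def abbreviate(str: String, len: Int): String = abbreviate(str, "...", len)

      def main(args: Array[String]): Unit = {
        assert(abbreviate("abcdefghijklmno", 10) == "abcdefg...")  // result is exactly 10 characters
        assert(abbreviate("short", 10) == "short")                 // within the limit: unchanged
        assert(abbreviate(null, 10) == null)                       // null input: null output
      }
    }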
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 9fe0697fa0d5..cac3c13834a2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -19,7 +19,6 @@ package org.apache.spark.deploy.k8s
 import java.util.{Locale, UUID}
 
 import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}
-import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.{SPARK_VERSION, SparkConf, SparkException}
 import org.apache.spark.annotation.{DeveloperApi, Since, Unstable}
@@ -314,7 +313,7 @@ private[spark] object KubernetesConf {
     // must be 63 characters or less to follow the DNS label standard, so take the 63 characters
     // of the appName name as the label. In addition, label value must start and end with
     // an alphanumeric character.
-    StringUtils.abbreviate(
+    Utils.abbreviate(
       s"$appName"
         .trim
         .toLowerCase(Locale.ROOT)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index ca524caacb7c..3f8e97052d81 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -28,7 +28,6 @@ import scala.concurrent.Promise
 import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal
 
-import org.apache.commons.lang3.{StringUtils => ComStrUtils}
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api._
@@ -389,7 +388,7 @@ private[spark] class ApplicationMaster(
       logInfo(log"Final app status: ${MDC(LogKeys.APP_STATE, finalStatus)}, " +
         log"exitCode: ${MDC(LogKeys.EXIT_CODE, exitCode)}" +
         Option(msg).map(msg => log", (reason: ${MDC(LogKeys.REASON, msg)})").getOrElse(log""))
-      finalMsg = ComStrUtils.abbreviate(msg, sparkConf.get(AM_FINAL_MSG_LIMIT).toInt)
+      finalMsg = Utils.abbreviate(msg, sparkConf.get(AM_FINAL_MSG_LIMIT).toInt)
       finished = true
       if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) {
         logDebug("shutting down reporter thread")
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 680d5ed4a14e..d9f25812081b 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -329,6 +329,11 @@ This file is divided into 3 sections:
     <customMessage>Use org.apache.spark.StringSubstitutor instead</customMessage>
   </check>
 
+  <check customId="commonslang3abbreviate" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">StringUtils\.abbreviate\(</parameter></parameters>
+    <customMessage>Use Utils.abbreviate method instead</customMessage>
+  </check>
+
   <check customId="uribuilder" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">UriBuilder\.fromUri</parameter></parameters>
     <customMessage>Use Utils.getUriBuilder instead.</customMessage>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 46d61eec720d..bb6903dff0ab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -27,7 +27,6 @@ import scala.util.control.NonFatal
 
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
-import org.apache.commons.lang3.StringUtils
 import org.json4s.JsonAST.{JArray, JBool, JDecimal, JDouble, JInt, JLong, JNull, JObject, JString, JValue}
 import org.json4s.jackson.JsonMethods._
@@ -50,6 +49,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
 import org.apache.spark.util.ArrayImplicits._
+import org.apache.spark.util.Utils
 
 /**
  * Interface providing util to convert JValue to String representation of catalog entities.
@@ -1150,7 +1150,7 @@ case class HiveTableRelation(
     val metadataEntries = metadata.toSeq.map {
       case (key, value) if key == "CatalogTable" => value
       case (key, value) =>
-        key + ": " + StringUtils.abbreviate(value, SQLConf.get.maxMetadataStringLength)
+        key + ": " + Utils.abbreviate(value, SQLConf.get.maxMetadataStringLength)
     }
     val metadataStr = truncatedString(metadataEntries, "[", ", ", "]", maxFields)
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
index 13857e066a8f..fdb0ef363124 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
@@ -23,7 +23,6 @@ import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
 import com.google.protobuf.Message
-import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.SparkSQLException
 import org.apache.spark.connect.proto
@@ -209,16 +208,14 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
             tag))
       }
       session.sparkContext.setJobDescription(
-        s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}")
+        s"Spark Connect - ${Utils.abbreviate(debugString, 128)}")
       session.sparkContext.setInterruptOnCancel(true)
 
       // Add debug information to the query execution so that the jobs are traceable.
       session.sparkContext.setLocalProperty(
         "callSite.short",
-        s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}")
-      session.sparkContext.setLocalProperty(
-        "callSite.long",
-        StringUtils.abbreviate(debugString, 2048))
+        s"Spark Connect - ${Utils.abbreviate(debugString, 128)}")
+      session.sparkContext.setLocalProperty("callSite.long", Utils.abbreviate(debugString, 2048))
 
       executeHolder.request.getPlan.getOpTypeCase match {
         case proto.Plan.OpTypeCase.COMMAND => handleCommand(executeHolder.request)
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index 262cc954f8f9..04030c144c2e 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -29,7 +29,6 @@ import io.grpc.netty.NettyServerBuilder
 import io.grpc.protobuf.ProtoUtils
 import io.grpc.protobuf.services.ProtoReflectionService
 import io.grpc.stub.StreamObserver
-import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.connect.proto
@@ -504,7 +503,7 @@ object SparkConnectService extends Logging {
   }
 
   def extractErrorMessage(st: Throwable): String = {
-    val message = StringUtils.abbreviate(st.getMessage, 2048)
+    val message = Utils.abbreviate(st.getMessage, 2048)
     convertNullString(message)
   }
 
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
index 837d4a4d3ee7..292d2eee0f15 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
@@ -30,7 +30,6 @@ import com.google.rpc.{Code => RPCCode, ErrorInfo, Status => RPCStatus}
 import io.grpc.Status
 import io.grpc.protobuf.StatusProto
 import io.grpc.stub.StreamObserver
-import org.apache.commons.lang3.StringUtils
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods
@@ -44,6 +43,7 @@ import org.apache.spark.sql.connect.config.Connect
 import org.apache.spark.sql.connect.service.{ExecuteEventsManager, SessionHolder, SessionKey, SparkConnectService}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.ArrayImplicits._
+import org.apache.spark.util.Utils
 
 private[connect] object ErrorUtils extends Logging {
 
@@ -225,7 +225,7 @@ private[connect] object ErrorUtils extends Logging {
           val maxSize = Math.min(
             SparkEnv.get.conf.get(Connect.CONNECT_JVM_STACK_TRACE_MAX_SIZE),
             maxMetadataSize)
-          errorInfo.putMetadata("stackTrace", StringUtils.abbreviate(stackTrace.get, maxSize.toInt))
+          errorInfo.putMetadata("stackTrace", Utils.abbreviate(stackTrace.get, maxSize.toInt))
         } else {
           errorInfo
         }
@@ -297,7 +297,7 @@ private[connect] object ErrorUtils extends Logging {
             e,
             Status.UNKNOWN
               .withCause(e)
-              .withDescription(StringUtils.abbreviate(e.getMessage, 2048))
+              .withDescription(Utils.abbreviate(e.getMessage, 2048))
               .asRuntimeException())
         }
         partial
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 14c62a7992ad..9f4a1718d820 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.TimeUnit._
 
-import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.LogKeys.{COUNT, MAX_SPLIT_BYTES, OPEN_COST_IN_BYTES}
@@ -65,7 +64,7 @@ trait DataSourceScanExec extends LeafExecNode with StreamSourceAwareSparkPlan {
   override def simpleString(maxFields: Int): String = {
     val metadataEntries = metadata.toSeq.sorted.map {
       case (key, value) =>
-        key + ": " + StringUtils.abbreviate(redact(value), maxMetadataValueLength)
+        key + ": " + Utils.abbreviate(redact(value), maxMetadataValueLength)
     }
     val metadataStr = truncatedString(metadataEntries, " ", ", ", "", maxFields)
     redact(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 7914c9663d2d..bfcbc9a06d27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.columnar
 
 import com.esotericsoftware.kryo.{DefaultSerializer, Kryo, Serializer => KryoSerializer}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
-import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.network.util.JavaUtils
@@ -267,7 +266,7 @@ case class CachedRDDBuilder(
   private val materializedPartitions = cachedPlan.session.sparkContext.longAccumulator
 
   val cachedName = tableName.map(n => s"In-memory table $n")
-    .getOrElse(StringUtils.abbreviate(cachedPlan.toString, 1024))
+    .getOrElse(Utils.abbreviate(cachedPlan.toString, 1024))
 
   val supportsColumnarInput: Boolean = {
     cachedPlan.supportsColumnar &&
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 32fe21701af6..3663d6a98b86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import java.util.{Locale, OptionalLong}
 
-import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.{Logging, MDC}
@@ -122,7 +121,7 @@ trait FileScan extends Scan
       case (key, value) =>
         val redactedValue =
           Utils.redact(conf.stringRedactionPattern, value)
-        key + ": " + StringUtils.abbreviate(redactedValue, maxMetadataValueLength)
+        key + ": " + Utils.abbreviate(redactedValue, maxMetadataValueLength)
     }.mkString(", ")
     s"${this.getClass.getSimpleName} $metadataStr"
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org