This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new e36f9de67693 [SPARK-53213][CORE][SQL][K8S] Use Java `Base64` instead of `Base64.(decodeBase64|encodeBase64String)`
e36f9de67693 is described below

commit e36f9de67693a116490730fbf0b1699c83690343
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Sat Aug 9 07:22:46 2025 -0700

[SPARK-53213][CORE][SQL][K8S] Use Java `Base64` instead of `Base64.(decodeBase64|encodeBase64String)`

### What changes were proposed in this pull request?

This PR aims to use the Java `java.util.Base64` API instead of Commons Codec's `Base64.encodeBase64String` and `Base64.decodeBase64` to improve performance.

### Why are the changes needed?

The Java native API is over **9x faster** than `Commons Codec`:

```scala
scala> val a = new Array[Byte](1_000_000_000)

scala> spark.time(org.apache.commons.codec.binary.Base64.decodeBase64(org.apache.commons.codec.binary.Base64.encodeBase64String(a)).length)
Time taken: 10121 ms
val res0: Int = 1000000000

scala> spark.time(java.util.Base64.getDecoder().decode(java.util.Base64.getEncoder().encodeToString(a)).length)
Time taken: 1156 ms
val res1: Int = 1000000000
```
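The substitution itself is mechanical. A minimal before/after sketch of the pattern is below; the object and value names are illustrative only, and the actual call sites are in the diff:

```scala
import java.util.Base64

// Illustrative sketch: the Commons Codec calls being replaced and their JDK equivalents.
object Base64MigrationSketch {
  def main(args: Array[String]): Unit = {
    val payload = "spark".getBytes("UTF-8")

    // Before: org.apache.commons.codec.binary.Base64.encodeBase64String(payload)
    //         org.apache.commons.codec.binary.Base64.decodeBase64(encoded)
    // After: the JDK encoder/decoder, with no third-party dependency.
    val encoded = Base64.getEncoder().encodeToString(payload)
    val decoded = Base64.getDecoder().decode(encoded)

    assert(java.util.Arrays.equals(decoded, payload))
  }
}
```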
### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51938 from dongjoon-hyun/SPARK-53213.

Authored-by: Dongjoon Hyun <dongj...@apache.org>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala  | 7 ++++---
 .../deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala   | 5 +++--
 .../spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala       | 7 +++----
 scalastyle-config.xml                                              | 5 +++++
 .../org/apache/spark/sql/catalyst/plans/logical/Statistics.scala   | 5 +++--
 .../org/apache/spark/sql/execution/benchmark/Base64Benchmark.scala | 2 ++
 .../java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java | 6 +++---
 .../main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala   | 5 ++---
 8 files changed, 25 insertions(+), 17 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
index 67133755ee1a..b62b5dc3e1fb 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
@@ -18,12 +18,12 @@ package org.apache.spark.deploy.k8s.features
 
 import java.io.File
 import java.nio.file.Files
+import java.util.Base64
 
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
 import io.fabric8.kubernetes.api.model._
-import org.apache.commons.codec.binary.Base64
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -226,6 +226,7 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri
   }
 
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+    val encodeToString = Base64.getEncoder().encodeToString(_)
     Seq[HasMetadata]() ++ {
       krb5File.map { path =>
         val file = new File(path)
@@ -247,7 +248,7 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri
           .withName(ktSecretName)
           .endMetadata()
           .withImmutable(true)
-          .addToData(kt.getName(), Base64.encodeBase64String(Files.readAllBytes(kt.toPath)))
+          .addToData(kt.getName(), encodeToString(Files.readAllBytes(kt.toPath)))
           .build())
       } else {
         Nil
@@ -259,7 +260,7 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri
           .withName(dtSecretName)
           .endMetadata()
           .withImmutable(true)
-          .addToData(KERBEROS_SECRET_KEY, Base64.encodeBase64String(delegationTokens))
+          .addToData(KERBEROS_SECRET_KEY, encodeToString(delegationTokens))
           .build())
       } else {
         Nil
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala
index 77d4ab262a0f..0da39b30e388 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala
@@ -19,11 +19,11 @@ package org.apache.spark.deploy.k8s.features
 import java.io.File
 import java.nio.file.Files
 import java.security.PrivilegedExceptionAction
+import java.util.Base64
 
 import scala.jdk.CollectionConverters._
 
 import io.fabric8.kubernetes.api.model.{ConfigMap, Secret}
-import org.apache.commons.codec.binary.Base64
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.security.UserGroupInformation
 
@@ -126,7 +126,8 @@ class KerberosConfDriverFeatureStepSuite extends SparkFunSuite {
     val step = createStep(new SparkConf(false))
 
     val dtSecret = filter[Secret](step.getAdditionalKubernetesResources()).head
-    assert(dtSecret.getData().get(KERBEROS_SECRET_KEY) === Base64.encodeBase64String(tokens))
+    assert(dtSecret.getData().get(KERBEROS_SECRET_KEY) ===
+      Base64.getEncoder().encodeToString(tokens))
 
     checkPodForTokens(step.configurePod(SparkPod.initialPod()), dtSecret.getMetadata().getName())
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
index e06364a2b8b8..234f627202c1 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
@@ -16,12 +16,11 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest
 
-import java.util.Locale
+import java.util.{Base64, Locale}
 
 import scala.jdk.CollectionConverters._
 
 import io.fabric8.kubernetes.api.model.{Pod, SecretBuilder}
-import org.apache.commons.codec.binary.Base64
 import org.scalatest.concurrent.Eventually
 import org.scalatest.matchers.should.Matchers._
 
@@ -36,8 +35,8 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite =>
     sb.withNewMetadata()
       .withName(ENV_SECRET_NAME)
       .endMetadata()
-    val secUsername = Base64.encodeBase64String(ENV_SECRET_VALUE_1.getBytes())
-    val secPassword = Base64.encodeBase64String(ENV_SECRET_VALUE_2.getBytes())
+    val secUsername = Base64.getEncoder().encodeToString(ENV_SECRET_VALUE_1.getBytes())
+    val secPassword = Base64.getEncoder().encodeToString(ENV_SECRET_VALUE_2.getBytes())
     val envSecretData = Map(ENV_SECRET_KEY_1 -> secUsername, ENV_SECRET_KEY_2 -> secPassword)
     sb.addToData(envSecretData.asJava)
     val envSecret = sb.build()
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index a4ffb42400a3..bc28bfe295a8 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -443,6 +443,11 @@ This file is divided into 3 sections:
     <customMessage>Use listFiles of SparkFileUtil or Utils instead.</customMessage>
   </check>
 
+  <check customId="commonscodecbase64" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">\bBase64\.(en|de)codeBase64</parameter></parameters>
+    <customMessage>Use java.util.Base64 instead</customMessage>
+  </check>
+
   <check customId="commonslang3javaversion" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">org\.apache\.commons\.lang3\..*JavaVersion</parameter></parameters>
     <customMessage>Use JEP 223 API (java.lang.Runtime.Version) instead of
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 6f3ec3bf37d9..a2850a0b179f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
 import java.math.{MathContext, RoundingMode}
+import java.util.Base64
 
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
 
@@ -202,12 +203,12 @@ object HistogramSerializer {
     out.flush()
     out.close()
 
-    org.apache.commons.codec.binary.Base64.encodeBase64String(bos.toByteArray)
+    Base64.getEncoder().encodeToString(bos.toByteArray)
   }
 
   /** Deserializes a given string to a histogram. */
   final def deserialize(str: String): Histogram = {
-    val bytes = org.apache.commons.codec.binary.Base64.decodeBase64(str)
+    val bytes = Base64.getDecoder().decode(str)
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(new LZ4BlockInputStream(bis))
     val height = ins.readDouble()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/Base64Benchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/Base64Benchmark.scala
index bbc0ff968541..b5290eafae3e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/Base64Benchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/Base64Benchmark.scala
@@ -52,6 +52,7 @@ object Base64Benchmark extends SqlBasedBenchmark {
     }
   }
 
+  // scalastyle:off commonscodecbase64
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
     Seq(1, 3, 5, 7).map { len =>
       val benchmark = new Benchmark(s"encode for $len", N, output = output)
@@ -75,4 +76,5 @@ object Base64Benchmark extends SqlBasedBenchmark {
       benchmark
     }.foreach(_.run())
   }
+  // scalastyle:on commonscodecbase64
 }
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
index dabce675649e..350ef89ec4d2 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.security.PrivilegedExceptionAction;
 import java.security.SecureRandom;
+import java.util.Base64;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -31,7 +32,6 @@ import jakarta.servlet.http.HttpServletRequest;
 import jakarta.servlet.http.HttpServletResponse;
 import jakarta.ws.rs.core.NewCookie;
 
-import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.codec.binary.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -423,7 +423,7 @@ public class ThriftHttpServlet extends TServlet {
       gssContext = manager.createContext(serverCreds);
       // Get service ticket from the authorization header
       String serviceTicketBase64 = getAuthHeader(request, authType);
-      byte[] inToken = Base64.decodeBase64(serviceTicketBase64.getBytes());
+      byte[] inToken = Base64.getDecoder().decode(serviceTicketBase64.getBytes());
       gssContext.acceptSecContext(inToken, 0, inToken.length);
       // Authenticate or deny based on its context completion
       if (!gssContext.isEstablished()) {
@@ -504,7 +504,7 @@ public class ThriftHttpServlet extends TServlet {
       String authType) throws HttpAuthenticationException {
     String authHeaderBase64 = getAuthHeader(request, authType);
     String authHeaderString = StringUtils.newStringUtf8(
-        Base64.decodeBase64(authHeaderBase64.getBytes()));
+        Base64.getDecoder().decode(authHeaderBase64.getBytes()));
     String[] creds = authHeaderString.split(":");
     return creds;
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 7f58f69269dd..440fc582b93f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -17,14 +17,13 @@
 package org.apache.spark.sql.hive.orc
 
-import java.util.Properties
+import java.util.{Base64, Properties}
 
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
 import com.esotericsoftware.kryo.Kryo
 import com.esotericsoftware.kryo.io.Output
-import org.apache.commons.codec.binary.Base64
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.ql.io.orc._
@@ -215,7 +214,7 @@ case class OrcFileFormat() extends FileFormat with DataSourceRegister with Seria
       val out = new Output(4 * 1024, 10 * 1024 * 1024)
       kryo.writeObject(out, sarg)
       out.close()
-      Base64.encodeBase64String(out.toBytes)
+      Base64.getEncoder().encodeToString(out.toBytes)
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org