This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f6038302dd6 [SPARK-45793][CORE] Improve the built-in compression codecs
f6038302dd6 is described below

commit f6038302dd615f4bf9bed9c4af3d04426f7e5c5e
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Mon Nov 6 20:06:39 2023 +0800

    [SPARK-45793][CORE] Improve the built-in compression codecs
    
    ### What changes were proposed in this pull request?
    Spark supports many built-in compression codecs for I/O and storage.
    The short names of these codecs are copied as magic strings into many
    places, so developers have to keep them consistent by hand, which is
    error-prone and reduces development efficiency.
    This PR defines the short names once as constants in `CompressionCodec`
    (`LZ4`, `LZF`, `SNAPPY`, `ZSTD`) and replaces the magic strings with them.
    
    ### Why are the changes needed?
    To make the code that refers to the built-in compression codecs easier to
    maintain and less error-prone.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    
    ### How was this patch tested?
    N/A
    
    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.
    
    Closes #43659 from beliefer/improve_storage_code.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Jiaan Geng <belie...@163.com>
---
 .../deploy/history/HistoryServerMemoryManager.scala |  3 ++-
 .../org/apache/spark/internal/config/package.scala  |  7 ++++---
 .../org/apache/spark/io/CompressionCodec.scala      | 21 ++++++++++++---------
 .../deploy/history/EventLogFileWritersSuite.scala   |  6 +++---
 .../deploy/history/FsHistoryProviderSuite.scala     |  5 +++--
 .../org/apache/spark/io/CompressionCodecSuite.scala |  8 ++++----
 .../apache/spark/storage/FallbackStorageSuite.scala |  3 ++-
 .../collection/ExternalAppendOnlyMapSuite.scala     |  3 +--
 .../k8s/integrationtest/BasicTestsSuite.scala       |  3 ++-
 .../org/apache/spark/sql/internal/SQLConf.scala     |  3 ++-
 .../spark/sql/execution/streaming/OffsetSeq.scala   |  3 ++-
 .../streaming/state/RocksDBFileManager.scala        |  2 +-
 12 files changed, 38 insertions(+), 29 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
index 00e58cbdc57..b95f1ed24f3 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.HashMap
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.History._
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.Utils
 
 /**
@@ -75,7 +76,7 @@ private class HistoryServerMemoryManager(
 
   private def approximateMemoryUsage(eventLogSize: Long, codec: 
Option[String]): Long = {
     codec match {
-      case Some("zstd") =>
+      case Some(CompressionCodec.ZSTD) =>
         eventLogSize * 10
       case Some(_) =>
         eventLogSize * 4
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 93a42eec832..bbadf91fc41 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -21,6 +21,7 @@ import java.util.Locale
 import java.util.concurrent.TimeUnit
 
 import org.apache.spark.SparkContext
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.metrics.GarbageCollectionMetrics
 import org.apache.spark.network.shuffle.Constants
@@ -1530,7 +1531,7 @@ package object config {
         "use fully qualified class names to specify the codec.")
       .version("3.0.0")
       .stringConf
-      .createWithDefault("zstd")
+      .createWithDefault(CompressionCodec.ZSTD)
 
   private[spark] val SHUFFLE_SPILL_INITIAL_MEM_THRESHOLD =
     ConfigBuilder("spark.shuffle.spill.initialMemoryThreshold")
@@ -1871,7 +1872,7 @@ package object config {
         "the codec")
       .version("0.8.0")
       .stringConf
-      .createWithDefaultString("lz4")
+      .createWithDefaultString(CompressionCodec.LZ4)
 
   private[spark] val IO_COMPRESSION_ZSTD_BUFFERSIZE =
     ConfigBuilder("spark.io.compression.zstd.bufferSize")
@@ -1914,7 +1915,7 @@ package object config {
         "the codec.")
       .version("3.0.0")
       .stringConf
-      .createWithDefault("zstd")
+      .createWithDefault(CompressionCodec.ZSTD)
 
   private[spark] val BUFFER_SIZE =
     ConfigBuilder("spark.buffer.size")
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala 
b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 0bb392deb39..a6a5b1f67c6 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -58,18 +58,21 @@ trait CompressionCodec {
 
 private[spark] object CompressionCodec {
 
-  private val configKey = IO_COMPRESSION_CODEC.key
-
   private[spark] def supportsConcatenationOfSerializedStreams(codec: 
CompressionCodec): Boolean = {
     (codec.isInstanceOf[SnappyCompressionCodec] || 
codec.isInstanceOf[LZFCompressionCodec]
       || codec.isInstanceOf[LZ4CompressionCodec] || 
codec.isInstanceOf[ZStdCompressionCodec])
   }
 
-  private val shortCompressionCodecNames = Map(
-    "lz4" -> classOf[LZ4CompressionCodec].getName,
-    "lzf" -> classOf[LZFCompressionCodec].getName,
-    "snappy" -> classOf[SnappyCompressionCodec].getName,
-    "zstd" -> classOf[ZStdCompressionCodec].getName)
+  val LZ4 = "lz4"
+  val LZF = "lzf"
+  val SNAPPY = "snappy"
+  val ZSTD = "zstd"
+
+  private[spark] val shortCompressionCodecNames = Map(
+    LZ4 -> classOf[LZ4CompressionCodec].getName,
+    LZF -> classOf[LZFCompressionCodec].getName,
+    SNAPPY -> classOf[SnappyCompressionCodec].getName,
+    ZSTD -> classOf[ZStdCompressionCodec].getName)
 
   def getCodecName(conf: SparkConf): String = {
     conf.get(IO_COMPRESSION_CODEC)
@@ -93,7 +96,7 @@ private[spark] object CompressionCodec {
       errorClass = "CODEC_NOT_AVAILABLE",
       messageParameters = Map(
         "codecName" -> codecName,
-        "configKey" -> toConf(configKey),
+        "configKey" -> toConf(IO_COMPRESSION_CODEC.key),
         "configVal" -> toConfVal(FALLBACK_COMPRESSION_CODEC))))
   }
 
@@ -113,7 +116,7 @@ private[spark] object CompressionCodec {
     }
   }
 
-  val FALLBACK_COMPRESSION_CODEC = "snappy"
+  val FALLBACK_COMPRESSION_CODEC = SNAPPY
   val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
 }
 
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
index b575cbc080c..349985207e4 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
@@ -176,7 +176,7 @@ class SingleEventLogFileWriterSuite extends 
EventLogFileWritersSuite {
       baseDirUri, "app1", None, None))
     // with compression
     assert(s"${baseDirUri.toString}/app1.lzf" ===
-      SingleEventLogFileWriter.getLogPath(baseDirUri, "app1", None, 
Some("lzf")))
+      SingleEventLogFileWriter.getLogPath(baseDirUri, "app1", None, 
Some(CompressionCodec.LZF)))
     // illegal characters in app ID
     assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1" ===
       SingleEventLogFileWriter.getLogPath(baseDirUri,
@@ -184,7 +184,7 @@ class SingleEventLogFileWriterSuite extends 
EventLogFileWritersSuite {
     // illegal characters in app ID with compression
     assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1.lz4" ===
       SingleEventLogFileWriter.getLogPath(baseDirUri,
-        "a fine:mind$dollar{bills}.1", None, Some("lz4")))
+        "a fine:mind$dollar{bills}.1", None, Some(CompressionCodec.LZ4)))
   }
 
   override protected def createWriter(
@@ -239,7 +239,7 @@ class RollingEventLogFilesWriterSuite extends 
EventLogFileWritersSuite {
     // with compression
     assert(s"$logDir/${EVENT_LOG_FILE_NAME_PREFIX}1_${appId}.lzf" ===
       RollingEventLogFilesWriter.getEventLogFilePath(logDir, appId, 
appAttemptId,
-        1, Some("lzf")).toString)
+        1, Some(CompressionCodec.LZF)).toString)
 
     // illegal characters in app ID
     
assert(s"${baseDirUri.toString}/${EVENT_LOG_DIR_NAME_PREFIX}a-fine-mind_dollar_bills__1"
 ===
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index ae8481a852b..d16e904bdcf 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -126,8 +126,9 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite 
with Matchers with P
 
     // Write a new-style application log.
     val newAppCompressedComplete = newLogFile("new1compressed", None, 
inProgress = false,
-      Some("lzf"))
-    writeFile(newAppCompressedComplete, 
Some(CompressionCodec.createCodec(conf, "lzf")),
+      Some(CompressionCodec.LZF))
+    writeFile(
+      newAppCompressedComplete, Some(CompressionCodec.createCodec(conf, 
CompressionCodec.LZF)),
       SparkListenerApplicationStart(newAppCompressedComplete.getName(), 
Some("new-complete-lzf"),
         1L, "test", None),
       SparkListenerApplicationEnd(4L))
diff --git 
a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala 
b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index 244c007f539..9c9fac0d483 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -58,7 +58,7 @@ class CompressionCodecSuite extends SparkFunSuite {
   }
 
   test("lz4 compression codec short form") {
-    val codec = CompressionCodec.createCodec(conf, "lz4")
+    val codec = CompressionCodec.createCodec(conf, CompressionCodec.LZ4)
     assert(codec.getClass === classOf[LZ4CompressionCodec])
     testCodec(codec)
   }
@@ -76,7 +76,7 @@ class CompressionCodecSuite extends SparkFunSuite {
   }
 
   test("lzf compression codec short form") {
-    val codec = CompressionCodec.createCodec(conf, "lzf")
+    val codec = CompressionCodec.createCodec(conf, CompressionCodec.LZF)
     assert(codec.getClass === classOf[LZFCompressionCodec])
     testCodec(codec)
   }
@@ -94,7 +94,7 @@ class CompressionCodecSuite extends SparkFunSuite {
   }
 
   test("snappy compression codec short form") {
-    val codec = CompressionCodec.createCodec(conf, "snappy")
+    val codec = CompressionCodec.createCodec(conf, CompressionCodec.SNAPPY)
     assert(codec.getClass === classOf[SnappyCompressionCodec])
     testCodec(codec)
   }
@@ -115,7 +115,7 @@ class CompressionCodecSuite extends SparkFunSuite {
   }
 
   test("zstd compression codec short form") {
-    val codec = CompressionCodec.createCodec(conf, "zstd")
+    val codec = CompressionCodec.createCodec(conf, CompressionCodec.ZSTD)
     assert(codec.getClass === classOf[ZStdCompressionCodec])
     testCodec(codec)
   }
diff --git 
a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
index 83c9707bfc2..6c51bd4ff2e 100644
--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
@@ -31,6 +31,7 @@ import org.scalatest.concurrent.Eventually.{eventually, 
interval, timeout}
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkFunSuite, TestUtils}
 import org.apache.spark.LocalSparkContext.withSpark
 import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.launcher.SparkLauncher.{EXECUTOR_MEMORY, SPARK_MASTER}
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.buffer.ManagedBuffer
@@ -292,7 +293,7 @@ class FallbackStorageSuite extends SparkFunSuite with 
LocalSparkContext {
     }
   }
 
-  Seq("lz4", "lzf", "snappy", "zstd").foreach { codec =>
+  CompressionCodec.shortCompressionCodecNames.keys.foreach { codec =>
     test(s"$codec - Newly added executors should access old data from remote 
storage") {
       sc = new SparkContext(getSparkConf(2, 0).set(IO_COMPRESSION_CODEC, 
codec))
       withSpark(sc) { sc =>
diff --git 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 59f6e3f2d35..2a760c39b46 100644
--- 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -37,7 +37,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
   with Matchers {
   import TestUtils.{assertNotSpilled, assertSpilled}
 
-  private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
   private def createCombiner[T](i: T) = ArrayBuffer[T](i)
   private def mergeValue[T](buffer: ArrayBuffer[T], i: T): ArrayBuffer[T] = 
buffer += i
   private def mergeCombiners[T](buf1: ArrayBuffer[T], buf2: ArrayBuffer[T]): 
ArrayBuffer[T] =
@@ -224,7 +223,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
     // Keep track of which compression codec we're using to report in test 
failure messages
     var lastCompressionCodec: Option[String] = None
     try {
-      allCompressionCodecs.foreach { c =>
+      CompressionCodec.ALL_COMPRESSION_CODECS.foreach { c =>
         lastCompressionCodec = Some(c)
         testSimpleSpilling(Some(c), encrypt)
       }
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
index 992fe7c97ff..d6911aadfa2 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.time.{Seconds, Span}
 
 import org.apache.spark.{SparkFunSuite, TestUtils}
 import 
org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.SPARK_PI_MAIN_CLASS
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.launcher.SparkLauncher
 
 private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
@@ -93,7 +94,7 @@ private[spark] trait BasicTestsSuite { k8sSuite: 
KubernetesSuite =>
   test("Run SparkPi with an argument.", k8sTestTag) {
     // This additional configuration with snappy is for SPARK-26995
     sparkAppConf
-      .set("spark.io.compression.codec", "snappy")
+      .set("spark.io.compression.codec", CompressionCodec.SNAPPY)
     runSparkPiAndVerifyCompletion(appArgs = Array("5"))
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1af0b41d0fa..ecc3e6e101f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.{ErrorMessageFormat, SparkConf, SparkContext, 
TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis.{HintErrorLogger, Resolver}
@@ -1997,7 +1998,7 @@ object SQLConf {
         "use fully qualified class names to specify the codec. Default codec 
is lz4.")
       .version("3.1.0")
       .stringConf
-      .createWithDefault("lz4")
+      .createWithDefault(CompressionCodec.LZ4)
 
   val CHECKPOINT_RENAMEDFILE_CHECK_ENABLED =
     buildConf("spark.sql.streaming.checkpoint.renamedFileCheck.enabled")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index 913805d1a07..dea75e3ec47 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -21,6 +21,7 @@ import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, 
SparkDataStream}
 import 
org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExecHelper,
 StreamingAggregationStateManager, SymmetricHashJoinStateManager}
@@ -118,7 +119,7 @@ object OffsetSeqMetadata extends Logging {
       StreamingAggregationStateManager.legacyVersion.toString,
     STREAMING_JOIN_STATE_FORMAT_VERSION.key ->
       SymmetricHashJoinStateManager.legacyVersion.toString,
-    STATE_STORE_COMPRESSION_CODEC.key -> "lz4",
+    STATE_STORE_COMPRESSION_CODEC.key -> CompressionCodec.LZ4,
     STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION.key -> "false"
   )
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 6a62a6c52f5..046cf69f1fc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -126,7 +126,7 @@ class RocksDBFileManager(
     dfsRootDir: String,
     localTempDir: File,
     hadoopConf: Configuration,
-    codecName: String = "zstd",
+    codecName: String = CompressionCodec.ZSTD,
     loggingId: String = "")
   extends Logging {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to