This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f6038302dd6 [SPARK-45793][CORE] Improve the built-in compression codecs
f6038302dd6 is described below

commit f6038302dd615f4bf9bed9c4af3d04426f7e5c5e
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Mon Nov 6 20:06:39 2023 +0800

    [SPARK-45793][CORE] Improve the built-in compression codecs

    ### What changes were proposed in this pull request?
    Currently, Spark supports several built-in compression codecs for I/O and storage.
    Their short names are copied around as magic strings, which forces developers to keep
    the copies consistent by hand; that is error-prone and reduces development efficiency.
    This PR defines each short name once as a constant in `CompressionCodec` and references
    the constants everywhere else.

    ### Why are the changes needed?
    Improve the code around the storage compression codecs.

    ### Does this PR introduce _any_ user-facing change?
    'No'.

    ### How was this patch tested?
    N/A

    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.

    Closes #43659 from beliefer/improve_storage_code.

    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Jiaan Geng <belie...@163.com>
---
 .../deploy/history/HistoryServerMemoryManager.scala |  3 ++-
 .../org/apache/spark/internal/config/package.scala  |  7 ++++---
 .../org/apache/spark/io/CompressionCodec.scala      | 21 ++++++++++++---------
 .../deploy/history/EventLogFileWritersSuite.scala   |  6 +++---
 .../deploy/history/FsHistoryProviderSuite.scala     |  5 +++--
 .../org/apache/spark/io/CompressionCodecSuite.scala |  8 ++++----
 .../apache/spark/storage/FallbackStorageSuite.scala |  3 ++-
 .../collection/ExternalAppendOnlyMapSuite.scala     |  3 +--
 .../k8s/integrationtest/BasicTestsSuite.scala       |  3 ++-
 .../org/apache/spark/sql/internal/SQLConf.scala     |  3 ++-
 .../spark/sql/execution/streaming/OffsetSeq.scala   |  3 ++-
 .../streaming/state/RocksDBFileManager.scala        |  2 +-
 12 files changed, 38 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
index 00e58cbdc57..b95f1ed24f3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.HashMap
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.History._
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.Utils
 
 /**
@@ -75,7 +76,7 @@ private class HistoryServerMemoryManager(
   private def approximateMemoryUsage(eventLogSize: Long, codec: Option[String]): Long = {
     codec match {
-      case Some("zstd") =>
+      case Some(CompressionCodec.ZSTD) =>
         eventLogSize * 10
       case Some(_) =>
         eventLogSize * 4
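For reference, the hunk above only swaps the matched string literal for the new
constant; the estimation heuristic itself is unchanged. A minimal standalone sketch of
that heuristic (the `case None` branch is not part of this hunk, so its value below is
a placeholder, not Spark's actual behavior):

    import org.apache.spark.io.CompressionCodec

    // Sketch: zstd-compressed event logs are assumed to expand ~10x when replayed,
    // logs compressed with any other codec ~4x.
    def approximateMemoryUsageSketch(eventLogSize: Long, codec: Option[String]): Long =
      codec match {
        case Some(CompressionCodec.ZSTD) => eventLogSize * 10
        case Some(_) => eventLogSize * 4
        case None => eventLogSize // placeholder: the real None branch is not shown here
      }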
specify the codec.") .version("3.0.0") .stringConf - .createWithDefault("zstd") + .createWithDefault(CompressionCodec.ZSTD) private[spark] val SHUFFLE_SPILL_INITIAL_MEM_THRESHOLD = ConfigBuilder("spark.shuffle.spill.initialMemoryThreshold") @@ -1871,7 +1872,7 @@ package object config { "the codec") .version("0.8.0") .stringConf - .createWithDefaultString("lz4") + .createWithDefaultString(CompressionCodec.LZ4) private[spark] val IO_COMPRESSION_ZSTD_BUFFERSIZE = ConfigBuilder("spark.io.compression.zstd.bufferSize") @@ -1914,7 +1915,7 @@ package object config { "the codec.") .version("3.0.0") .stringConf - .createWithDefault("zstd") + .createWithDefault(CompressionCodec.ZSTD) private[spark] val BUFFER_SIZE = ConfigBuilder("spark.buffer.size") diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 0bb392deb39..a6a5b1f67c6 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -58,18 +58,21 @@ trait CompressionCodec { private[spark] object CompressionCodec { - private val configKey = IO_COMPRESSION_CODEC.key - private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = { (codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec] || codec.isInstanceOf[LZ4CompressionCodec] || codec.isInstanceOf[ZStdCompressionCodec]) } - private val shortCompressionCodecNames = Map( - "lz4" -> classOf[LZ4CompressionCodec].getName, - "lzf" -> classOf[LZFCompressionCodec].getName, - "snappy" -> classOf[SnappyCompressionCodec].getName, - "zstd" -> classOf[ZStdCompressionCodec].getName) + val LZ4 = "lz4" + val LZF = "lzf" + val SNAPPY = "snappy" + val ZSTD = "zstd" + + private[spark] val shortCompressionCodecNames = Map( + LZ4 -> classOf[LZ4CompressionCodec].getName, + LZF -> classOf[LZFCompressionCodec].getName, + SNAPPY -> classOf[SnappyCompressionCodec].getName, + ZSTD -> classOf[ZStdCompressionCodec].getName) def getCodecName(conf: SparkConf): String = { conf.get(IO_COMPRESSION_CODEC) @@ -93,7 +96,7 @@ private[spark] object CompressionCodec { errorClass = "CODEC_NOT_AVAILABLE", messageParameters = Map( "codecName" -> codecName, - "configKey" -> toConf(configKey), + "configKey" -> toConf(IO_COMPRESSION_CODEC.key), "configVal" -> toConfVal(FALLBACK_COMPRESSION_CODEC)))) } @@ -113,7 +116,7 @@ private[spark] object CompressionCodec { } } - val FALLBACK_COMPRESSION_CODEC = "snappy" + val FALLBACK_COMPRESSION_CODEC = SNAPPY val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala index b575cbc080c..349985207e4 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala @@ -176,7 +176,7 @@ class SingleEventLogFileWriterSuite extends EventLogFileWritersSuite { baseDirUri, "app1", None, None)) // with compression assert(s"${baseDirUri.toString}/app1.lzf" === - SingleEventLogFileWriter.getLogPath(baseDirUri, "app1", None, Some("lzf"))) + SingleEventLogFileWriter.getLogPath(baseDirUri, "app1", None, Some(CompressionCodec.LZF))) // illegal characters in app ID assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1" === 
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
index b575cbc080c..349985207e4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
@@ -176,7 +176,7 @@ class SingleEventLogFileWriterSuite extends EventLogFileWritersSuite {
       baseDirUri, "app1", None, None))
     // with compression
     assert(s"${baseDirUri.toString}/app1.lzf" ===
-      SingleEventLogFileWriter.getLogPath(baseDirUri, "app1", None, Some("lzf")))
+      SingleEventLogFileWriter.getLogPath(baseDirUri, "app1", None, Some(CompressionCodec.LZF)))
     // illegal characters in app ID
     assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1" ===
       SingleEventLogFileWriter.getLogPath(baseDirUri,
@@ -184,7 +184,7 @@ class SingleEventLogFileWriterSuite extends EventLogFileWritersSuite {
     // illegal characters in app ID with compression
     assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1.lz4" ===
       SingleEventLogFileWriter.getLogPath(baseDirUri,
-        "a fine:mind$dollar{bills}.1", None, Some("lz4")))
+        "a fine:mind$dollar{bills}.1", None, Some(CompressionCodec.LZ4)))
   }
 
   override protected def createWriter(
@@ -239,7 +239,7 @@ class RollingEventLogFilesWriterSuite extends EventLogFileWritersSuite {
     // with compression
     assert(s"$logDir/${EVENT_LOG_FILE_NAME_PREFIX}1_${appId}.lzf" ===
       RollingEventLogFilesWriter.getEventLogFilePath(logDir, appId, appAttemptId,
-        1, Some("lzf")).toString)
+        1, Some(CompressionCodec.LZF)).toString)
     // illegal characters in app ID
     assert(s"${baseDirUri.toString}/${EVENT_LOG_DIR_NAME_PREFIX}a-fine-mind_dollar_bills__1" ===
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index ae8481a852b..d16e904bdcf 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -126,8 +126,9 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
     // Write a new-style application log.
     val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
-      Some("lzf"))
-    writeFile(newAppCompressedComplete, Some(CompressionCodec.createCodec(conf, "lzf")),
+      Some(CompressionCodec.LZF))
+    writeFile(
+      newAppCompressedComplete, Some(CompressionCodec.createCodec(conf, CompressionCodec.LZF)),
       SparkListenerApplicationStart(newAppCompressedComplete.getName(), Some("new-complete-lzf"),
         1L, "test", None),
       SparkListenerApplicationEnd(4L))
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index 244c007f539..9c9fac0d483 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -58,7 +58,7 @@ class CompressionCodecSuite extends SparkFunSuite {
   }
 
   test("lz4 compression codec short form") {
-    val codec = CompressionCodec.createCodec(conf, "lz4")
+    val codec = CompressionCodec.createCodec(conf, CompressionCodec.LZ4)
     assert(codec.getClass === classOf[LZ4CompressionCodec])
     testCodec(codec)
   }
@@ -76,7 +76,7 @@ class CompressionCodecSuite extends SparkFunSuite {
   }
 
   test("lzf compression codec short form") {
-    val codec = CompressionCodec.createCodec(conf, "lzf")
+    val codec = CompressionCodec.createCodec(conf, CompressionCodec.LZF)
     assert(codec.getClass === classOf[LZFCompressionCodec])
     testCodec(codec)
   }
@@ -94,7 +94,7 @@ class CompressionCodecSuite extends SparkFunSuite {
   }
 
   test("snappy compression codec short form") {
-    val codec = CompressionCodec.createCodec(conf, "snappy")
+    val codec = CompressionCodec.createCodec(conf, CompressionCodec.SNAPPY)
     assert(codec.getClass === classOf[SnappyCompressionCodec])
     testCodec(codec)
   }
@@ -115,7 +115,7 @@ class CompressionCodecSuite extends SparkFunSuite {
   }
 
   test("zstd compression codec short form") {
-    val codec = CompressionCodec.createCodec(conf, "zstd")
+    val codec = CompressionCodec.createCodec(conf, CompressionCodec.ZSTD)
     assert(codec.getClass === classOf[ZStdCompressionCodec])
     testCodec(codec)
   }
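As a round-trip sketch of the codec API these suites exercise, referencing the new
constants instead of magic strings (again Spark-internal: createCodec lives on the
private[spark] companion object, so this is assumed to sit inside an org.apache.spark
package):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

    import org.apache.spark.SparkConf
    import org.apache.spark.io.CompressionCodec

    val codec = CompressionCodec.createCodec(new SparkConf(), CompressionCodec.LZF)

    // Compress a payload into an in-memory buffer...
    val payload = "event log payload".getBytes("UTF-8")
    val buffer = new ByteArrayOutputStream()
    val out = codec.compressedOutputStream(buffer)
    out.write(payload)
    out.close()

    // ...then read it back through the matching input stream.
    val in = codec.compressedInputStream(new ByteArrayInputStream(buffer.toByteArray))
    assert(in.readAllBytes().sameElements(payload)) // readAllBytes requires Java 9+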
diff --git a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
index 83c9707bfc2..6c51bd4ff2e 100644
--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
@@ -31,6 +31,7 @@ import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
 import org.apache.spark.LocalSparkContext.withSpark
 import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.launcher.SparkLauncher.{EXECUTOR_MEMORY, SPARK_MASTER}
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.buffer.ManagedBuffer
@@ -292,7 +293,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
-  Seq("lz4", "lzf", "snappy", "zstd").foreach { codec =>
+  CompressionCodec.shortCompressionCodecNames.keys.foreach { codec =>
     test(s"$codec - Newly added executors should access old data from remote storage") {
       sc = new SparkContext(getSparkConf(2, 0).set(IO_COMPRESSION_CODEC, codec))
       withSpark(sc) { sc =>
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 59f6e3f2d35..2a760c39b46 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -37,7 +37,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
   with Matchers {
   import TestUtils.{assertNotSpilled, assertSpilled}
 
-  private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
   private def createCombiner[T](i: T) = ArrayBuffer[T](i)
   private def mergeValue[T](buffer: ArrayBuffer[T], i: T): ArrayBuffer[T] = buffer += i
   private def mergeCombiners[T](buf1: ArrayBuffer[T], buf2: ArrayBuffer[T]): ArrayBuffer[T] =
@@ -224,7 +223,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
     // Keep track of which compression codec we're using to report in test failure messages
     var lastCompressionCodec: Option[String] = None
     try {
-      allCompressionCodecs.foreach { c =>
+      CompressionCodec.ALL_COMPRESSION_CODECS.foreach { c =>
         lastCompressionCodec = Some(c)
         testSimpleSpilling(Some(c), encrypt)
       }
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
index 992fe7c97ff..d6911aadfa2 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.time.{Seconds, Span}
 
 import org.apache.spark.{SparkFunSuite, TestUtils}
 import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.SPARK_PI_MAIN_CLASS
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.launcher.SparkLauncher
 
 private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
@@ -93,7 +94,7 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
   test("Run SparkPi with an argument.", k8sTestTag) {
     // This additional configuration with snappy is for SPARK-26995
     sparkAppConf
-      .set("spark.io.compression.codec", "snappy")
+      .set("spark.io.compression.codec", CompressionCodec.SNAPPY)
     runSparkPiAndVerifyCompletion(appArgs = Array("5"))
   }
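One subtlety in the core suites above: shortCompressionCodecNames.keys yields the short
names, while ALL_COMPRESSION_CODECS is derived from the map's values and therefore holds
fully qualified class names. A small sketch of the distinction:

    import org.apache.spark.io.CompressionCodec

    // Short names, as accepted by spark.io.compression.codec (FallbackStorageSuite):
    CompressionCodec.shortCompressionCodecNames.keys.foreach(println)
    // lz4, lzf, snappy, zstd (in map iteration order)

    // Implementing class names (ExternalAppendOnlyMapSuite):
    CompressionCodec.ALL_COMPRESSION_CODECS.foreach(println)
    // e.g. org.apache.spark.io.LZ4CompressionCodec, ...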
argument.", k8sTestTag) { // This additional configuration with snappy is for SPARK-26995 sparkAppConf - .set("spark.io.compression.codec", "snappy") + .set("spark.io.compression.codec", CompressionCodec.SNAPPY) runSparkPiAndVerifyCompletion(appArgs = Array("5")) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1af0b41d0fa..ecc3e6e101f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.{ErrorMessageFormat, SparkConf, SparkContext, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec import org.apache.spark.network.util.ByteUnit import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.analysis.{HintErrorLogger, Resolver} @@ -1997,7 +1998,7 @@ object SQLConf { "use fully qualified class names to specify the codec. Default codec is lz4.") .version("3.1.0") .stringConf - .createWithDefault("lz4") + .createWithDefault(CompressionCodec.LZ4) val CHECKPOINT_RENAMEDFILE_CHECK_ENABLED = buildConf("spark.sql.streaming.checkpoint.renamedFileCheck.enabled") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala index 913805d1a07..dea75e3ec47 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala @@ -21,6 +21,7 @@ import org.json4s.NoTypeHints import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging +import org.apache.spark.io.CompressionCodec import org.apache.spark.sql.RuntimeConfig import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, SparkDataStream} import org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExecHelper, StreamingAggregationStateManager, SymmetricHashJoinStateManager} @@ -118,7 +119,7 @@ object OffsetSeqMetadata extends Logging { StreamingAggregationStateManager.legacyVersion.toString, STREAMING_JOIN_STATE_FORMAT_VERSION.key -> SymmetricHashJoinStateManager.legacyVersion.toString, - STATE_STORE_COMPRESSION_CODEC.key -> "lz4", + STATE_STORE_COMPRESSION_CODEC.key -> CompressionCodec.LZ4, STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION.key -> "false" ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 6a62a6c52f5..046cf69f1fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -126,7 +126,7 @@ class RocksDBFileManager( dfsRootDir: String, localTempDir: File, hadoopConf: Configuration, - codecName: String = "zstd", + codecName: String = CompressionCodec.ZSTD, loggingId: String = "") extends Logging { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org