This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 11037cb8c chore: Delete unused code (#2565)
11037cb8c is described below
commit 11037cb8c5af9ad5165c85789d59981a04b75ee0
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Oct 13 23:02:45 2025 +0800
chore: Delete unused code (#2565)
---
.../execution/shuffle/CometShuffleManager.scala | 14 ------
.../sql/comet/execution/shuffle/ShuffleUtils.scala | 53 ----------------------
2 files changed, 67 deletions(-)
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
index a2af185e5..927e30932 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
@@ -29,8 +29,6 @@ import org.apache.spark.SparkConf
import org.apache.spark.SparkEnv
import org.apache.spark.TaskContext
import org.apache.spark.internal.{config, Logging}
-import org.apache.spark.internal.config.IO_COMPRESSION_CODEC
-import org.apache.spark.io.CompressionCodec
import org.apache.spark.shuffle._
import org.apache.spark.shuffle.api.ShuffleExecutorComponents
import org.apache.spark.shuffle.sort.{BypassMergeSortShuffleHandle, SerializedShuffleHandle, SortShuffleManager, SortShuffleWriter}
@@ -268,18 +266,6 @@ object CometShuffleManager extends Logging {
executorComponents
}
- lazy val compressionCodecForShuffling: CompressionCodec = {
- val sparkConf = SparkEnv.get.conf
- val codecName = CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC.get(SQLConf.get)
-
- // only zstd compression is supported at the moment
- if (codecName != "zstd") {
- logWarning(
- s"Overriding config ${IO_COMPRESSION_CODEC}=${codecName} in shuffling, force using zstd")
- }
- CompressionCodec.createCodec(sparkConf, "zstd")
- }
-
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
// We cannot bypass sorting if we need to do map-side aggregation.
if (dep.mapSideCombine) {
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/ShuffleUtils.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/ShuffleUtils.scala
deleted file mode 100644
index 23b4a5ec2..000000000
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/ShuffleUtils.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.execution.shuffle
-
-import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{IO_COMPRESSION_CODEC, SHUFFLE_COMPRESS}
-import org.apache.spark.io.CompressionCodec
-
-import org.apache.comet.CometConf
-
-private[spark] object ShuffleUtils extends Logging {
- // optional compression codec to use when compressing shuffle files
- lazy val compressionCodecForShuffling: Option[CompressionCodec] = {
- val sparkConf = SparkEnv.get.conf
- val shuffleCompressionEnabled = sparkConf.getBoolean(SHUFFLE_COMPRESS.key, true)
- val sparkShuffleCodec = sparkConf.get(IO_COMPRESSION_CODEC.key, "lz4")
- val cometShuffleCodec = CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC.get()
- if (shuffleCompressionEnabled) {
- if (sparkShuffleCodec != cometShuffleCodec) {
- logWarning(
- s"Overriding config $IO_COMPRESSION_CODEC=$sparkShuffleCodec in shuffling, " +
- s"force using $cometShuffleCodec")
- }
- cometShuffleCodec match {
- case "zstd" =>
- Some(CompressionCodec.createCodec(sparkConf, "zstd"))
- case other =>
- throw new UnsupportedOperationException(
- s"Unsupported shuffle compression codec: $other")
- }
- } else {
- None
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]