This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 11037cb8c chore: Delete unused code (#2565)
11037cb8c is described below

commit 11037cb8c5af9ad5165c85789d59981a04b75ee0
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Oct 13 23:02:45 2025 +0800

    chore: Delete unused code (#2565)
---
 .../execution/shuffle/CometShuffleManager.scala    | 14 ------
 .../sql/comet/execution/shuffle/ShuffleUtils.scala | 53 ----------------------
 2 files changed, 67 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
index a2af185e5..927e30932 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
@@ -29,8 +29,6 @@ import org.apache.spark.SparkConf
 import org.apache.spark.SparkEnv
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.{config, Logging}
-import org.apache.spark.internal.config.IO_COMPRESSION_CODEC
-import org.apache.spark.io.CompressionCodec
 import org.apache.spark.shuffle._
 import org.apache.spark.shuffle.api.ShuffleExecutorComponents
 import org.apache.spark.shuffle.sort.{BypassMergeSortShuffleHandle, SerializedShuffleHandle, SortShuffleManager, SortShuffleWriter}
@@ -268,18 +266,6 @@ object CometShuffleManager extends Logging {
     executorComponents
   }
 
-  lazy val compressionCodecForShuffling: CompressionCodec = {
-    val sparkConf = SparkEnv.get.conf
-    val codecName = CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC.get(SQLConf.get)
-
-    // only zstd compression is supported at the moment
-    if (codecName != "zstd") {
-      logWarning(
-        s"Overriding config ${IO_COMPRESSION_CODEC}=${codecName} in shuffling, force using zstd")
-    }
-    CompressionCodec.createCodec(sparkConf, "zstd")
-  }
-
   def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
     // We cannot bypass sorting if we need to do map-side aggregation.
     if (dep.mapSideCombine) {
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/ShuffleUtils.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/ShuffleUtils.scala
deleted file mode 100644
index 23b4a5ec2..000000000
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/ShuffleUtils.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.execution.shuffle
-
-import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{IO_COMPRESSION_CODEC, SHUFFLE_COMPRESS}
-import org.apache.spark.io.CompressionCodec
-
-import org.apache.comet.CometConf
-
-private[spark] object ShuffleUtils extends Logging {
-  // optional compression codec to use when compressing shuffle files
-  lazy val compressionCodecForShuffling: Option[CompressionCodec] = {
-    val sparkConf = SparkEnv.get.conf
-    val shuffleCompressionEnabled = sparkConf.getBoolean(SHUFFLE_COMPRESS.key, true)
-    val sparkShuffleCodec = sparkConf.get(IO_COMPRESSION_CODEC.key, "lz4")
-    val cometShuffleCodec = CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC.get()
-    if (shuffleCompressionEnabled) {
-      if (sparkShuffleCodec != cometShuffleCodec) {
-        logWarning(
-          s"Overriding config $IO_COMPRESSION_CODEC=$sparkShuffleCodec in shuffling, " +
-            s"force using $cometShuffleCodec")
-      }
-      cometShuffleCodec match {
-        case "zstd" =>
-          Some(CompressionCodec.createCodec(sparkConf, "zstd"))
-        case other =>
-          throw new UnsupportedOperationException(
-            s"Unsupported shuffle compression codec: $other")
-      }
-    } else {
-      None
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to