This is an automated email from the ASF dual-hosted git repository.
mingliang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 59a473cf34 [GLUTEN-10349] Remove the unnecessary Set and use enum directly (#10350)
59a473cf34 is described below
commit 59a473cf34f6e159b5aa51b179c842b31e7c2e5c
Author: Jiaan Geng <[email protected]>
AuthorDate: Wed Aug 13 10:39:17 2025 +0800
[GLUTEN-10349] Remove the unnecessary Set and use enum directly (#10350)
---
.../shuffle/VeloxCelebornColumnarShuffleWriter.scala | 20 ++++++++++----------
.../writer/VeloxUniffleColumnarShuffleWriter.java | 3 +--
.../apache/spark/shuffle/ColumnarShuffleWriter.scala | 18 +++++++++---------
.../gluten/vectorized/NativePlanEvaluator.java | 3 +--
.../apache/gluten/memory/NativeMemoryManager.scala | 14 ++++++--------
.../org/apache/gluten/memory/memtarget/Spillers.java | 6 ------
6 files changed, 27 insertions(+), 37 deletions(-)
diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
index 881d9cf660..d2c27e960c 100644
--- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
+++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
@@ -19,7 +19,7 @@ package org.apache.spark.shuffle
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.config.{GlutenConfig, HashShuffleWriterType,
RssSortShuffleWriterType, SortShuffleWriterType}
-import org.apache.gluten.memory.memtarget.{MemoryTarget, Spiller, Spillers}
+import org.apache.gluten.memory.memtarget.{MemoryTarget, Spiller}
import org.apache.gluten.runtime.Runtimes
import org.apache.gluten.vectorized._
@@ -179,16 +179,16 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
runtime
.memoryManager()
.addSpiller(new Spiller() {
- override def spill(self: MemoryTarget, phase: Spiller.Phase, size:
Long): Long = {
- if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
- return 0L
+ override def spill(self: MemoryTarget, phase: Spiller.Phase, size:
Long): Long =
+ phase match {
+ case Spiller.Phase.SPILL =>
+ logInfo(s"Gluten shuffle writer: Trying to push $size bytes of
data")
+ // fixme pass true when being called by self
+ val pushed =
shuffleWriterJniWrapper.reclaim(nativeShuffleWriter, size)
+ logInfo(s"Gluten shuffle writer: Pushed $pushed / $size bytes of
data")
+ pushed
+ case _ => 0L
}
- logInfo(s"Gluten shuffle writer: Trying to push $size bytes of data")
- // fixme pass true when being called by self
- val pushed = shuffleWriterJniWrapper.reclaim(nativeShuffleWriter,
size)
- logInfo(s"Gluten shuffle writer: Pushed $pushed / $size bytes of
data")
- pushed
- }
})
}
diff --git a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index 9562dcbc4b..3240a50064 100644
--- a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++ b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -22,7 +22,6 @@ import org.apache.gluten.config.GlutenConfig;
import org.apache.gluten.config.SortShuffleWriterType$;
import org.apache.gluten.memory.memtarget.MemoryTarget;
import org.apache.gluten.memory.memtarget.Spiller;
-import org.apache.gluten.memory.memtarget.Spillers;
import org.apache.gluten.runtime.Runtime;
import org.apache.gluten.runtime.Runtimes;
import org.apache.gluten.vectorized.GlutenSplitResult;
@@ -193,7 +192,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
new Spiller() {
@Override
public long spill(MemoryTarget self, Spiller.Phase phase,
long size) {
- if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
+ if (!Spiller.Phase.SPILL.equals(phase)) {
return 0L;
}
LOG.info("Gluten shuffle writer: Trying to push {} bytes
of data", size);
diff --git a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index 34895ceca3..94735e5b70 100644
--- a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -19,7 +19,7 @@ package org.apache.spark.shuffle
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.config.{GlutenConfig, HashShuffleWriterType,
SortShuffleWriterType}
-import org.apache.gluten.memory.memtarget.{MemoryTarget, Spiller, Spillers}
+import org.apache.gluten.memory.memtarget.{MemoryTarget, Spiller}
import org.apache.gluten.runtime.Runtimes
import org.apache.gluten.vectorized._
@@ -187,15 +187,15 @@ class ColumnarShuffleWriter[K, V](
runtime
.memoryManager()
.addSpiller(new Spiller() {
- override def spill(self: MemoryTarget, phase: Spiller.Phase,
size: Long): Long = {
- if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
- return 0L
+ override def spill(self: MemoryTarget, phase: Spiller.Phase,
size: Long): Long =
+ phase match {
+ case Spiller.Phase.SPILL =>
+ logInfo(s"Gluten shuffle writer: Trying to spill $size
bytes of data")
+ val spilled =
shuffleWriterJniWrapper.reclaim(nativeShuffleWriter, size)
+ logInfo(s"Gluten shuffle writer: Spilled $spilled / $size
bytes of data")
+ spilled
+ case _ => 0L
}
- logInfo(s"Gluten shuffle writer: Trying to spill $size bytes
of data")
- val spilled =
shuffleWriterJniWrapper.reclaim(nativeShuffleWriter, size)
- logInfo(s"Gluten shuffle writer: Spilled $spilled / $size
bytes of data")
- spilled
- }
})
}
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
index b3889fb231..dcd0f17623 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
@@ -18,7 +18,6 @@ package org.apache.gluten.vectorized;
import org.apache.gluten.memory.memtarget.MemoryTarget;
import org.apache.gluten.memory.memtarget.Spiller;
-import org.apache.gluten.memory.memtarget.Spillers;
import org.apache.gluten.runtime.Runtime;
import org.apache.gluten.runtime.Runtimes;
import org.apache.gluten.utils.DebugUtil;
@@ -87,7 +86,7 @@ public class NativePlanEvaluator {
new Spiller() {
@Override
public long spill(MemoryTarget self, Spiller.Phase phase, long
size) {
- if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
+ if (!Spiller.Phase.SPILL.equals(phase)) {
return 0L;
}
long spilled = out.spill(size);
diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
index 5aae7f6c57..3955756346 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
@@ -56,14 +56,12 @@ object NativeMemoryManager {
.getNativeSessionConf(backendName,
GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)))
)
spillers.append(new Spiller() {
- override def spill(self: MemoryTarget, phase: Spiller.Phase, size:
Long): Long = {
- if (!Spillers.PHASE_SET_SHRINK_ONLY.contains(phase)) {
- // Only respond for shrinking.
- return 0L
- }
- val shrunk = NativeMemoryManagerJniWrapper.shrink(handle, size)
- LOGGER.info(s"NativeMemoryManager: Shrunk $shrunk / $size bytes of
data.")
- shrunk
+ override def spill(self: MemoryTarget, phase: Spiller.Phase, size:
Long): Long = phase match {
+ case Spiller.Phase.SHRINK => // Only respond for shrinking.
+ val shrunk = NativeMemoryManagerJniWrapper.shrink(handle, size)
+ LOGGER.info(s"NativeMemoryManager: Shrunk $shrunk / $size bytes of
data.")
+ shrunk
+ case _ => 0L
}
})
mutableStats += "single" -> new MemoryUsageStatsBuilder {
diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/Spillers.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/Spillers.java
index 38ed88f577..1c554d6725 100644
--- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/Spillers.java
+++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/Spillers.java
@@ -35,12 +35,6 @@ public final class Spillers {
Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(Spiller.Phase.SHRINK,
Spiller.Phase.SPILL)));
- public static final Set<Spiller.Phase> PHASE_SET_SHRINK_ONLY =
- Collections.singleton(Spiller.Phase.SHRINK);
-
- public static final Set<Spiller.Phase> PHASE_SET_SPILL_ONLY =
- Collections.singleton(Spiller.Phase.SPILL);
-
public static Spiller withMinSpillSize(Spiller spiller, long minSize) {
return new WithMinSpillSize(spiller, minSize);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]