This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 945ac2342 [GLUTEN-6180][VL] Fix NPE if spilling is requested during
task creation (#6205)
945ac2342 is described below
commit 945ac2342202533ccf862e248ef262a377ba1569
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Jun 26 09:57:38 2024 +0800
[GLUTEN-6180][VL] Fix NPE if spilling is requested during task creation
(#6205)
---
.../gluten/memory/memtarget/MemoryTargets.java | 2 +-
.../memory/arrow/alloc/ArrowBufferAllocators.java | 11 +-
.../gluten/memory/nmm/NativeMemoryManagers.java | 157 +++++++++++----------
.../gluten/vectorized/NativePlanEvaluator.java | 2 +-
4 files changed, 91 insertions(+), 81 deletions(-)
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
index 2d6fc0748..c3ece7433 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
@@ -63,6 +63,6 @@ public final class MemoryTargets {
factory = TreeMemoryConsumers.shared();
}
- return dynamicOffHeapSizingIfEnabled(factory.newConsumer(tmm, name,
spillers, virtualChildren));
+ return factory.newConsumer(tmm, name, spillers, virtualChildren);
}
}
diff --git
a/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
index efee20e48..51f49da70 100644
---
a/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
+++
b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
@@ -60,11 +60,12 @@ public class ArrowBufferAllocators {
listener =
new ManagedAllocationListener(
MemoryTargets.throwOnOom(
- MemoryTargets.newConsumer(
- tmm,
- "ArrowContextInstance",
- Collections.emptyList(),
- Collections.emptyMap())),
+ MemoryTargets.dynamicOffHeapSizingIfEnabled(
+ MemoryTargets.newConsumer(
+ tmm,
+ "ArrowContextInstance",
+ Collections.emptyList(),
+ Collections.emptyMap()))),
TaskResources.getSharedUsage());
}
diff --git
a/gluten-data/src/main/java/org/apache/gluten/memory/nmm/NativeMemoryManagers.java
b/gluten-data/src/main/java/org/apache/gluten/memory/nmm/NativeMemoryManagers.java
index 928f869ba..37456badd 100644
---
a/gluten-data/src/main/java/org/apache/gluten/memory/nmm/NativeMemoryManagers.java
+++
b/gluten-data/src/main/java/org/apache/gluten/memory/nmm/NativeMemoryManagers.java
@@ -26,6 +26,8 @@ import org.apache.gluten.proto.MemoryUsageStats;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.TaskResources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
@@ -37,6 +39,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
public final class NativeMemoryManagers {
+ private static final Logger LOG =
LoggerFactory.getLogger(NativeMemoryManagers.class);
// TODO: Let all caller support spill.
public static NativeMemoryManager contextInstance(String name) {
@@ -67,86 +70,92 @@ public final class NativeMemoryManagers {
final MemoryTarget target =
MemoryTargets.throwOnOom(
MemoryTargets.overAcquire(
- MemoryTargets.newConsumer(
- tmm,
- name,
- // call memory manager's shrink API, if no good then call
the spiller
- Stream.concat(
- Stream.of(
- new Spiller() {
- @Override
- public long spill(MemoryTarget self, long
size) {
- return Optional.of(out.get())
- .map(nmm -> nmm.shrink(size))
- .orElseThrow(
- () ->
- new IllegalStateException(
- ""
- + "Shrink is requested
before native "
- + "memory manager is
created. Try moving "
- + "any actions about
memory allocation out "
- + "from the memory
manager constructor."));
- }
+ MemoryTargets.dynamicOffHeapSizingIfEnabled(
+ MemoryTargets.newConsumer(
+ tmm,
+ name,
+ // call memory manager's shrink API, if no good then
call the spiller
+ Stream.concat(
+ Stream.of(
+ new Spiller() {
+ @Override
+ public long spill(MemoryTarget self,
long size) {
+ return Optional.ofNullable(out.get())
+ .map(nmm -> nmm.shrink(size))
+ .orElseGet(
+ () -> {
+ LOG.warn(
+ "Shrink is requested
before native "
+ + "memory manager is
created. Try moving "
+ + "any actions about
memory allocation"
+ + " out from the
memory manager"
+ + " constructor.");
+ return 0L;
+ });
+ }
- @Override
- public Set<Phase> applicablePhases() {
- return Spillers.PHASE_SET_SHRINK_ONLY;
- }
- }),
- spillers.stream())
- .map(spiller -> Spillers.withMinSpillSize(spiller,
reservationBlockSize))
- .collect(Collectors.toList()),
- Collections.singletonMap(
- "single",
- new MemoryUsageRecorder() {
- @Override
- public void inc(long bytes) {
- // no-op
- }
+ @Override
+ public Set<Phase> applicablePhases() {
+ return Spillers.PHASE_SET_SHRINK_ONLY;
+ }
+ }),
+ spillers.stream())
+ .map(
+ spiller -> Spillers.withMinSpillSize(spiller,
reservationBlockSize))
+ .collect(Collectors.toList()),
+ Collections.singletonMap(
+ "single",
+ new MemoryUsageRecorder() {
+ @Override
+ public void inc(long bytes) {
+ // no-op
+ }
- @Override
- public long peak() {
- throw new UnsupportedOperationException("Not
implemented");
- }
+ @Override
+ public long peak() {
+ throw new UnsupportedOperationException("Not
implemented");
+ }
- @Override
- public long current() {
- throw new UnsupportedOperationException("Not
implemented");
- }
+ @Override
+ public long current() {
+ throw new UnsupportedOperationException("Not
implemented");
+ }
- @Override
- public MemoryUsageStats toStats() {
- return
getNativeMemoryManager().collectMemoryUsage();
- }
+ @Override
+ public MemoryUsageStats toStats() {
+ return
getNativeMemoryManager().collectMemoryUsage();
+ }
- private NativeMemoryManager getNativeMemoryManager()
{
- return Optional.of(out.get())
- .orElseThrow(
- () ->
- new IllegalStateException(
- ""
- + "Memory usage stats are
requested before native "
- + "memory manager is created.
Try moving any "
- + "actions about memory
allocation out from the "
- + "memory manager
constructor."));
- }
- })),
- MemoryTargets.newConsumer(
- tmm,
- "OverAcquire.DummyTarget",
- Collections.singletonList(
- new Spiller() {
- @Override
- public long spill(MemoryTarget self, long size) {
- return self.repay(size);
- }
+ private NativeMemoryManager
getNativeMemoryManager() {
+ return Optional.ofNullable(out.get())
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ ""
+ + "Memory usage stats are
requested before"
+ + " native memory manager
is created. Try"
+ + " moving any actions
about memory"
+ + " allocation out from
the memory manager"
+ + " constructor."));
+ }
+ }))),
+ MemoryTargets.dynamicOffHeapSizingIfEnabled(
+ MemoryTargets.newConsumer(
+ tmm,
+ "OverAcquire.DummyTarget",
+ Collections.singletonList(
+ new Spiller() {
+ @Override
+ public long spill(MemoryTarget self, long size) {
+ return self.repay(size);
+ }
- @Override
- public Set<Phase> applicablePhases() {
- return Spillers.PHASE_SET_ALL;
- }
- }),
- Collections.emptyMap()),
+ @Override
+ public Set<Phase> applicablePhases() {
+ return Spillers.PHASE_SET_ALL;
+ }
+ }),
+ Collections.emptyMap())),
overAcquiredRatio));
// listener
ManagedReservationListener rl =
diff --git
a/gluten-data/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
b/gluten-data/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
index e54724a59..2ac048b2b 100644
---
a/gluten-data/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
+++
b/gluten-data/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
@@ -71,7 +71,7 @@ public class NativePlanEvaluator {
@Override
public long spill(MemoryTarget self, long size) {
ColumnarBatchOutIterator instance =
- Optional.of(outIterator.get())
+ Optional.ofNullable(outIterator.get())
.orElseThrow(
() ->
new IllegalStateException(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]