This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 945ac2342 [GLUTEN-6180][VL] Fix NPE if spilling is requested during 
task creation (#6205)
945ac2342 is described below

commit 945ac2342202533ccf862e248ef262a377ba1569
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Jun 26 09:57:38 2024 +0800

    [GLUTEN-6180][VL] Fix NPE if spilling is requested during task creation 
(#6205)
---
 .../gluten/memory/memtarget/MemoryTargets.java     |   2 +-
 .../memory/arrow/alloc/ArrowBufferAllocators.java  |  11 +-
 .../gluten/memory/nmm/NativeMemoryManagers.java    | 157 +++++++++++----------
 .../gluten/vectorized/NativePlanEvaluator.java     |   2 +-
 4 files changed, 91 insertions(+), 81 deletions(-)

diff --git 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
index 2d6fc0748..c3ece7433 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
@@ -63,6 +63,6 @@ public final class MemoryTargets {
       factory = TreeMemoryConsumers.shared();
     }
 
-    return dynamicOffHeapSizingIfEnabled(factory.newConsumer(tmm, name, 
spillers, virtualChildren));
+    return factory.newConsumer(tmm, name, spillers, virtualChildren);
   }
 }
diff --git 
a/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
 
b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
index efee20e48..51f49da70 100644
--- 
a/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
+++ 
b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
@@ -60,11 +60,12 @@ public class ArrowBufferAllocators {
       listener =
           new ManagedAllocationListener(
               MemoryTargets.throwOnOom(
-                  MemoryTargets.newConsumer(
-                      tmm,
-                      "ArrowContextInstance",
-                      Collections.emptyList(),
-                      Collections.emptyMap())),
+                  MemoryTargets.dynamicOffHeapSizingIfEnabled(
+                      MemoryTargets.newConsumer(
+                          tmm,
+                          "ArrowContextInstance",
+                          Collections.emptyList(),
+                          Collections.emptyMap()))),
               TaskResources.getSharedUsage());
     }
 
diff --git 
a/gluten-data/src/main/java/org/apache/gluten/memory/nmm/NativeMemoryManagers.java
 
b/gluten-data/src/main/java/org/apache/gluten/memory/nmm/NativeMemoryManagers.java
index 928f869ba..37456badd 100644
--- 
a/gluten-data/src/main/java/org/apache/gluten/memory/nmm/NativeMemoryManagers.java
+++ 
b/gluten-data/src/main/java/org/apache/gluten/memory/nmm/NativeMemoryManagers.java
@@ -26,6 +26,8 @@ import org.apache.gluten.proto.MemoryUsageStats;
 
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.util.TaskResources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -37,6 +39,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 public final class NativeMemoryManagers {
+  private static final Logger LOG = 
LoggerFactory.getLogger(NativeMemoryManagers.class);
 
   // TODO: Let all caller support spill.
   public static NativeMemoryManager contextInstance(String name) {
@@ -67,86 +70,92 @@ public final class NativeMemoryManagers {
     final MemoryTarget target =
         MemoryTargets.throwOnOom(
             MemoryTargets.overAcquire(
-                MemoryTargets.newConsumer(
-                    tmm,
-                    name,
-                    // call memory manager's shrink API, if no good then call 
the spiller
-                    Stream.concat(
-                            Stream.of(
-                                new Spiller() {
-                                  @Override
-                                  public long spill(MemoryTarget self, long 
size) {
-                                    return Optional.of(out.get())
-                                        .map(nmm -> nmm.shrink(size))
-                                        .orElseThrow(
-                                            () ->
-                                                new IllegalStateException(
-                                                    ""
-                                                        + "Shrink is requested 
before native "
-                                                        + "memory manager is 
created. Try moving "
-                                                        + "any actions about 
memory allocation out "
-                                                        + "from the memory 
manager constructor."));
-                                  }
+                MemoryTargets.dynamicOffHeapSizingIfEnabled(
+                    MemoryTargets.newConsumer(
+                        tmm,
+                        name,
+                        // call memory manager's shrink API, if no good then 
call the spiller
+                        Stream.concat(
+                                Stream.of(
+                                    new Spiller() {
+                                      @Override
+                                      public long spill(MemoryTarget self, 
long size) {
+                                        return Optional.ofNullable(out.get())
+                                            .map(nmm -> nmm.shrink(size))
+                                            .orElseGet(
+                                                () -> {
+                                                  LOG.warn(
+                                                      "Shrink is requested 
before native "
+                                                          + "memory manager is 
created. Try moving "
+                                                          + "any actions about 
memory allocation"
+                                                          + " out from the 
memory manager"
+                                                          + " constructor.");
+                                                  return 0L;
+                                                });
+                                      }
 
-                                  @Override
-                                  public Set<Phase> applicablePhases() {
-                                    return Spillers.PHASE_SET_SHRINK_ONLY;
-                                  }
-                                }),
-                            spillers.stream())
-                        .map(spiller -> Spillers.withMinSpillSize(spiller, 
reservationBlockSize))
-                        .collect(Collectors.toList()),
-                    Collections.singletonMap(
-                        "single",
-                        new MemoryUsageRecorder() {
-                          @Override
-                          public void inc(long bytes) {
-                            // no-op
-                          }
+                                      @Override
+                                      public Set<Phase> applicablePhases() {
+                                        return Spillers.PHASE_SET_SHRINK_ONLY;
+                                      }
+                                    }),
+                                spillers.stream())
+                            .map(
+                                spiller -> Spillers.withMinSpillSize(spiller, 
reservationBlockSize))
+                            .collect(Collectors.toList()),
+                        Collections.singletonMap(
+                            "single",
+                            new MemoryUsageRecorder() {
+                              @Override
+                              public void inc(long bytes) {
+                                // no-op
+                              }
 
-                          @Override
-                          public long peak() {
-                            throw new UnsupportedOperationException("Not 
implemented");
-                          }
+                              @Override
+                              public long peak() {
+                                throw new UnsupportedOperationException("Not 
implemented");
+                              }
 
-                          @Override
-                          public long current() {
-                            throw new UnsupportedOperationException("Not 
implemented");
-                          }
+                              @Override
+                              public long current() {
+                                throw new UnsupportedOperationException("Not 
implemented");
+                              }
 
-                          @Override
-                          public MemoryUsageStats toStats() {
-                            return 
getNativeMemoryManager().collectMemoryUsage();
-                          }
+                              @Override
+                              public MemoryUsageStats toStats() {
+                                return 
getNativeMemoryManager().collectMemoryUsage();
+                              }
 
-                          private NativeMemoryManager getNativeMemoryManager() 
{
-                            return Optional.of(out.get())
-                                .orElseThrow(
-                                    () ->
-                                        new IllegalStateException(
-                                            ""
-                                                + "Memory usage stats are 
requested before native "
-                                                + "memory manager is created. 
Try moving any "
-                                                + "actions about memory 
allocation out from the "
-                                                + "memory manager 
constructor."));
-                          }
-                        })),
-                MemoryTargets.newConsumer(
-                    tmm,
-                    "OverAcquire.DummyTarget",
-                    Collections.singletonList(
-                        new Spiller() {
-                          @Override
-                          public long spill(MemoryTarget self, long size) {
-                            return self.repay(size);
-                          }
+                              private NativeMemoryManager 
getNativeMemoryManager() {
+                                return Optional.ofNullable(out.get())
+                                    .orElseThrow(
+                                        () ->
+                                            new IllegalStateException(
+                                                ""
+                                                    + "Memory usage stats are 
requested before"
+                                                    + " native memory manager 
is created. Try"
+                                                    + " moving any actions 
about memory"
+                                                    + " allocation out from 
the memory manager"
+                                                    + " constructor."));
+                              }
+                            }))),
+                MemoryTargets.dynamicOffHeapSizingIfEnabled(
+                    MemoryTargets.newConsumer(
+                        tmm,
+                        "OverAcquire.DummyTarget",
+                        Collections.singletonList(
+                            new Spiller() {
+                              @Override
+                              public long spill(MemoryTarget self, long size) {
+                                return self.repay(size);
+                              }
 
-                          @Override
-                          public Set<Phase> applicablePhases() {
-                            return Spillers.PHASE_SET_ALL;
-                          }
-                        }),
-                    Collections.emptyMap()),
+                              @Override
+                              public Set<Phase> applicablePhases() {
+                                return Spillers.PHASE_SET_ALL;
+                              }
+                            }),
+                        Collections.emptyMap())),
                 overAcquiredRatio));
     // listener
     ManagedReservationListener rl =
diff --git 
a/gluten-data/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
 
b/gluten-data/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
index e54724a59..2ac048b2b 100644
--- 
a/gluten-data/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
+++ 
b/gluten-data/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
@@ -71,7 +71,7 @@ public class NativePlanEvaluator {
               @Override
               public long spill(MemoryTarget self, long size) {
                 ColumnarBatchOutIterator instance =
-                    Optional.of(outIterator.get())
+                    Optional.ofNullable(outIterator.get())
                         .orElseThrow(
                             () ->
                                 new IllegalStateException(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to