This is an automated email from the ASF dual-hosted git repository.

weitingchen pushed a commit to branch branch-1.2
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/branch-1.2 by this push:
     new c9f3d894f [VL] Branch-1.2: port PR #6503 PR #6515 PR #6554 (#6595)
c9f3d894f is described below

commit c9f3d894fe9a83aed48b9687869027787f665f9a
Author: Wei-Ting Chen <[email protected]>
AuthorDate: Fri Jul 26 07:37:23 2024 +0800

    [VL] Branch-1.2: port PR #6503 PR #6515 PR #6554 (#6595)
    
    * [GLUTEN-6501][VL] Fix the missing fileReadProperties when constructing a 
LocalFilesNode (#6503)
    
    * [GLUTEN-6477][VL] Fix occasional deadlock during spilling (#6515)
    
    * [VL] Add thread_safe to several VeloxRuntime classes (#6526)
    
    VeloxRuntime is shared by many threads, like task threads or parquet 
writer threads. We must make sure the objects held by VeloxRuntime are thread 
safe.
    
    * [VL] Following #6526, minor fixes and improvements (#6554)
    
    ---------
    
    Co-authored-by: zhaokuo <[email protected]>
    Co-authored-by: Hongze Zhang <[email protected]>
    Co-authored-by: BInwei Yang <[email protected]>
---
 .../backendsapi/clickhouse/CHIteratorApi.scala     |  7 +--
 .../backendsapi/velox/VeloxIteratorApi.scala       |  7 ++-
 cpp/core/jni/JniCommon.h                           | 29 +++++++++----
 cpp/core/memory/AllocationListener.h               | 50 ++++++++++++++--------
 cpp/core/memory/MemoryAllocator.cc                 | 11 ++++-
 cpp/core/memory/MemoryAllocator.h                  |  9 ++--
 cpp/velox/compute/WholeStageResultIterator.cc      | 34 ++++++---------
 cpp/velox/memory/VeloxMemoryManager.h              |  1 +
 .../gluten/substrait/rel/LocalFilesBuilder.java    |  6 ++-
 .../gluten/substrait/rel/LocalFilesNode.java       |  8 ++--
 .../apache/gluten/backendsapi/IteratorApi.scala    |  3 +-
 .../execution/BasicScanExecTransformer.scala       |  7 ++-
 .../substrait/rel/IcebergLocalFilesNode.java       |  4 +-
 13 files changed, 107 insertions(+), 69 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 4b9ec7390..ac9c633bc 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -122,7 +122,8 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
       partition: InputPartition,
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
-      metadataColumnNames: Seq[String]): SplitInfo = {
+      metadataColumnNames: Seq[String],
+      properties: Map[String, String]): SplitInfo = {
     partition match {
       case p: GlutenMergeTreePartition =>
         val partLists = new JArrayList[String]()
@@ -183,7 +184,8 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
           partitionColumns,
           new JArrayList[JMap[String, String]](),
           fileFormat,
-          preferredLocations.toList.asJava
+          preferredLocations.toList.asJava,
+          mapAsJavaMap(properties)
         )
       case _ =>
         throw new UnsupportedOperationException(s"Unsupported input partition: 
$partition.")
@@ -209,7 +211,6 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
             split match {
               case filesNode: LocalFilesNode =>
                 setFileSchemaForLocalFiles(filesNode, scans(i))
-                
filesNode.setFileReadProperties(mapAsJavaMap(scans(i).getProperties))
                 filesNode.getPaths.forEach(f => files += f)
                 filesNode.toProtobuf.toByteArray
               case extensionTableNode: ExtensionTableNode =>
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 613e53945..cc3cabb7c 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -54,7 +54,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       partition: InputPartition,
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
-      metadataColumnNames: Seq[String]): SplitInfo = {
+      metadataColumnNames: Seq[String],
+      properties: Map[String, String]): SplitInfo = {
     partition match {
       case f: FilePartition =>
         val (
@@ -78,7 +79,9 @@ class VeloxIteratorApi extends IteratorApi with Logging {
           partitionColumns,
           metadataColumns,
           fileFormat,
-          preferredLocations.toList.asJava)
+          preferredLocations.toList.asJava,
+          mapAsJavaMap(properties)
+        )
       case _ =>
         throw new UnsupportedOperationException(s"Unsupported input 
partition.")
     }
diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index d5c9f2b3b..1d784f3a5 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -362,6 +362,10 @@ static inline gluten::CompressionMode 
getCompressionMode(JNIEnv* env, jstring co
   }
 }
 
+/*
+NOTE: the class must be thread safe
+ */
+
 class SparkAllocationListener final : public gluten::AllocationListener {
  public:
   SparkAllocationListener(JavaVM* vm, jobject jListenerLocalRef, jmethodID 
jReserveMethod, jmethodID jUnreserveMethod)
@@ -399,25 +403,34 @@ class SparkAllocationListener final : public 
gluten::AllocationListener {
       env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, size);
       checkException(env);
     }
-    bytesReserved_ += size;
-    maxBytesReserved_ = std::max(bytesReserved_, maxBytesReserved_);
+    usedBytes_ += size;
+    while (true) {
+      int64_t savedPeakBytes = peakBytes_;
+      if (usedBytes_ <= savedPeakBytes) {
+        break;
+      }
+      // usedBytes_ > savedPeakBytes, update peak
+      if (peakBytes_.compare_exchange_weak(savedPeakBytes, usedBytes_)) {
+        break;
+      }
+    }
   }
 
   int64_t currentBytes() override {
-    return bytesReserved_;
+    return usedBytes_;
   }
 
   int64_t peakBytes() override {
-    return maxBytesReserved_;
+    return peakBytes_;
   }
 
  private:
   JavaVM* vm_;
   jobject jListenerGlobalRef_;
-  jmethodID jReserveMethod_;
-  jmethodID jUnreserveMethod_;
-  int64_t bytesReserved_ = 0L;
-  int64_t maxBytesReserved_ = 0L;
+  const jmethodID jReserveMethod_;
+  const jmethodID jUnreserveMethod_;
+  std::atomic_int64_t usedBytes_{0L};
+  std::atomic_int64_t peakBytes_{0L};
 };
 
 class BacktraceAllocationListener final : public gluten::AllocationListener {
diff --git a/cpp/core/memory/AllocationListener.h 
b/cpp/core/memory/AllocationListener.h
index d43a621de..41797641f 100644
--- a/cpp/core/memory/AllocationListener.h
+++ b/cpp/core/memory/AllocationListener.h
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <memory>
+#include <mutex>
 
 namespace gluten {
 
@@ -46,29 +47,21 @@ class AllocationListener {
 };
 
 /// Memory changes will be round to specified block size which aim to decrease 
delegated listener calls.
+// The class must be thread safe
 class BlockAllocationListener final : public AllocationListener {
  public:
-  BlockAllocationListener(AllocationListener* delegated, uint64_t blockSize)
+  BlockAllocationListener(AllocationListener* delegated, int64_t blockSize)
       : delegated_(delegated), blockSize_(blockSize) {}
 
   void allocationChanged(int64_t diff) override {
     if (diff == 0) {
       return;
     }
-    if (diff > 0) {
-      if (reservationBytes_ - usedBytes_ < diff) {
-        auto roundSize = (diff + (blockSize_ - 1)) / blockSize_ * blockSize_;
-        delegated_->allocationChanged(roundSize);
-        reservationBytes_ += roundSize;
-        peakBytes_ = std::max(peakBytes_, reservationBytes_);
-      }
-      usedBytes_ += diff;
-    } else {
-      usedBytes_ += diff;
-      auto unreservedSize = (reservationBytes_ - usedBytes_) / blockSize_ * 
blockSize_;
-      delegated_->allocationChanged(-unreservedSize);
-      reservationBytes_ -= unreservedSize;
+    int64_t granted = reserve(diff);
+    if (granted == 0) {
+      return;
     }
+    delegated_->allocationChanged(granted);
   }
 
   int64_t currentBytes() override {
@@ -80,11 +73,30 @@ class BlockAllocationListener final : public 
AllocationListener {
   }
 
  private:
-  AllocationListener* delegated_;
-  uint64_t blockSize_{0L};
-  uint64_t usedBytes_{0L};
-  uint64_t peakBytes_{0L};
-  uint64_t reservationBytes_{0L};
+  inline int64_t reserve(int64_t diff) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    usedBytes_ += diff;
+    int64_t newBlockCount;
+    if (usedBytes_ == 0) {
+      newBlockCount = 0;
+    } else {
+      // ceil to get the required block number
+      newBlockCount = (usedBytes_ - 1) / blockSize_ + 1;
+    }
+    int64_t bytesGranted = (newBlockCount - blocksReserved_) * blockSize_;
+    blocksReserved_ = newBlockCount;
+    peakBytes_ = std::max(peakBytes_, usedBytes_);
+    return bytesGranted;
+  }
+
+  AllocationListener* const delegated_;
+  const uint64_t blockSize_;
+  int64_t blocksReserved_{0L};
+  int64_t usedBytes_{0L};
+  int64_t peakBytes_{0L};
+  int64_t reservationBytes_{0L};
+
+  mutable std::mutex mutex_;
 };
 
 } // namespace gluten
diff --git a/cpp/core/memory/MemoryAllocator.cc 
b/cpp/core/memory/MemoryAllocator.cc
index ac869219d..c637c6a9c 100644
--- a/cpp/core/memory/MemoryAllocator.cc
+++ b/cpp/core/memory/MemoryAllocator.cc
@@ -93,7 +93,16 @@ int64_t ListenableMemoryAllocator::peakBytes() const {
 void ListenableMemoryAllocator::updateUsage(int64_t size) {
   listener_->allocationChanged(size);
   usedBytes_ += size;
-  peakBytes_ = std::max(peakBytes_, usedBytes_);
+  while (true) {
+    int64_t savedPeakBytes = peakBytes_;
+    if (usedBytes_ <= savedPeakBytes) {
+      break;
+    }
+    // usedBytes_ > savedPeakBytes, update peak
+    if (peakBytes_.compare_exchange_weak(savedPeakBytes, usedBytes_)) {
+      break;
+    }
+  }
 }
 
 bool StdMemoryAllocator::allocate(int64_t size, void** out) {
diff --git a/cpp/core/memory/MemoryAllocator.h 
b/cpp/core/memory/MemoryAllocator.h
index bc8f9de18..01271cc94 100644
--- a/cpp/core/memory/MemoryAllocator.h
+++ b/cpp/core/memory/MemoryAllocator.h
@@ -45,6 +45,7 @@ class MemoryAllocator {
   virtual int64_t peakBytes() const = 0;
 };
 
+// The class must be thread safe
 class ListenableMemoryAllocator final : public MemoryAllocator {
  public:
   explicit ListenableMemoryAllocator(MemoryAllocator* delegated, 
AllocationListener* listener)
@@ -69,10 +70,10 @@ class ListenableMemoryAllocator final : public 
MemoryAllocator {
 
  private:
   void updateUsage(int64_t size);
-  MemoryAllocator* delegated_;
-  AllocationListener* listener_;
-  uint64_t usedBytes_{0L};
-  uint64_t peakBytes_{0L};
+  MemoryAllocator* const delegated_;
+  AllocationListener* const listener_;
+  std::atomic_int64_t usedBytes_{0L};
+  std::atomic_int64_t peakBytes_{0L};
 };
 
 class StdMemoryAllocator final : public MemoryAllocator {
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index 8439545ca..c417202b9 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -210,28 +210,25 @@ std::shared_ptr<ColumnarBatch> 
WholeStageResultIterator::next() {
 }
 
 namespace {
-class ConditionalSuspendedSection {
+class SuspendedSection {
  public:
-  ConditionalSuspendedSection(velox::exec::Driver* driver, bool condition) {
-    if (condition) {
-      section_ = new velox::exec::SuspendedSection(driver);
-    }
+  SuspendedSection() {
+    reclaimer_->enterArbitration();
   }
 
-  virtual ~ConditionalSuspendedSection() {
-    if (section_) {
-      delete section_;
-    }
+  virtual ~SuspendedSection() {
+    reclaimer_->leaveArbitration();
   }
 
   // singleton
-  ConditionalSuspendedSection(const ConditionalSuspendedSection&) = delete;
-  ConditionalSuspendedSection(ConditionalSuspendedSection&&) = delete;
-  ConditionalSuspendedSection& operator=(const ConditionalSuspendedSection&) = 
delete;
-  ConditionalSuspendedSection& operator=(ConditionalSuspendedSection&&) = 
delete;
+  SuspendedSection(const SuspendedSection&) = delete;
+  SuspendedSection(SuspendedSection&&) = delete;
+  SuspendedSection& operator=(const SuspendedSection&) = delete;
+  SuspendedSection& operator=(SuspendedSection&&) = delete;
 
  private:
-  velox::exec::SuspendedSection* section_ = nullptr;
+  // We only use suspension APIs in exec::MemoryReclaimer.
+  std::unique_ptr<velox::memory::MemoryReclaimer> 
reclaimer_{velox::exec::MemoryReclaimer::create()};
 };
 } // namespace
 
@@ -244,15 +241,8 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t 
size) {
   if (spillStrategy_ == "auto") {
     int64_t remaining = size - shrunken;
     LOG(INFO) << logPrefix << "Trying to request spilling for " << remaining 
<< " bytes...";
-    // if we are on one of the driver of the spilled task, suspend it
-    velox::exec::Driver* thisDriver = nullptr;
-    task_->testingVisitDrivers([&](velox::exec::Driver* driver) {
-      if (driver->isOnThread()) {
-        thisDriver = driver;
-      }
-    });
     // suspend the driver when we are on it
-    ConditionalSuspendedSection noCancel(thisDriver, thisDriver != nullptr);
+    SuspendedSection suspender;
     velox::exec::MemoryReclaimer::Stats status;
     auto* mm = memoryManager_->getMemoryManager();
     uint64_t spilledOut = mm->arbitrator()->shrinkCapacity({pool}, remaining); 
// this conducts spilling
diff --git a/cpp/velox/memory/VeloxMemoryManager.h 
b/cpp/velox/memory/VeloxMemoryManager.h
index 3607ca793..7a96b87e1 100644
--- a/cpp/velox/memory/VeloxMemoryManager.h
+++ b/cpp/velox/memory/VeloxMemoryManager.h
@@ -25,6 +25,7 @@
 
 namespace gluten {
 
+// Make sure the class is thread safe
 class VeloxMemoryManager final : public MemoryManager {
  public:
   VeloxMemoryManager(std::unique_ptr<AllocationListener> listener);
diff --git 
a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java
 
b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java
index 7e085f81f..a58f5e043 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java
@@ -32,7 +32,8 @@ public class LocalFilesBuilder {
       List<Map<String, String>> partitionColumns,
       List<Map<String, String>> metadataColumns,
       LocalFilesNode.ReadFileFormat fileFormat,
-      List<String> preferredLocations) {
+      List<String> preferredLocations,
+      Map<String, String> properties) {
     return new LocalFilesNode(
         index,
         paths,
@@ -43,7 +44,8 @@ public class LocalFilesBuilder {
         partitionColumns,
         metadataColumns,
         fileFormat,
-        preferredLocations);
+        preferredLocations,
+        properties);
   }
 
   public static LocalFilesNode makeLocalFiles(String iterPath) {
diff --git 
a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java 
b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
index fa9f3d516..172a6e8cc 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
@@ -67,7 +67,8 @@ public class LocalFilesNode implements SplitInfo {
       List<Map<String, String>> partitionColumns,
       List<Map<String, String>> metadataColumns,
       ReadFileFormat fileFormat,
-      List<String> preferredLocations) {
+      List<String> preferredLocations,
+      Map<String, String> properties) {
     this.index = index;
     this.paths.addAll(paths);
     this.starts.addAll(starts);
@@ -78,6 +79,7 @@ public class LocalFilesNode implements SplitInfo {
     this.partitionColumns.addAll(partitionColumns);
     this.metadataColumns.addAll(metadataColumns);
     this.preferredLocations.addAll(preferredLocations);
+    this.fileReadProperties = properties;
   }
 
   LocalFilesNode(String iterPath) {
@@ -109,10 +111,6 @@ public class LocalFilesNode implements SplitInfo {
     return namedStructBuilder.build();
   }
 
-  public void setFileReadProperties(Map<String, String> fileReadProperties) {
-    this.fileReadProperties = fileReadProperties;
-  }
-
   @Override
   public List<String> preferredLocations() {
     return this.preferredLocations;
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index 53dc8f478..ee89d3d46 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -36,7 +36,8 @@ trait IteratorApi {
       partition: InputPartition,
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
-      metadataColumnNames: Seq[String]): SplitInfo
+      metadataColumnNames: Seq[String],
+      properties: Map[String, String]): SplitInfo
 
   /** Generate native row partition. */
   def genPartitions(
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 64071fb14..d7ffe237e 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -70,7 +70,12 @@ trait BasicScanExecTransformer extends LeafTransformSupport 
with BaseDataSource
   def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): 
Seq[SplitInfo] = {
     partitions.map(
       BackendsApiManager.getIteratorApiInstance
-        .genSplitInfo(_, getPartitionSchema, fileFormat, 
getMetadataColumns.map(_.name)))
+        .genSplitInfo(
+          _,
+          getPartitionSchema,
+          fileFormat,
+          getMetadataColumns.map(_.name),
+          getProperties))
   }
 
   override protected def doValidateInternal(): ValidationResult = {
diff --git 
a/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
 
b/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
index ba6b0ac4a..398bdbdb5 100644
--- 
a/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
+++ 
b/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
@@ -22,6 +22,7 @@ import io.substrait.proto.ReadRel;
 import org.apache.iceberg.DeleteFile;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -47,7 +48,8 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
         partitionColumns,
         new ArrayList<>(),
         fileFormat,
-        preferredLocations);
+        preferredLocations,
+        new HashMap<>());
     this.deleteFilesList = deleteFilesList;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to