This is an automated email from the ASF dual-hosted git repository.
weitingchen pushed a commit to branch branch-1.2
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/branch-1.2 by this push:
new c9f3d894f [VL] Branch-1.2: port PR #6503 PR #6515 PR #6554 (#6595)
c9f3d894f is described below
commit c9f3d894fe9a83aed48b9687869027787f665f9a
Author: Wei-Ting Chen <[email protected]>
AuthorDate: Fri Jul 26 07:37:23 2024 +0800
[VL] Branch-1.2: port PR #6503 PR #6515 PR #6554 (#6595)
* [GLUTEN-6501][VL] Fix the missing fileReadProperties when constructing a
LocalFilesNode (#6503)
* [GLUTEN-6477][VL] Fix occasional deadlock during spilling (#6515)
* [VL] Add thread_safe to several VeloxRuntime classes (#6526)
VeloxRuntime is shared by many threads, such as task threads or Parquet
writer threads. We must make sure the objects held by VeloxRuntime are thread-safe.
* [VL] Following #6526, minor fixes and improvements (#6554)
---------
Co-authored-by: zhaokuo <[email protected]>
Co-authored-by: Hongze Zhang <[email protected]>
Co-authored-by: BInwei Yang <[email protected]>
---
.../backendsapi/clickhouse/CHIteratorApi.scala | 7 +--
.../backendsapi/velox/VeloxIteratorApi.scala | 7 ++-
cpp/core/jni/JniCommon.h | 29 +++++++++----
cpp/core/memory/AllocationListener.h | 50 ++++++++++++++--------
cpp/core/memory/MemoryAllocator.cc | 11 ++++-
cpp/core/memory/MemoryAllocator.h | 9 ++--
cpp/velox/compute/WholeStageResultIterator.cc | 34 ++++++---------
cpp/velox/memory/VeloxMemoryManager.h | 1 +
.../gluten/substrait/rel/LocalFilesBuilder.java | 6 ++-
.../gluten/substrait/rel/LocalFilesNode.java | 8 ++--
.../apache/gluten/backendsapi/IteratorApi.scala | 3 +-
.../execution/BasicScanExecTransformer.scala | 7 ++-
.../substrait/rel/IcebergLocalFilesNode.java | 4 +-
13 files changed, 107 insertions(+), 69 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 4b9ec7390..ac9c633bc 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -122,7 +122,8 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
partition: InputPartition,
partitionSchema: StructType,
fileFormat: ReadFileFormat,
- metadataColumnNames: Seq[String]): SplitInfo = {
+ metadataColumnNames: Seq[String],
+ properties: Map[String, String]): SplitInfo = {
partition match {
case p: GlutenMergeTreePartition =>
val partLists = new JArrayList[String]()
@@ -183,7 +184,8 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
partitionColumns,
new JArrayList[JMap[String, String]](),
fileFormat,
- preferredLocations.toList.asJava
+ preferredLocations.toList.asJava,
+ mapAsJavaMap(properties)
)
case _ =>
throw new UnsupportedOperationException(s"Unsupported input partition:
$partition.")
@@ -209,7 +211,6 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
split match {
case filesNode: LocalFilesNode =>
setFileSchemaForLocalFiles(filesNode, scans(i))
-
filesNode.setFileReadProperties(mapAsJavaMap(scans(i).getProperties))
filesNode.getPaths.forEach(f => files += f)
filesNode.toProtobuf.toByteArray
case extensionTableNode: ExtensionTableNode =>
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 613e53945..cc3cabb7c 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -54,7 +54,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
partition: InputPartition,
partitionSchema: StructType,
fileFormat: ReadFileFormat,
- metadataColumnNames: Seq[String]): SplitInfo = {
+ metadataColumnNames: Seq[String],
+ properties: Map[String, String]): SplitInfo = {
partition match {
case f: FilePartition =>
val (
@@ -78,7 +79,9 @@ class VeloxIteratorApi extends IteratorApi with Logging {
partitionColumns,
metadataColumns,
fileFormat,
- preferredLocations.toList.asJava)
+ preferredLocations.toList.asJava,
+ mapAsJavaMap(properties)
+ )
case _ =>
throw new UnsupportedOperationException(s"Unsupported input
partition.")
}
diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index d5c9f2b3b..1d784f3a5 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -362,6 +362,10 @@ static inline gluten::CompressionMode
getCompressionMode(JNIEnv* env, jstring co
}
}
+/*
+NOTE: the class must be thread safe
+ */
+
class SparkAllocationListener final : public gluten::AllocationListener {
public:
SparkAllocationListener(JavaVM* vm, jobject jListenerLocalRef, jmethodID
jReserveMethod, jmethodID jUnreserveMethod)
@@ -399,25 +403,34 @@ class SparkAllocationListener final : public
gluten::AllocationListener {
env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, size);
checkException(env);
}
- bytesReserved_ += size;
- maxBytesReserved_ = std::max(bytesReserved_, maxBytesReserved_);
+ usedBytes_ += size;
+ while (true) {
+ int64_t savedPeakBytes = peakBytes_;
+ if (usedBytes_ <= savedPeakBytes) {
+ break;
+ }
+ // usedBytes_ > savedPeakBytes, update peak
+ if (peakBytes_.compare_exchange_weak(savedPeakBytes, usedBytes_)) {
+ break;
+ }
+ }
}
int64_t currentBytes() override {
- return bytesReserved_;
+ return usedBytes_;
}
int64_t peakBytes() override {
- return maxBytesReserved_;
+ return peakBytes_;
}
private:
JavaVM* vm_;
jobject jListenerGlobalRef_;
- jmethodID jReserveMethod_;
- jmethodID jUnreserveMethod_;
- int64_t bytesReserved_ = 0L;
- int64_t maxBytesReserved_ = 0L;
+ const jmethodID jReserveMethod_;
+ const jmethodID jUnreserveMethod_;
+ std::atomic_int64_t usedBytes_{0L};
+ std::atomic_int64_t peakBytes_{0L};
};
class BacktraceAllocationListener final : public gluten::AllocationListener {
diff --git a/cpp/core/memory/AllocationListener.h
b/cpp/core/memory/AllocationListener.h
index d43a621de..41797641f 100644
--- a/cpp/core/memory/AllocationListener.h
+++ b/cpp/core/memory/AllocationListener.h
@@ -19,6 +19,7 @@
#include <algorithm>
#include <memory>
+#include <mutex>
namespace gluten {
@@ -46,29 +47,21 @@ class AllocationListener {
};
/// Memory changes will be round to specified block size which aim to decrease
delegated listener calls.
+// The class must be thread safe
class BlockAllocationListener final : public AllocationListener {
public:
- BlockAllocationListener(AllocationListener* delegated, uint64_t blockSize)
+ BlockAllocationListener(AllocationListener* delegated, int64_t blockSize)
: delegated_(delegated), blockSize_(blockSize) {}
void allocationChanged(int64_t diff) override {
if (diff == 0) {
return;
}
- if (diff > 0) {
- if (reservationBytes_ - usedBytes_ < diff) {
- auto roundSize = (diff + (blockSize_ - 1)) / blockSize_ * blockSize_;
- delegated_->allocationChanged(roundSize);
- reservationBytes_ += roundSize;
- peakBytes_ = std::max(peakBytes_, reservationBytes_);
- }
- usedBytes_ += diff;
- } else {
- usedBytes_ += diff;
- auto unreservedSize = (reservationBytes_ - usedBytes_) / blockSize_ *
blockSize_;
- delegated_->allocationChanged(-unreservedSize);
- reservationBytes_ -= unreservedSize;
+ int64_t granted = reserve(diff);
+ if (granted == 0) {
+ return;
}
+ delegated_->allocationChanged(granted);
}
int64_t currentBytes() override {
@@ -80,11 +73,30 @@ class BlockAllocationListener final : public
AllocationListener {
}
private:
- AllocationListener* delegated_;
- uint64_t blockSize_{0L};
- uint64_t usedBytes_{0L};
- uint64_t peakBytes_{0L};
- uint64_t reservationBytes_{0L};
+ inline int64_t reserve(int64_t diff) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ usedBytes_ += diff;
+ int64_t newBlockCount;
+ if (usedBytes_ == 0) {
+ newBlockCount = 0;
+ } else {
+ // ceil to get the required block number
+ newBlockCount = (usedBytes_ - 1) / blockSize_ + 1;
+ }
+ int64_t bytesGranted = (newBlockCount - blocksReserved_) * blockSize_;
+ blocksReserved_ = newBlockCount;
+ peakBytes_ = std::max(peakBytes_, usedBytes_);
+ return bytesGranted;
+ }
+
+ AllocationListener* const delegated_;
+ const uint64_t blockSize_;
+ int64_t blocksReserved_{0L};
+ int64_t usedBytes_{0L};
+ int64_t peakBytes_{0L};
+ int64_t reservationBytes_{0L};
+
+ mutable std::mutex mutex_;
};
} // namespace gluten
diff --git a/cpp/core/memory/MemoryAllocator.cc
b/cpp/core/memory/MemoryAllocator.cc
index ac869219d..c637c6a9c 100644
--- a/cpp/core/memory/MemoryAllocator.cc
+++ b/cpp/core/memory/MemoryAllocator.cc
@@ -93,7 +93,16 @@ int64_t ListenableMemoryAllocator::peakBytes() const {
void ListenableMemoryAllocator::updateUsage(int64_t size) {
listener_->allocationChanged(size);
usedBytes_ += size;
- peakBytes_ = std::max(peakBytes_, usedBytes_);
+ while (true) {
+ int64_t savedPeakBytes = peakBytes_;
+ if (usedBytes_ <= savedPeakBytes) {
+ break;
+ }
+ // usedBytes_ > savedPeakBytes, update peak
+ if (peakBytes_.compare_exchange_weak(savedPeakBytes, usedBytes_)) {
+ break;
+ }
+ }
}
bool StdMemoryAllocator::allocate(int64_t size, void** out) {
diff --git a/cpp/core/memory/MemoryAllocator.h
b/cpp/core/memory/MemoryAllocator.h
index bc8f9de18..01271cc94 100644
--- a/cpp/core/memory/MemoryAllocator.h
+++ b/cpp/core/memory/MemoryAllocator.h
@@ -45,6 +45,7 @@ class MemoryAllocator {
virtual int64_t peakBytes() const = 0;
};
+// The class must be thread safe
class ListenableMemoryAllocator final : public MemoryAllocator {
public:
explicit ListenableMemoryAllocator(MemoryAllocator* delegated,
AllocationListener* listener)
@@ -69,10 +70,10 @@ class ListenableMemoryAllocator final : public
MemoryAllocator {
private:
void updateUsage(int64_t size);
- MemoryAllocator* delegated_;
- AllocationListener* listener_;
- uint64_t usedBytes_{0L};
- uint64_t peakBytes_{0L};
+ MemoryAllocator* const delegated_;
+ AllocationListener* const listener_;
+ std::atomic_int64_t usedBytes_{0L};
+ std::atomic_int64_t peakBytes_{0L};
};
class StdMemoryAllocator final : public MemoryAllocator {
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index 8439545ca..c417202b9 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -210,28 +210,25 @@ std::shared_ptr<ColumnarBatch>
WholeStageResultIterator::next() {
}
namespace {
-class ConditionalSuspendedSection {
+class SuspendedSection {
public:
- ConditionalSuspendedSection(velox::exec::Driver* driver, bool condition) {
- if (condition) {
- section_ = new velox::exec::SuspendedSection(driver);
- }
+ SuspendedSection() {
+ reclaimer_->enterArbitration();
}
- virtual ~ConditionalSuspendedSection() {
- if (section_) {
- delete section_;
- }
+ virtual ~SuspendedSection() {
+ reclaimer_->leaveArbitration();
}
// singleton
- ConditionalSuspendedSection(const ConditionalSuspendedSection&) = delete;
- ConditionalSuspendedSection(ConditionalSuspendedSection&&) = delete;
- ConditionalSuspendedSection& operator=(const ConditionalSuspendedSection&) =
delete;
- ConditionalSuspendedSection& operator=(ConditionalSuspendedSection&&) =
delete;
+ SuspendedSection(const SuspendedSection&) = delete;
+ SuspendedSection(SuspendedSection&&) = delete;
+ SuspendedSection& operator=(const SuspendedSection&) = delete;
+ SuspendedSection& operator=(SuspendedSection&&) = delete;
private:
- velox::exec::SuspendedSection* section_ = nullptr;
+ // We only use suspension APIs in exec::MemoryReclaimer.
+ std::unique_ptr<velox::memory::MemoryReclaimer>
reclaimer_{velox::exec::MemoryReclaimer::create()};
};
} // namespace
@@ -244,15 +241,8 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t
size) {
if (spillStrategy_ == "auto") {
int64_t remaining = size - shrunken;
LOG(INFO) << logPrefix << "Trying to request spilling for " << remaining
<< " bytes...";
- // if we are on one of the driver of the spilled task, suspend it
- velox::exec::Driver* thisDriver = nullptr;
- task_->testingVisitDrivers([&](velox::exec::Driver* driver) {
- if (driver->isOnThread()) {
- thisDriver = driver;
- }
- });
// suspend the driver when we are on it
- ConditionalSuspendedSection noCancel(thisDriver, thisDriver != nullptr);
+ SuspendedSection suspender;
velox::exec::MemoryReclaimer::Stats status;
auto* mm = memoryManager_->getMemoryManager();
uint64_t spilledOut = mm->arbitrator()->shrinkCapacity({pool}, remaining);
// this conducts spilling
diff --git a/cpp/velox/memory/VeloxMemoryManager.h
b/cpp/velox/memory/VeloxMemoryManager.h
index 3607ca793..7a96b87e1 100644
--- a/cpp/velox/memory/VeloxMemoryManager.h
+++ b/cpp/velox/memory/VeloxMemoryManager.h
@@ -25,6 +25,7 @@
namespace gluten {
+// Make sure the class is thread safe
class VeloxMemoryManager final : public MemoryManager {
public:
VeloxMemoryManager(std::unique_ptr<AllocationListener> listener);
diff --git
a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java
b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java
index 7e085f81f..a58f5e043 100644
---
a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java
+++
b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java
@@ -32,7 +32,8 @@ public class LocalFilesBuilder {
List<Map<String, String>> partitionColumns,
List<Map<String, String>> metadataColumns,
LocalFilesNode.ReadFileFormat fileFormat,
- List<String> preferredLocations) {
+ List<String> preferredLocations,
+ Map<String, String> properties) {
return new LocalFilesNode(
index,
paths,
@@ -43,7 +44,8 @@ public class LocalFilesBuilder {
partitionColumns,
metadataColumns,
fileFormat,
- preferredLocations);
+ preferredLocations,
+ properties);
}
public static LocalFilesNode makeLocalFiles(String iterPath) {
diff --git
a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
index fa9f3d516..172a6e8cc 100644
---
a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
+++
b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
@@ -67,7 +67,8 @@ public class LocalFilesNode implements SplitInfo {
List<Map<String, String>> partitionColumns,
List<Map<String, String>> metadataColumns,
ReadFileFormat fileFormat,
- List<String> preferredLocations) {
+ List<String> preferredLocations,
+ Map<String, String> properties) {
this.index = index;
this.paths.addAll(paths);
this.starts.addAll(starts);
@@ -78,6 +79,7 @@ public class LocalFilesNode implements SplitInfo {
this.partitionColumns.addAll(partitionColumns);
this.metadataColumns.addAll(metadataColumns);
this.preferredLocations.addAll(preferredLocations);
+ this.fileReadProperties = properties;
}
LocalFilesNode(String iterPath) {
@@ -109,10 +111,6 @@ public class LocalFilesNode implements SplitInfo {
return namedStructBuilder.build();
}
- public void setFileReadProperties(Map<String, String> fileReadProperties) {
- this.fileReadProperties = fileReadProperties;
- }
-
@Override
public List<String> preferredLocations() {
return this.preferredLocations;
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index 53dc8f478..ee89d3d46 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -36,7 +36,8 @@ trait IteratorApi {
partition: InputPartition,
partitionSchema: StructType,
fileFormat: ReadFileFormat,
- metadataColumnNames: Seq[String]): SplitInfo
+ metadataColumnNames: Seq[String],
+ properties: Map[String, String]): SplitInfo
/** Generate native row partition. */
def genPartitions(
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 64071fb14..d7ffe237e 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -70,7 +70,12 @@ trait BasicScanExecTransformer extends LeafTransformSupport
with BaseDataSource
def getSplitInfosFromPartitions(partitions: Seq[InputPartition]):
Seq[SplitInfo] = {
partitions.map(
BackendsApiManager.getIteratorApiInstance
- .genSplitInfo(_, getPartitionSchema, fileFormat,
getMetadataColumns.map(_.name)))
+ .genSplitInfo(
+ _,
+ getPartitionSchema,
+ fileFormat,
+ getMetadataColumns.map(_.name),
+ getProperties))
}
override protected def doValidateInternal(): ValidationResult = {
diff --git
a/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
b/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
index ba6b0ac4a..398bdbdb5 100644
---
a/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
+++
b/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
@@ -22,6 +22,7 @@ import io.substrait.proto.ReadRel;
import org.apache.iceberg.DeleteFile;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -47,7 +48,8 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
partitionColumns,
new ArrayList<>(),
fileFormat,
- preferredLocations);
+ preferredLocations,
+ new HashMap<>());
this.deleteFilesList = deleteFilesList;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]