This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 07c09b586 [CH] add throttler to GlutenHDFSDisk (#6046)
07c09b586 is described below

commit 07c09b5868421b4195f3e666aa7950f0d2312213
Author: LiuNeng <[email protected]>
AuthorDate: Wed Jun 12 10:17:36 2024 +0800

    [CH] add throttler to GlutenHDFSDisk (#6046)
    
    [CH] add throttler to GlutenHDFSDisk
    
    Co-authored-by: liuneng1994 <[email protected]>
---
 cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp | 13 ++++++++++++-
 cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h   |  8 ++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
index bff4108f2..cdbe6c728 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
@@ -17,6 +17,8 @@
 
 #include "GlutenDiskHDFS.h"
 #include <ranges>
+
+#include <Common/Throttler.h>
 #include <Parser/SerializedPlanParser.h>
 #if USE_HDFS
 
@@ -70,6 +72,15 @@ DiskObjectStoragePtr GlutenDiskHDFS::createDiskObjectStorage()
         config_prefix);
 }
 
-
+std::unique_ptr<DB::WriteBufferFromFileBase> GlutenDiskHDFS::writeFile(
+    const String & path,
+    size_t buf_size,
+    DB::WriteMode mode,
+    const DB::WriteSettings & settings)
+{
+    if (throttler)
+        throttler->add(1);
+    return DiskObjectStorage::writeFile(path, buf_size, mode, settings);
+}
 }
 #endif
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
index 9caedaae8..4e375b283 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
@@ -19,6 +19,7 @@
 
 #include <config.h>
 
+#include <Common/Throttler.h>
 #include <Disks/ObjectStorages/DiskObjectStorage.h>
 #if USE_HDFS
 #include <Disks/ObjectStorages/GlutenHDFSObjectStorage.h>
@@ -43,6 +44,8 @@ public:
         object_key_prefix = object_key_prefix_;
         hdfs_object_storage = dynamic_cast<local_engine::GlutenHDFSObjectStorage *>(object_storage_.get());
         hdfsSetWorkingDirectory(hdfs_object_storage->getHDFSFS(), "/");
+        auto max_speed = config.getUInt(config_prefix + ".write_speed", 450);
+        throttler = std::make_shared<DB::Throttler>(max_speed);
     }
 
     void createDirectory(const String & path) override;
@@ -52,11 +55,16 @@ public:
     void removeDirectory(const String & path) override;
 
     DB::DiskObjectStoragePtr createDiskObjectStorage() override;
+
+    std::unique_ptr<DB::WriteBufferFromFileBase> writeFile(const String& path, size_t buf_size, DB::WriteMode mode,
+        const DB::WriteSettings& settings) override;
+
 private:
     String path2AbsPath(const String & path);
 
     GlutenHDFSObjectStorage * hdfs_object_storage;
     String object_key_prefix;
+    DB::ThrottlerPtr throttler;
 };
 #endif
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to