This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d7374bd8f [GLUTEN-5314][VL] Separate FileSink instantiation for different file systems (#5881)
d7374bd8f is described below

commit d7374bd8f5a2ccc518d095ebd675e2ca3269ec47
Author: PHILO-HE <[email protected]>
AuthorDate: Tue May 28 20:00:21 2024 +0800

    [GLUTEN-5314][VL] Separate FileSink instantiation for different file systems (#5881)
---
 cpp/velox/operators/writer/VeloxParquetDatasource.cc    | 5 ++++-
 cpp/velox/operators/writer/VeloxParquetDatasource.h     | 1 +
 cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h | 4 ++--
 cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h  | 4 ++--
 cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h | 4 ++--
 cpp/velox/operators/writer/VeloxParquetDatasourceS3.h   | 4 ++--
 6 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.cc 
b/cpp/velox/operators/writer/VeloxParquetDatasource.cc
index 2677b0a81..16558229e 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasource.cc
+++ b/cpp/velox/operators/writer/VeloxParquetDatasource.cc
@@ -43,13 +43,16 @@ namespace {
 const int32_t kGzipWindowBits4k = 12;
 }
 
-void VeloxParquetDatasource::init(const std::unordered_map<std::string, 
std::string>& sparkConfs) {
+void VeloxParquetDatasource::initSink(const std::unordered_map<std::string, 
std::string>& /* sparkConfs */) {
   if (strncmp(filePath_.c_str(), "file:", 5) == 0) {
     sink_ = dwio::common::FileSink::create(filePath_, {.pool = pool_.get()});
   } else {
     throw std::runtime_error("The file path is not local when writing data 
with parquet format in velox runtime!");
   }
+}
 
+void VeloxParquetDatasource::init(const std::unordered_map<std::string, 
std::string>& sparkConfs) {
+  initSink(sparkConfs);
   ArrowSchema cSchema{};
   arrow::Status status = arrow::ExportSchema(*(schema_.get()), &cSchema);
   if (!status.ok()) {
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.h 
b/cpp/velox/operators/writer/VeloxParquetDatasource.h
index 3df444016..12cf2c301 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasource.h
+++ b/cpp/velox/operators/writer/VeloxParquetDatasource.h
@@ -89,6 +89,7 @@ class VeloxParquetDatasource : public Datasource {
       : Datasource(filePath, schema), filePath_(filePath), schema_(schema), 
pool_(std::move(veloxPool)) {}
 
   void init(const std::unordered_map<std::string, std::string>& sparkConfs) 
override;
+  virtual void initSink(const std::unordered_map<std::string, std::string>& 
sparkConfs);
   void inspectSchema(struct ArrowSchema* out) override;
   void write(const std::shared_ptr<ColumnarBatch>& cb) override;
   void close() override;
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h 
b/cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h
index 208e6a7ec..82e8f794c 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h
+++ b/cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h
@@ -41,13 +41,13 @@ class VeloxParquetDatasourceABFS final : public 
VeloxParquetDatasource {
       std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
       std::shared_ptr<arrow::Schema> schema)
       : VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}
-  void init(const std::unordered_map<std::string, std::string>& sparkConfs) 
override {
+
+  void initSink(const std::unordered_map<std::string, std::string>& 
sparkConfs) override {
     auto hiveConf = 
getHiveConfig(std::make_shared<facebook::velox::core::MemConfig>(sparkConfs));
     auto fileSystem = filesystems::getFileSystem(filePath_, hiveConf);
     auto* abfsFileSystem = 
dynamic_cast<filesystems::abfs::AbfsFileSystem*>(fileSystem.get());
     sink_ = std::make_unique<dwio::common::WriteFileSink>(
         abfsFileSystem->openFileForWrite(filePath_, {{}, sinkPool_.get()}), 
filePath_);
-    VeloxParquetDatasource::init(sparkConfs);
   }
 };
 } // namespace gluten
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h 
b/cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h
index b8a9b5431..0c2bfa213 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h
+++ b/cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h
@@ -40,12 +40,12 @@ class VeloxParquetDatasourceGCS final : public 
VeloxParquetDatasource {
       std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
       std::shared_ptr<arrow::Schema> schema)
       : VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}
-  void init(const std::unordered_map<std::string, std::string>& sparkConfs) 
override {
+
+  void initSink(const std::unordered_map<std::string, std::string>& /* 
sparkConfs */) override {
     auto fileSystem = filesystems::getFileSystem(filePath_, nullptr);
     auto* gcsFileSystem = 
dynamic_cast<filesystems::GCSFileSystem*>(fileSystem.get());
     sink_ = std::make_unique<dwio::common::WriteFileSink>(
         gcsFileSystem->openFileForWrite(filePath_, {{}, sinkPool_.get()}), 
filePath_);
-    VeloxParquetDatasource::init(sparkConfs);
   }
 };
 } // namespace gluten
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h 
b/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h
index 32cf960cb..7722c8e51 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h
+++ b/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h
@@ -41,10 +41,10 @@ class VeloxParquetDatasourceHDFS final : public 
VeloxParquetDatasource {
       std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
       std::shared_ptr<arrow::Schema> schema)
       : VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}
-  void init(const std::unordered_map<std::string, std::string>& sparkConfs) 
override {
+
+  void initSink(const std::unordered_map<std::string, std::string>& 
sparkConfs) override {
     auto hiveConf = 
getHiveConfig(std::make_shared<facebook::velox::core::MemConfig>(sparkConfs));
     sink_ = dwio::common::FileSink::create(filePath_, {.connectorProperties = 
hiveConf, .pool = sinkPool_.get()});
-    VeloxParquetDatasource::init(sparkConfs);
   }
 };
 } // namespace gluten
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h 
b/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h
index a5c49fcd9..3231a8a1e 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h
+++ b/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h
@@ -41,10 +41,10 @@ class VeloxParquetDatasourceS3 final : public 
VeloxParquetDatasource {
       std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
       std::shared_ptr<arrow::Schema> schema)
       : VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}
-  void init(const std::unordered_map<std::string, std::string>& sparkConfs) 
override {
+
+  void initSink(const std::unordered_map<std::string, std::string>& 
sparkConfs) override {
     auto hiveConf = 
getHiveConfig(std::make_shared<facebook::velox::core::MemConfig>(sparkConfs));
     sink_ = dwio::common::FileSink::create(filePath_, {.connectorProperties = 
hiveConf, .pool = sinkPool_.get()});
-    VeloxParquetDatasource::init(sparkConfs);
   }
 };
 } // namespace gluten


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to