This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d7374bd8f [GLUTEN-5314][VL] Separate FileSink instantiation for different file systems (#5881)
d7374bd8f is described below
commit d7374bd8f5a2ccc518d095ebd675e2ca3269ec47
Author: PHILO-HE <[email protected]>
AuthorDate: Tue May 28 20:00:21 2024 +0800
[GLUTEN-5314][VL] Separate FileSink instantiation for different file systems (#5881)
---
cpp/velox/operators/writer/VeloxParquetDatasource.cc | 5 ++++-
cpp/velox/operators/writer/VeloxParquetDatasource.h | 1 +
cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h | 4 ++--
cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h | 4 ++--
cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h | 4 ++--
cpp/velox/operators/writer/VeloxParquetDatasourceS3.h | 4 ++--
6 files changed, 13 insertions(+), 9 deletions(-)
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.cc b/cpp/velox/operators/writer/VeloxParquetDatasource.cc
index 2677b0a81..16558229e 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasource.cc
+++ b/cpp/velox/operators/writer/VeloxParquetDatasource.cc
@@ -43,13 +43,16 @@ namespace {
const int32_t kGzipWindowBits4k = 12;
}
-void VeloxParquetDatasource::init(const std::unordered_map<std::string,
std::string>& sparkConfs) {
+void VeloxParquetDatasource::initSink(const std::unordered_map<std::string,
std::string>& /* sparkConfs */) {
if (strncmp(filePath_.c_str(), "file:", 5) == 0) {
sink_ = dwio::common::FileSink::create(filePath_, {.pool = pool_.get()});
} else {
throw std::runtime_error("The file path is not local when writing data
with parquet format in velox runtime!");
}
+}
+void VeloxParquetDatasource::init(const std::unordered_map<std::string,
std::string>& sparkConfs) {
+ initSink(sparkConfs);
ArrowSchema cSchema{};
arrow::Status status = arrow::ExportSchema(*(schema_.get()), &cSchema);
if (!status.ok()) {
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.h b/cpp/velox/operators/writer/VeloxParquetDatasource.h
index 3df444016..12cf2c301 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasource.h
+++ b/cpp/velox/operators/writer/VeloxParquetDatasource.h
@@ -89,6 +89,7 @@ class VeloxParquetDatasource : public Datasource {
: Datasource(filePath, schema), filePath_(filePath), schema_(schema),
pool_(std::move(veloxPool)) {}
void init(const std::unordered_map<std::string, std::string>& sparkConfs)
override;
+ virtual void initSink(const std::unordered_map<std::string, std::string>&
sparkConfs);
void inspectSchema(struct ArrowSchema* out) override;
void write(const std::shared_ptr<ColumnarBatch>& cb) override;
void close() override;
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h b/cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h
index 208e6a7ec..82e8f794c 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h
+++ b/cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h
@@ -41,13 +41,13 @@ class VeloxParquetDatasourceABFS final : public
VeloxParquetDatasource {
std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
std::shared_ptr<arrow::Schema> schema)
: VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}
- void init(const std::unordered_map<std::string, std::string>& sparkConfs)
override {
+
+ void initSink(const std::unordered_map<std::string, std::string>&
sparkConfs) override {
auto hiveConf =
getHiveConfig(std::make_shared<facebook::velox::core::MemConfig>(sparkConfs));
auto fileSystem = filesystems::getFileSystem(filePath_, hiveConf);
auto* abfsFileSystem =
dynamic_cast<filesystems::abfs::AbfsFileSystem*>(fileSystem.get());
sink_ = std::make_unique<dwio::common::WriteFileSink>(
abfsFileSystem->openFileForWrite(filePath_, {{}, sinkPool_.get()}),
filePath_);
- VeloxParquetDatasource::init(sparkConfs);
}
};
} // namespace gluten
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h b/cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h
index b8a9b5431..0c2bfa213 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h
+++ b/cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h
@@ -40,12 +40,12 @@ class VeloxParquetDatasourceGCS final : public
VeloxParquetDatasource {
std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
std::shared_ptr<arrow::Schema> schema)
: VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}
- void init(const std::unordered_map<std::string, std::string>& sparkConfs)
override {
+
+ void initSink(const std::unordered_map<std::string, std::string>& /*
sparkConfs */) override {
auto fileSystem = filesystems::getFileSystem(filePath_, nullptr);
auto* gcsFileSystem =
dynamic_cast<filesystems::GCSFileSystem*>(fileSystem.get());
sink_ = std::make_unique<dwio::common::WriteFileSink>(
gcsFileSystem->openFileForWrite(filePath_, {{}, sinkPool_.get()}),
filePath_);
- VeloxParquetDatasource::init(sparkConfs);
}
};
} // namespace gluten
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h b/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h
index 32cf960cb..7722c8e51 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h
+++ b/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h
@@ -41,10 +41,10 @@ class VeloxParquetDatasourceHDFS final : public
VeloxParquetDatasource {
std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
std::shared_ptr<arrow::Schema> schema)
: VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}
- void init(const std::unordered_map<std::string, std::string>& sparkConfs)
override {
+
+ void initSink(const std::unordered_map<std::string, std::string>&
sparkConfs) override {
auto hiveConf =
getHiveConfig(std::make_shared<facebook::velox::core::MemConfig>(sparkConfs));
sink_ = dwio::common::FileSink::create(filePath_, {.connectorProperties =
hiveConf, .pool = sinkPool_.get()});
- VeloxParquetDatasource::init(sparkConfs);
}
};
} // namespace gluten
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h b/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h
index a5c49fcd9..3231a8a1e 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h
+++ b/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h
@@ -41,10 +41,10 @@ class VeloxParquetDatasourceS3 final : public
VeloxParquetDatasource {
std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
std::shared_ptr<arrow::Schema> schema)
: VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}
- void init(const std::unordered_map<std::string, std::string>& sparkConfs)
override {
+
+ void initSink(const std::unordered_map<std::string, std::string>&
sparkConfs) override {
auto hiveConf =
getHiveConfig(std::make_shared<facebook::velox::core::MemConfig>(sparkConfs));
sink_ = dwio::common::FileSink::create(filePath_, {.connectorProperties =
hiveConf, .pool = sinkPool_.get()});
- VeloxParquetDatasource::init(sparkConfs);
}
};
} // namespace gluten
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]