This is an automated email from the ASF dual-hosted git repository.
zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 485687235 [VL] Refactor VeloxParquetDatasource (#5486)
485687235 is described below
commit 485687235385d18e56bb02262dca480bb197a454
Author: 高阳阳 <[email protected]>
AuthorDate: Wed Apr 24 14:20:07 2024 +0800
[VL] Refactor VeloxParquetDatasource (#5486)
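[VL] Refactor VeloxParquetDatasource: move S3, GCS and HDFS sink creation into the VeloxParquetDatasourceS3/GCS/HDFS subclasses, select the subclass by path scheme in VeloxRuntime::createDatasource, and pass a single sink pool instead of separate S3 and GCS pools.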
---
cpp/velox/benchmarks/ParquetWriteBenchmark.cc | 3 +-
cpp/velox/compute/VeloxRuntime.cc | 40 ++++++++++++++--
.../operators/writer/VeloxParquetDatasource.cc | 33 +------------
.../operators/writer/VeloxParquetDatasource.h | 53 +++++++++++----------
.../operators/writer/VeloxParquetDatasourceGCS.h | 51 ++++++++++++++++++++
.../operators/writer/VeloxParquetDatasourceHDFS.h | 54 ++++++++++++++++++++++
.../operators/writer/VeloxParquetDatasourceS3.h | 54 ++++++++++++++++++++++
7 files changed, 225 insertions(+), 63 deletions(-)
diff --git a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
index d9b2cf77a..7e9959797 100644
--- a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
+++ b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
@@ -266,8 +266,7 @@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark :
public GoogleBenchmar
auto veloxParquetDatasource =
std::make_unique<gluten::VeloxParquetDatasource>(
outputPath_ + "/" + fileName,
veloxPool->addAggregateChild("writer_benchmark"),
- veloxPool->addLeafChild("s3_sink_pool"),
- veloxPool->addLeafChild("gcs_sink_pool"),
+ veloxPool->addLeafChild("sink_pool"),
localSchema);
veloxParquetDatasource->init(runtime->getConfMap());
diff --git a/cpp/velox/compute/VeloxRuntime.cc
b/cpp/velox/compute/VeloxRuntime.cc
index 46d9bc85f..cfac1dadd 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -33,6 +33,18 @@
#include "utils/ConfigExtractor.h"
#include "utils/VeloxArrowUtils.h"
+#ifdef ENABLE_HDFS
+#include "operators/writer/VeloxParquetDatasourceHDFS.h"
+#endif
+
+#ifdef ENABLE_S3
+#include "operators/writer/VeloxParquetDatasourceS3.h"
+#endif
+
+#ifdef ENABLE_GCS
+#include "operators/writer/VeloxParquetDatasourceGCS.h"
+#endif
+
using namespace facebook;
namespace gluten {
@@ -185,10 +197,30 @@ std::shared_ptr<Datasource>
VeloxRuntime::createDatasource(
auto veloxPool =
getAggregateVeloxPool(memoryManager)->addAggregateChild("datasource." +
std::to_string(id++));
// Pass a dedicate pool for S3 and GCS sinks as can't share veloxPool
// with parquet writer.
- auto s3SinkPool = getLeafVeloxPool(memoryManager);
- auto gcsSinkPool = getLeafVeloxPool(memoryManager);
-
- return std::make_shared<VeloxParquetDatasource>(filePath, veloxPool,
s3SinkPool, gcsSinkPool, schema);
+ auto sinkPool = getLeafVeloxPool(memoryManager);
+ if (isSupportedHDFSPath(filePath)) {
+#ifdef ENABLE_HDFS
+ return std::make_shared<VeloxParquetDatasourceHDFS>(filePath, veloxPool,
sinkPool, schema);
+#else
+ throw std::runtime_error(
+ "The write path is hdfs path but the HDFS haven't been enabled when
writing parquet data in velox runtime!");
+#endif
+ } else if (isSupportedS3SdkPath(filePath)) {
+#ifdef ENABLE_S3
+ return std::make_shared<VeloxParquetDatasourceS3>(filePath, veloxPool,
sinkPool, schema);
+#else
+ throw std::runtime_error(
+ "The write path is S3 path but the S3 haven't been enabled when
writing parquet data in velox runtime!");
+#endif
+ } else if (isSupportedGCSPath(filePath)) {
+#ifdef ENABLE_GCS
+ return std::make_shared<VeloxParquetDatasourceGCS>(filePath, veloxPool,
sinkPool, schema);
+#else
+ throw std::runtime_error(
+ "The write path is GCS path but the GCS haven't been enabled when
writing parquet data in velox runtime!");
+#endif
+ }
+ return std::make_shared<VeloxParquetDatasource>(filePath, veloxPool,
sinkPool, schema);
}
std::shared_ptr<ShuffleReader> VeloxRuntime::createShuffleReader(
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.cc
b/cpp/velox/operators/writer/VeloxParquetDatasource.cc
index c45b9cb14..2677b0a81 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasource.cc
+++ b/cpp/velox/operators/writer/VeloxParquetDatasource.cc
@@ -46,39 +46,8 @@ const int32_t kGzipWindowBits4k = 12;
void VeloxParquetDatasource::init(const std::unordered_map<std::string,
std::string>& sparkConfs) {
if (strncmp(filePath_.c_str(), "file:", 5) == 0) {
sink_ = dwio::common::FileSink::create(filePath_, {.pool = pool_.get()});
- } else if (isSupportedS3SdkPath(filePath_)) {
-#ifdef ENABLE_S3
- auto confs =
std::make_shared<facebook::velox::core::MemConfigMutable>(sparkConfs);
- auto hiveConfs = getHiveConfig(confs);
- sink_ = dwio::common::FileSink::create(
- filePath_,
- {.connectorProperties =
std::make_shared<facebook::velox::core::MemConfig>(hiveConfs->valuesCopy()),
- .pool = s3SinkPool_.get()});
-#else
- throw std::runtime_error(
- "The write path is S3 path but the S3 haven't been enabled when
writing parquet data in velox runtime!");
-#endif
- } else if (strncmp(filePath_.c_str(), "gs:", 3) == 0) {
-#ifdef ENABLE_GCS
- auto fileSystem = getFileSystem(filePath_, nullptr);
- auto* gcsFileSystem =
dynamic_cast<filesystems::GCSFileSystem*>(fileSystem.get());
- sink_ = std::make_unique<dwio::common::WriteFileSink>(
- gcsFileSystem->openFileForWrite(filePath_, {{}, gcsSinkPool_.get()}),
filePath_);
-#else
- throw std::runtime_error(
- "The write path is GCS path but the GCS haven't been enabled when
writing parquet data in velox runtime!");
-#endif
- } else if (strncmp(filePath_.c_str(), "hdfs:", 5) == 0) {
-#ifdef ENABLE_HDFS
- sink_ = dwio::common::FileSink::create(filePath_, {.pool = pool_.get()});
-#else
- throw std::runtime_error(
- "The write path is hdfs path but the HDFS haven't been enabled when
writing parquet data in velox runtime!");
-#endif
-
} else {
- throw std::runtime_error(
- "The file path is not local or hdfs when writing data with parquet
format in velox runtime!");
+ throw std::runtime_error("The file path is not local when writing data
with parquet format in velox runtime!");
}
ArrowSchema cSchema{};
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.h
b/cpp/velox/operators/writer/VeloxParquetDatasource.h
index 3daad8fe3..bf035c423 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasource.h
+++ b/cpp/velox/operators/writer/VeloxParquetDatasource.h
@@ -51,20 +51,35 @@
namespace gluten {
-class VeloxParquetDatasource final : public Datasource {
+// Returns true if filePath starts with one of the S3-compatible schemes
+// ("s3:", "s3a:", "oss:", "cos:", "cosn:") that are routed to the S3 writer.
+inline bool isSupportedS3SdkPath(const std::string& filePath) {
+  // Schemes handled through the S3-compatible sink.
+  const std::array<const char*, 5> supported_schemes = {"s3:", "s3a:", "oss:", "cos:", "cosn:"};
+
+  for (const char* scheme : supported_schemes) {
+    size_t scheme_length = std::strlen(scheme);
+    if (filePath.length() >= scheme_length && std::strncmp(filePath.c_str(), scheme, scheme_length) == 0) {
+      return true;
+    }
+  }
+  return false;
+}
+
+// Returns true if filePath starts with the "gs:" (Google Cloud Storage) scheme.
+inline bool isSupportedGCSPath(const std::string& filePath) {
+  // std::-qualified, matching isSupportedS3SdkPath above.
+  return std::strncmp(filePath.c_str(), "gs:", 3) == 0;
+}
+
+// Returns true if filePath starts with the "hdfs:" scheme.
+inline bool isSupportedHDFSPath(const std::string& filePath) {
+  return std::strncmp(filePath.c_str(), "hdfs:", 5) == 0;
+}
+
+class VeloxParquetDatasource : public Datasource {
public:
+  // filePath:  output location of the parquet data.
+  // veloxPool: aggregate pool for this datasource (stored in pool_).
+  // sinkPool:  leaf pool for the output sink (stored in sinkPool_).
+  // schema:    Arrow schema of the data to be written.
VeloxParquetDatasource(
const std::string& filePath,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
- std::shared_ptr<facebook::velox::memory::MemoryPool> s3SinkPool,
- std::shared_ptr<facebook::velox::memory::MemoryPool> gcsSinkPool,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
std::shared_ptr<arrow::Schema> schema)
- : Datasource(filePath, schema),
- filePath_(filePath),
- schema_(schema),
- pool_(std::move(veloxPool)),
- s3SinkPool_(std::move(s3SinkPool)),
- gcsSinkPool_(std::move(gcsSinkPool)) {}
+      : Datasource(filePath, schema),
+        filePath_(filePath),
+        // Must be stored: the S3/GCS/HDFS subclasses build sink_ from sinkPool_.get(),
+        // which would otherwise be a null pool.
+        sinkPool_(std::move(sinkPool)),
+        schema_(schema),
+        pool_(std::move(veloxPool)) {}
void init(const std::unordered_map<std::string, std::string>& sparkConfs)
override;
void inspectSchema(struct ArrowSchema* out) override;
@@ -74,31 +89,19 @@ class VeloxParquetDatasource final : public Datasource {
return schema_;
}
- bool isSupportedS3SdkPath(const std::string& filePath_) {
- // support scheme
- const std::array<const char*, 5> supported_schemes = {"s3:", "s3a:",
"oss:", "cos:", "cosn:"};
-
- for (const char* scheme : supported_schemes) {
- size_t scheme_length = std::strlen(scheme);
- if (filePath_.length() >= scheme_length &&
std::strncmp(filePath_.c_str(), scheme, scheme_length) == 0) {
- return true;
- }
- }
- return false;
- }
+ protected:
+ std::string filePath_;
+ std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool_;
+ std::unique_ptr<facebook::velox::dwio::common::FileSink> sink_;
private:
int64_t maxRowGroupBytes_ = 134217728; // 128MB
int64_t maxRowGroupRows_ = 100000000; // 100M
- std::string filePath_;
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<const facebook::velox::Type> type_;
std::shared_ptr<facebook::velox::parquet::Writer> parquetWriter_;
std::shared_ptr<facebook::velox::memory::MemoryPool> pool_;
- std::shared_ptr<facebook::velox::memory::MemoryPool> s3SinkPool_;
- std::shared_ptr<facebook::velox::memory::MemoryPool> gcsSinkPool_;
- std::unique_ptr<facebook::velox::dwio::common::FileSink> sink_;
};
} // namespace gluten
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h
b/cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h
new file mode 100644
index 000000000..b8a9b5431
--- /dev/null
+++ b/cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "operators/writer/VeloxParquetDatasource.h"
+#include "utils/ConfigExtractor.h"
+#include "utils/VeloxArrowUtils.h"
+
+#include <stdexcept>
+#include <string>
+#include <utility>
+
+#include "arrow/c/bridge.h"
+#include "compute/VeloxRuntime.h"
+
+#include "velox/common/compression/Compression.h"
+#include "velox/core/QueryConfig.h"
+#include "velox/core/QueryCtx.h"
+#include "velox/dwio/common/Options.h"
+
+namespace gluten {
+// Parquet datasource for Google Cloud Storage ("gs:") paths. init() opens the
+// destination through the GCS file system with sinkPool_, then delegates the
+// remaining setup to VeloxParquetDatasource::init().
+class VeloxParquetDatasourceGCS final : public VeloxParquetDatasource {
+ public:
+  VeloxParquetDatasourceGCS(
+      const std::string& filePath,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
+      std::shared_ptr<arrow::Schema> schema)
+      // Move the by-value shared_ptrs into the base instead of copying them again.
+      : VeloxParquetDatasource(filePath, std::move(veloxPool), std::move(sinkPool), std::move(schema)) {}
+
+  // NOTE(review): the base init() in VeloxParquetDatasource.cc (as changed by this
+  // commit) throws for every path that does not start with "file:", so calling it
+  // after setting sink_ appears to reject "gs:" paths. Confirm, and consider moving
+  // sink creation into a virtual hook that the base init() calls instead.
+  void init(const std::unordered_map<std::string, std::string>& sparkConfs) override {
+    auto fileSystem = filesystems::getFileSystem(filePath_, nullptr);
+    auto* gcsFileSystem = dynamic_cast<filesystems::GCSFileSystem*>(fileSystem.get());
+    // dynamic_cast yields nullptr if the file system resolved for this path is not
+    // the GCS implementation; fail with a clear error instead of dereferencing it.
+    if (gcsFileSystem == nullptr) {
+      throw std::runtime_error(
+          "The file system for " + filePath_ + " is not GCS when writing parquet data in velox runtime!");
+    }
+    sink_ = std::make_unique<dwio::common::WriteFileSink>(
+        gcsFileSystem->openFileForWrite(filePath_, {{}, sinkPool_.get()}), filePath_);
+    VeloxParquetDatasource::init(sparkConfs);
+  }
+};
+} // namespace gluten
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h
b/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h
new file mode 100644
index 000000000..1b37a7c6f
--- /dev/null
+++ b/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "operators/writer/VeloxParquetDatasource.h"
+#include "utils/ConfigExtractor.h"
+#include "utils/VeloxArrowUtils.h"
+
+#include <string>
+
+#include "arrow/c/bridge.h"
+#include "compute/VeloxRuntime.h"
+
+#include "velox/common/compression/Compression.h"
+#include "velox/core/QueryConfig.h"
+#include "velox/core/QueryCtx.h"
+#include "velox/dwio/common/Options.h"
+
+namespace gluten {
+
+// Parquet datasource for HDFS ("hdfs:") paths. init() creates sink_ with
+// FileSink::create(), using the config returned by getHiveConfig() for the Spark
+// confs plus sinkPool_, then delegates to VeloxParquetDatasource::init().
+class VeloxParquetDatasourceHDFS final : public VeloxParquetDatasource {
+ public:
+  VeloxParquetDatasourceHDFS(
+      const std::string& filePath,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
+      std::shared_ptr<arrow::Schema> schema)
+      // Move the by-value shared_ptrs into the base instead of copying them again.
+      : VeloxParquetDatasource(filePath, std::move(veloxPool), std::move(sinkPool), std::move(schema)) {}
+
+  // NOTE(review): the base init() in VeloxParquetDatasource.cc (as changed by this
+  // commit) throws for every path that does not start with "file:", so calling it
+  // after setting sink_ appears to reject "hdfs:" paths. Confirm, and consider moving
+  // sink creation into a virtual hook that the base init() calls instead.
+  // NOTE(review): before this refactor the HDFS sink was created with pool_ and no
+  // connectorProperties; confirm the switch to sinkPool_ plus the Hive config is intended.
+  void init(const std::unordered_map<std::string, std::string>& sparkConfs) override {
+    auto confs = std::make_shared<facebook::velox::core::MemConfigMutable>(sparkConfs);
+    auto hiveConfs = getHiveConfig(confs);
+    sink_ = dwio::common::FileSink::create(
+        filePath_,
+        {.connectorProperties = std::make_shared<facebook::velox::core::MemConfig>(hiveConfs->valuesCopy()),
+         .pool = sinkPool_.get()});
+    VeloxParquetDatasource::init(sparkConfs);
+  }
+};
+} // namespace gluten
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h
b/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h
new file mode 100644
index 000000000..92965d4e3
--- /dev/null
+++ b/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "operators/writer/VeloxParquetDatasource.h"
+#include "utils/ConfigExtractor.h"
+#include "utils/VeloxArrowUtils.h"
+
+#include <string>
+
+#include "arrow/c/bridge.h"
+#include "compute/VeloxRuntime.h"
+
+#include "velox/common/compression/Compression.h"
+#include "velox/core/QueryConfig.h"
+#include "velox/core/QueryCtx.h"
+#include "velox/dwio/common/Options.h"
+
+namespace gluten {
+
+// Parquet datasource for S3-compatible paths (selected by isSupportedS3SdkPath()).
+// init() creates sink_ with FileSink::create(), using the config returned by
+// getHiveConfig() for the Spark confs plus sinkPool_, then delegates to
+// VeloxParquetDatasource::init().
+class VeloxParquetDatasourceS3 final : public VeloxParquetDatasource {
+ public:
+  VeloxParquetDatasourceS3(
+      const std::string& filePath,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
+      std::shared_ptr<arrow::Schema> schema)
+      // Move the by-value shared_ptrs into the base instead of copying them again.
+      : VeloxParquetDatasource(filePath, std::move(veloxPool), std::move(sinkPool), std::move(schema)) {}
+
+  // NOTE(review): the base init() in VeloxParquetDatasource.cc (as changed by this
+  // commit) throws for every path that does not start with "file:", so calling it
+  // after setting sink_ appears to reject S3 paths. Confirm, and consider moving
+  // sink creation into a virtual hook that the base init() calls instead.
+  void init(const std::unordered_map<std::string, std::string>& sparkConfs) override {
+    auto confs = std::make_shared<facebook::velox::core::MemConfigMutable>(sparkConfs);
+    auto hiveConfs = getHiveConfig(confs);
+    sink_ = dwio::common::FileSink::create(
+        filePath_,
+        {.connectorProperties = std::make_shared<facebook::velox::core::MemConfig>(hiveConfs->valuesCopy()),
+         .pool = sinkPool_.get()});
+    VeloxParquetDatasource::init(sparkConfs);
+  }
+};
+} // namespace gluten
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]