This is an automated email from the ASF dual-hosted git repository.

zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 485687235 [VL] Refactor VeloxParquetDatasource  (#5486)
485687235 is described below

commit 485687235385d18e56bb02262dca480bb197a454
Author: 高阳阳 <[email protected]>
AuthorDate: Wed Apr 24 14:20:07 2024 +0800

    [VL] Refactor VeloxParquetDatasource  (#5486)
    
    [VL] Refactor VeloxParquetDatasource.
---
 cpp/velox/benchmarks/ParquetWriteBenchmark.cc      |  3 +-
 cpp/velox/compute/VeloxRuntime.cc                  | 40 ++++++++++++++--
 .../operators/writer/VeloxParquetDatasource.cc     | 33 +------------
 .../operators/writer/VeloxParquetDatasource.h      | 53 +++++++++++----------
 .../operators/writer/VeloxParquetDatasourceGCS.h   | 51 ++++++++++++++++++++
 .../operators/writer/VeloxParquetDatasourceHDFS.h  | 54 ++++++++++++++++++++++
 .../operators/writer/VeloxParquetDatasourceS3.h    | 54 ++++++++++++++++++++++
 7 files changed, 225 insertions(+), 63 deletions(-)

diff --git a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc 
b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
index d9b2cf77a..7e9959797 100644
--- a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
+++ b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
@@ -266,8 +266,7 @@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark : 
public GoogleBenchmar
       auto veloxParquetDatasource = 
std::make_unique<gluten::VeloxParquetDatasource>(
           outputPath_ + "/" + fileName,
           veloxPool->addAggregateChild("writer_benchmark"),
-          veloxPool->addLeafChild("s3_sink_pool"),
-          veloxPool->addLeafChild("gcs_sink_pool"),
+          veloxPool->addLeafChild("sink_pool"),
           localSchema);
 
       veloxParquetDatasource->init(runtime->getConfMap());
diff --git a/cpp/velox/compute/VeloxRuntime.cc 
b/cpp/velox/compute/VeloxRuntime.cc
index 46d9bc85f..cfac1dadd 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -33,6 +33,18 @@
 #include "utils/ConfigExtractor.h"
 #include "utils/VeloxArrowUtils.h"
 
+#ifdef ENABLE_HDFS
+#include "operators/writer/VeloxParquetDatasourceHDFS.h"
+#endif
+
+#ifdef ENABLE_S3
+#include "operators/writer/VeloxParquetDatasourceS3.h"
+#endif
+
+#ifdef ENABLE_GCS
+#include "operators/writer/VeloxParquetDatasourceGCS.h"
+#endif
+
 using namespace facebook;
 
 namespace gluten {
@@ -185,10 +197,30 @@ std::shared_ptr<Datasource> 
VeloxRuntime::createDatasource(
   auto veloxPool = 
getAggregateVeloxPool(memoryManager)->addAggregateChild("datasource." + 
std::to_string(id++));
   // Pass a dedicate pool for S3 and GCS sinks as can't share veloxPool
   // with parquet writer.
-  auto s3SinkPool = getLeafVeloxPool(memoryManager);
-  auto gcsSinkPool = getLeafVeloxPool(memoryManager);
-
-  return std::make_shared<VeloxParquetDatasource>(filePath, veloxPool, 
s3SinkPool, gcsSinkPool, schema);
+  auto sinkPool = getLeafVeloxPool(memoryManager);
+  if (isSupportedHDFSPath(filePath)) {
+#ifdef ENABLE_HDFS
+    return std::make_shared<VeloxParquetDatasourceHDFS>(filePath, veloxPool, 
sinkPool, schema);
+#else
+    throw std::runtime_error(
+        "The write path is hdfs path but the HDFS haven't been enabled when 
writing parquet data in velox runtime!");
+#endif
+  } else if (isSupportedS3SdkPath(filePath)) {
+#ifdef ENABLE_S3
+    return std::make_shared<VeloxParquetDatasourceS3>(filePath, veloxPool, 
sinkPool, schema);
+#else
+    throw std::runtime_error(
+        "The write path is S3 path but the S3 haven't been enabled when 
writing parquet data in velox runtime!");
+#endif
+  } else if (isSupportedGCSPath(filePath)) {
+#ifdef ENABLE_GCS
+    return std::make_shared<VeloxParquetDatasourceGCS>(filePath, veloxPool, 
sinkPool, schema);
+#else
+    throw std::runtime_error(
+        "The write path is GCS path but the GCS haven't been enabled when 
writing parquet data in velox runtime!");
+#endif
+  }
+  return std::make_shared<VeloxParquetDatasource>(filePath, veloxPool, 
sinkPool, schema);
 }
 
 std::shared_ptr<ShuffleReader> VeloxRuntime::createShuffleReader(
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.cc 
b/cpp/velox/operators/writer/VeloxParquetDatasource.cc
index c45b9cb14..2677b0a81 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasource.cc
+++ b/cpp/velox/operators/writer/VeloxParquetDatasource.cc
@@ -46,39 +46,8 @@ const int32_t kGzipWindowBits4k = 12;
 void VeloxParquetDatasource::init(const std::unordered_map<std::string, 
std::string>& sparkConfs) {
   if (strncmp(filePath_.c_str(), "file:", 5) == 0) {
     sink_ = dwio::common::FileSink::create(filePath_, {.pool = pool_.get()});
-  } else if (isSupportedS3SdkPath(filePath_)) {
-#ifdef ENABLE_S3
-    auto confs = 
std::make_shared<facebook::velox::core::MemConfigMutable>(sparkConfs);
-    auto hiveConfs = getHiveConfig(confs);
-    sink_ = dwio::common::FileSink::create(
-        filePath_,
-        {.connectorProperties = 
std::make_shared<facebook::velox::core::MemConfig>(hiveConfs->valuesCopy()),
-         .pool = s3SinkPool_.get()});
-#else
-    throw std::runtime_error(
-        "The write path is S3 path but the S3 haven't been enabled when 
writing parquet data in velox runtime!");
-#endif
-  } else if (strncmp(filePath_.c_str(), "gs:", 3) == 0) {
-#ifdef ENABLE_GCS
-    auto fileSystem = getFileSystem(filePath_, nullptr);
-    auto* gcsFileSystem = 
dynamic_cast<filesystems::GCSFileSystem*>(fileSystem.get());
-    sink_ = std::make_unique<dwio::common::WriteFileSink>(
-        gcsFileSystem->openFileForWrite(filePath_, {{}, gcsSinkPool_.get()}), 
filePath_);
-#else
-    throw std::runtime_error(
-        "The write path is GCS path but the GCS haven't been enabled when 
writing parquet data in velox runtime!");
-#endif
-  } else if (strncmp(filePath_.c_str(), "hdfs:", 5) == 0) {
-#ifdef ENABLE_HDFS
-    sink_ = dwio::common::FileSink::create(filePath_, {.pool = pool_.get()});
-#else
-    throw std::runtime_error(
-        "The write path is hdfs path but the HDFS haven't been enabled when 
writing parquet data in velox runtime!");
-#endif
-
   } else {
-    throw std::runtime_error(
-        "The file path is not local or hdfs when writing data with parquet 
format in velox runtime!");
+    throw std::runtime_error("The file path is not local when writing data 
with parquet format in velox runtime!");
   }
 
   ArrowSchema cSchema{};
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.h 
b/cpp/velox/operators/writer/VeloxParquetDatasource.h
index 3daad8fe3..bf035c423 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasource.h
+++ b/cpp/velox/operators/writer/VeloxParquetDatasource.h
@@ -51,20 +51,35 @@
 
 namespace gluten {
 
-class VeloxParquetDatasource final : public Datasource {
+// Returns true if `filePath` starts with one of the schemes routed to the S3 SDK
+// based datasource: "s3:", "s3a:", "oss:", "cos:" or "cosn:".
+// The check is an exact, case-sensitive prefix match.
+inline bool isSupportedS3SdkPath(const std::string& filePath) {
+  const std::array<const char*, 5> kSupportedSchemes = {"s3:", "s3a:", "oss:", "cos:", "cosn:"};
+  for (const char* scheme : kSupportedSchemes) {
+    // rfind(prefix, 0) can only match at position 0, i.e. it is a prefix test, and it
+    // is false when filePath is shorter than the scheme, so no length guard is needed.
+    if (filePath.rfind(scheme, 0) == 0) {
+      return true;
+    }
+  }
+  return false;
+}
+
+// Returns true if `filePath` uses the Google Cloud Storage scheme ("gs:").
+inline bool isSupportedGCSPath(const std::string& filePath) {
+  return filePath.rfind("gs:", 0) == 0;
+}
+
+// Returns true if `filePath` uses the HDFS scheme ("hdfs:").
+inline bool isSupportedHDFSPath(const std::string& filePath) {
+  return filePath.rfind("hdfs:", 0) == 0;
+}
+
+class VeloxParquetDatasource : public Datasource {
  public:
   VeloxParquetDatasource(
       const std::string& filePath,
       std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
-      std::shared_ptr<facebook::velox::memory::MemoryPool> s3SinkPool,
-      std::shared_ptr<facebook::velox::memory::MemoryPool> gcsSinkPool,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
       std::shared_ptr<arrow::Schema> schema)
-      : Datasource(filePath, schema),
-        filePath_(filePath),
-        schema_(schema),
-        pool_(std::move(veloxPool)),
-        s3SinkPool_(std::move(s3SinkPool)),
-        gcsSinkPool_(std::move(gcsSinkPool)) {}
+      : Datasource(filePath, schema), filePath_(filePath), schema_(schema), 
pool_(std::move(veloxPool)) {}
 
   void init(const std::unordered_map<std::string, std::string>& sparkConfs) 
override;
   void inspectSchema(struct ArrowSchema* out) override;
@@ -74,31 +89,19 @@ class VeloxParquetDatasource final : public Datasource {
     return schema_;
   }
 
-  bool isSupportedS3SdkPath(const std::string& filePath_) {
-    // support scheme
-    const std::array<const char*, 5> supported_schemes = {"s3:", "s3a:", 
"oss:", "cos:", "cosn:"};
-
-    for (const char* scheme : supported_schemes) {
-      size_t scheme_length = std::strlen(scheme);
-      if (filePath_.length() >= scheme_length && 
std::strncmp(filePath_.c_str(), scheme, scheme_length) == 0) {
-        return true;
-      }
-    }
-    return false;
-  }
+ protected:
+  std::string filePath_;
+  std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool_;
+  std::unique_ptr<facebook::velox::dwio::common::FileSink> sink_;
 
  private:
   int64_t maxRowGroupBytes_ = 134217728; // 128MB
   int64_t maxRowGroupRows_ = 100000000; // 100M
 
-  std::string filePath_;
   std::shared_ptr<arrow::Schema> schema_;
   std::shared_ptr<const facebook::velox::Type> type_;
   std::shared_ptr<facebook::velox::parquet::Writer> parquetWriter_;
   std::shared_ptr<facebook::velox::memory::MemoryPool> pool_;
-  std::shared_ptr<facebook::velox::memory::MemoryPool> s3SinkPool_;
-  std::shared_ptr<facebook::velox::memory::MemoryPool> gcsSinkPool_;
-  std::unique_ptr<facebook::velox::dwio::common::FileSink> sink_;
 };
 
 } // namespace gluten
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h 
b/cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h
new file mode 100644
index 000000000..b8a9b5431
--- /dev/null
+++ b/cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "operators/writer/VeloxParquetDatasource.h"
+#include "utils/ConfigExtractor.h"
+#include "utils/VeloxArrowUtils.h"
+
+#include <stdexcept>
+#include <string>
+
+#include "arrow/c/bridge.h"
+#include "compute/VeloxRuntime.h"
+
+#include "velox/common/compression/Compression.h"
+#include "velox/core/QueryConfig.h"
+#include "velox/core/QueryCtx.h"
+#include "velox/dwio/common/Options.h"
+
+namespace gluten {
+// Parquet datasource whose sink is a write file opened through the GCS filesystem.
+// VeloxRuntime::createDatasource selects it for paths accepted by isSupportedGCSPath.
+class VeloxParquetDatasourceGCS final : public VeloxParquetDatasource {
+ public:
+  VeloxParquetDatasourceGCS(
+      const std::string& filePath,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
+      std::shared_ptr<arrow::Schema> schema)
+      : VeloxParquetDatasource(filePath, std::move(veloxPool), std::move(sinkPool), std::move(schema)) {}
+
+  // Opens filePath_ for writing on the GCS filesystem (with sinkPool_ as the file's
+  // pool), stores the resulting sink in sink_, then runs the shared writer setup.
+  // NOTE(review): VeloxParquetDatasource::init in this patch creates a sink only for
+  // "file:" paths and throws for everything else, so it must leave an already-set
+  // sink_ alone; otherwise this call throws for every "gs:" path.
+  void init(const std::unordered_map<std::string, std::string>& sparkConfs) override {
+    auto fileSystem = filesystems::getFileSystem(filePath_, nullptr);
+    auto* gcsFileSystem = dynamic_cast<filesystems::GCSFileSystem*>(fileSystem.get());
+    if (gcsFileSystem == nullptr) {
+      // Fail with a clear message instead of dereferencing a null pointer when the
+      // filesystem resolved for this path is not a GCSFileSystem.
+      throw std::runtime_error("The filesystem resolved for path " + filePath_ + " is not a GCS filesystem.");
+    }
+    sink_ = std::make_unique<dwio::common::WriteFileSink>(
+        gcsFileSystem->openFileForWrite(filePath_, {{}, sinkPool_.get()}), filePath_);
+    VeloxParquetDatasource::init(sparkConfs);
+  }
+};
+} // namespace gluten
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h 
b/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h
new file mode 100644
index 000000000..1b37a7c6f
--- /dev/null
+++ b/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "operators/writer/VeloxParquetDatasource.h"
+#include "utils/ConfigExtractor.h"
+#include "utils/VeloxArrowUtils.h"
+
+#include <string>
+
+#include "arrow/c/bridge.h"
+#include "compute/VeloxRuntime.h"
+
+#include "velox/common/compression/Compression.h"
+#include "velox/core/QueryConfig.h"
+#include "velox/core/QueryCtx.h"
+#include "velox/dwio/common/Options.h"
+
+namespace gluten {
+
+// Parquet datasource for HDFS paths (selected by VeloxRuntime::createDatasource via
+// isSupportedHDFSPath). Its sink is created with dwio::common::FileSink::create.
+class VeloxParquetDatasourceHDFS final : public VeloxParquetDatasource {
+ public:
+  VeloxParquetDatasourceHDFS(
+      const std::string& filePath,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
+      std::shared_ptr<arrow::Schema> schema)
+      : VeloxParquetDatasource(filePath, std::move(veloxPool), std::move(sinkPool), std::move(schema)) {}
+
+  // Converts sparkConfs with getHiveConfig, creates the sink for filePath_ with those
+  // values as connector properties and sinkPool_ as its pool, then runs the shared
+  // writer setup.
+  // NOTE(review): VeloxParquetDatasource::init in this patch creates a sink only for
+  // "file:" paths and throws for everything else, so it must leave an already-set
+  // sink_ alone; otherwise this call throws for every "hdfs:" path.
+  void init(const std::unordered_map<std::string, std::string>& sparkConfs) override {
+    auto confs = std::make_shared<facebook::velox::core::MemConfigMutable>(sparkConfs);
+    auto hiveConfs = getHiveConfig(confs);
+    sink_ = dwio::common::FileSink::create(
+        filePath_,
+        {.connectorProperties = std::make_shared<facebook::velox::core::MemConfig>(hiveConfs->valuesCopy()),
+         .pool = sinkPool_.get()});
+    VeloxParquetDatasource::init(sparkConfs);
+  }
+};
+} // namespace gluten
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h 
b/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h
new file mode 100644
index 000000000..92965d4e3
--- /dev/null
+++ b/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "operators/writer/VeloxParquetDatasource.h"
+#include "utils/ConfigExtractor.h"
+#include "utils/VeloxArrowUtils.h"
+
+#include <string>
+
+#include "arrow/c/bridge.h"
+#include "compute/VeloxRuntime.h"
+
+#include "velox/common/compression/Compression.h"
+#include "velox/core/QueryConfig.h"
+#include "velox/core/QueryCtx.h"
+#include "velox/dwio/common/Options.h"
+
+namespace gluten {
+
+// Parquet datasource for the S3 SDK schemes accepted by isSupportedS3SdkPath
+// (selected by VeloxRuntime::createDatasource). Its sink is created with
+// dwio::common::FileSink::create.
+class VeloxParquetDatasourceS3 final : public VeloxParquetDatasource {
+ public:
+  VeloxParquetDatasourceS3(
+      const std::string& filePath,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
+      std::shared_ptr<arrow::Schema> schema)
+      : VeloxParquetDatasource(filePath, std::move(veloxPool), std::move(sinkPool), std::move(schema)) {}
+
+  // Converts sparkConfs with getHiveConfig, creates the sink for filePath_ with those
+  // values as connector properties and sinkPool_ as its pool, then runs the shared
+  // writer setup.
+  // NOTE(review): VeloxParquetDatasource::init in this patch creates a sink only for
+  // "file:" paths and throws for everything else, so it must leave an already-set
+  // sink_ alone; otherwise this call throws for every S3-scheme path.
+  void init(const std::unordered_map<std::string, std::string>& sparkConfs) override {
+    auto confs = std::make_shared<facebook::velox::core::MemConfigMutable>(sparkConfs);
+    auto hiveConfs = getHiveConfig(confs);
+    sink_ = dwio::common::FileSink::create(
+        filePath_,
+        {.connectorProperties = std::make_shared<facebook::velox::core::MemConfig>(hiveConfs->valuesCopy()),
+         .pool = sinkPool_.get()});
+    VeloxParquetDatasource::init(sparkConfs);
+  }
+};
+} // namespace gluten


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to