lidavidm commented on a change in pull request #10913: URL: https://github.com/apache/arrow/pull/10913#discussion_r718791936
########## File path: cpp/src/skyhook/client/file_skyhook.h ########## @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "arrow/api.h" +#include "arrow/dataset/file_parquet.h" +#include "arrow/dataset/scanner.h" +#include "arrow/dataset/type_fwd.h" +#include "arrow/dataset/visibility.h" + +namespace skyhook { + +/// \addtogroup dataset-file-formats +/// +/// @{ + +/// \struct RadosConnCtx +/// \brief A struct to hold the parameters required +/// for connecting to a RADOS cluster. 
+struct RadosConnCtx { + std::string ceph_config_path; + std::string ceph_data_pool; + std::string ceph_user_name; + std::string ceph_cluster_name; + std::string ceph_cls_name; + + RadosConnCtx(const std::string& ceph_config_path, const std::string& ceph_data_pool, + const std::string& ceph_user_name, const std::string& ceph_cluster_name, + const std::string& ceph_cls_name) + : ceph_config_path(ceph_config_path), + ceph_data_pool(ceph_data_pool), + ceph_user_name(ceph_user_name), + ceph_cluster_name(ceph_cluster_name), + ceph_cls_name(ceph_cls_name) {} +}; + +/// \class SkyhookFileFormat Review comment: Given this overrides quite a few things, is there still any value in deriving from ParquetFileFormat? We might inherit implementations that don't actually apply anymore (for instance, CountRows and ScanBatchesAsync). ########## File path: cpp/src/skyhook/client/file_skyhook.h ########## @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+#pragma once + +#include "arrow/api.h" +#include "arrow/dataset/file_parquet.h" +#include "arrow/dataset/scanner.h" +#include "arrow/dataset/type_fwd.h" +#include "arrow/dataset/visibility.h" + +namespace skyhook { + +/// \addtogroup dataset-file-formats +/// +/// @{ + +/// \struct RadosConnCtx +/// \brief A struct to hold the parameters required +/// for connecting to a RADOS cluster. +struct RadosConnCtx { + std::string ceph_config_path; + std::string ceph_data_pool; + std::string ceph_user_name; + std::string ceph_cluster_name; + std::string ceph_cls_name; + + RadosConnCtx(const std::string& ceph_config_path, const std::string& ceph_data_pool, + const std::string& ceph_user_name, const std::string& ceph_cluster_name, + const std::string& ceph_cls_name) + : ceph_config_path(ceph_config_path), + ceph_data_pool(ceph_data_pool), + ceph_user_name(ceph_user_name), + ceph_cluster_name(ceph_cluster_name), + ceph_cls_name(ceph_cls_name) {} +}; + +/// \class SkyhookFileFormat +/// \brief A FileFormat implementation that offloads fragment +/// scan operations to the Ceph OSDs. For more details, see the +/// Skyhook paper, https://arxiv.org/pdf/2105.09894.pdf. +class SkyhookFileFormat : public arrow::dataset::ParquetFileFormat { + public: + static arrow::Result<std::shared_ptr<SkyhookFileFormat>> Make( + std::shared_ptr<RadosConnCtx> ctx, std::string file_format); + SkyhookFileFormat(std::shared_ptr<RadosConnCtx> ctx, std::string file_format); Review comment: This can be delayed to a follow-up, but it may make more sense to take `std::shared_ptr<FileFormat>`. For instance, ParquetFileFormat provides options that potentially change the returned schema. (Of course, as currently implemented, those options are not serialized and sent to Ceph, which is why this can be a follow-up.) ########## File path: cpp/src/skyhook/client/file_skyhook.h ########## @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "arrow/api.h" +#include "arrow/dataset/file_parquet.h" +#include "arrow/dataset/scanner.h" +#include "arrow/dataset/type_fwd.h" +#include "arrow/dataset/visibility.h" + +namespace skyhook { + +/// \addtogroup dataset-file-formats +/// +/// @{ + +/// \struct RadosConnCtx +/// \brief A struct to hold the parameters required +/// for connecting to a RADOS cluster. +struct RadosConnCtx { + std::string ceph_config_path; + std::string ceph_data_pool; + std::string ceph_user_name; + std::string ceph_cluster_name; + std::string ceph_cls_name; + + RadosConnCtx(const std::string& ceph_config_path, const std::string& ceph_data_pool, + const std::string& ceph_user_name, const std::string& ceph_cluster_name, + const std::string& ceph_cls_name) + : ceph_config_path(ceph_config_path), + ceph_data_pool(ceph_data_pool), + ceph_user_name(ceph_user_name), + ceph_cluster_name(ceph_cluster_name), + ceph_cls_name(ceph_cls_name) {} +}; + +/// \class SkyhookFileFormat Review comment: Relatedly, we should tag follow-up JIRAs to implement the inherited methods that don't currently apply (e.g. CountRows and ScanBatchesAsync) where possible. ########## File path: cpp/src/skyhook/cls/cls_skyhook_test.cc ########## @@ -0,0 +1,204 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "skyhook/client/file_skyhook.h" + +#include "arrow/api.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/dataset/dataset.h" +#include "arrow/dataset/file_base.h" +#include "arrow/filesystem/api.h" +#include "arrow/io/api.h" +#include "arrow/ipc/api.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/iterator.h" +#include "gtest/gtest.h" + +#include "parquet/arrow/reader.h" +#include "parquet/arrow/writer.h" + +std::shared_ptr<skyhook::SkyhookFileFormat> GetSkyhookFormat() { Review comment: The constants here must match the paths from integration_skyhook.sh, right? It would be good to note that here for future reference. ########## File path: cpp/src/skyhook/client/file_skyhook.cc ########## @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "skyhook/client/file_skyhook.h" +#include "skyhook/protocol/rados_protocol.h" +#include "skyhook/protocol/skyhook_protocol.h" + +#include "arrow/compute/exec/expression.h" +#include "arrow/dataset/file_base.h" +#include "arrow/dataset/file_ipc.h" +#include "arrow/dataset/file_parquet.h" +#include "arrow/util/compression.h" + +namespace skyhook { + +/// A ScanTask to scan a file fragment in Skyhook format. +class SkyhookScanTask : public arrow::dataset::ScanTask { + public: + SkyhookScanTask(std::shared_ptr<arrow::dataset::ScanOptions> options, + std::shared_ptr<arrow::dataset::Fragment> fragment, + arrow::dataset::FileSource source, + std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa, + skyhook::SkyhookFileType::type file_format, + arrow::compute::Expression partition_expression) + : ScanTask(std::move(options), std::move(fragment)), + source_(std::move(source)), + doa_(std::move(doa)), + file_format_(file_format), + partition_expression_(partition_expression) {} + + arrow::Result<arrow::RecordBatchIterator> Execute() override { + /// Retrieve the size of the file using POSIX `stat`. + struct stat st {}; + RETURN_NOT_OK(doa_->Stat(source_.path(), st)); + + /// Create a ScanRequest instance. + skyhook::ScanRequest req; + req.filter_expression = options_->filter; + req.partition_expression = partition_expression_; + req.projection_schema = options_->projected_schema; + req.dataset_schema = options_->dataset_schema; + req.file_size = st.st_size; + req.file_format = file_format_; + + /// Serialize the ScanRequest into a ceph bufferlist. 
+ ceph::bufferlist* request = new ceph::bufferlist(); + RETURN_NOT_OK(skyhook::SerializeScanRequest(req, request)); + + /// Execute the Ceph object class method `scan_op`. + ceph::bufferlist result; + RETURN_NOT_OK(doa_->Exec(st.st_ino, "scan_op", *request, result)); + + /// Read RecordBatches from the result bufferlist. Since, this step might use + /// threads for decompressing compressed batches, to avoid running into + /// [ARROW-12597], we switch off threaded decompression to avoid nested threading + /// scenarios when scan tasks are executed in parallel by the CpuThreadPool. + arrow::RecordBatchVector* batches = new arrow::RecordBatchVector(); + RETURN_NOT_OK(skyhook::DeserializeTable(result, !options_->use_threads, batches)); + return arrow::MakeVectorIterator(*batches); + } + + protected: + arrow::dataset::FileSource source_; + std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa_; + skyhook::SkyhookFileType::type file_format_; + arrow::compute::Expression partition_expression_; +}; + +class SkyhookFileFormat::Impl { + public: + Impl(std::shared_ptr<RadosConnCtx> ctx, std::string file_format) + : ctx_(std::move(ctx)), file_format_(file_format) {} + + ~Impl() {} + + arrow::Status Init() { + /// Connect to the RADOS cluster and instantiate a `SkyhookDirectObjectAccess` + /// instance. + auto connection = std::make_shared<skyhook::rados::RadosConn>(ctx_); + RETURN_NOT_OK(connection->Connect()); + doa_ = std::make_shared<skyhook::SkyhookDirectObjectAccess>(connection); + return arrow::Status::OK(); + } + + arrow::Result<arrow::dataset::ScanTaskIterator> ScanFile( + const std::shared_ptr<arrow::dataset::ScanOptions>& options, + const std::shared_ptr<arrow::dataset::FileFragment>& file) const { + /// Make sure client-side filtering and projection is turned off. + file->apply_compute = false; + + /// Convert string file format name to Enum. 
+ skyhook::SkyhookFileType::type file_format; + if (file_format_ == "parquet") { + file_format = skyhook::SkyhookFileType::type::PARQUET; + } else if (file_format_ == "ipc") { + file_format = skyhook::SkyhookFileType::type::IPC; + } else { + return arrow::Status::Invalid("Unsupported file format ", file_format_); + } + + arrow::dataset::ScanTaskVector v{std::make_shared<SkyhookScanTask>( + std::move(options), std::move(file), file->source(), doa_, file_format, + file->partition_expression())}; + return arrow::MakeVectorIterator(v); + } + + arrow::Result<std::shared_ptr<arrow::Schema>> Inspect( + const arrow::dataset::FileSource& source) const { + std::shared_ptr<arrow::dataset::FileFormat> file_format; + /// Convert string file format name to Arrow FileFormat. + if (file_format_ == "parquet") { + file_format = std::make_shared<arrow::dataset::ParquetFileFormat>(); + } else if (file_format_ == "ipc") { + file_format = std::make_shared<arrow::dataset::IpcFileFormat>(); + } else { + return arrow::Status::Invalid("Unsupported file format ", file_format_); + } + std::shared_ptr<arrow::Schema> schema; + ARROW_ASSIGN_OR_RAISE(schema, file_format->Inspect(source)); + return schema; + } + + private: + std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa_; + std::shared_ptr<RadosConnCtx> ctx_; + std::string file_format_; +}; + +arrow::Result<std::shared_ptr<SkyhookFileFormat>> SkyhookFileFormat::Make( + std::shared_ptr<RadosConnCtx> ctx, std::string file_format) { + auto format = std::make_shared<SkyhookFileFormat>(std::move(ctx), file_format); + RETURN_NOT_OK(format->Init()); + return format; +} + +SkyhookFileFormat::SkyhookFileFormat(std::shared_ptr<RadosConnCtx> ctx, + std::string file_format) + : impl_(new Impl(std::move(ctx), file_format)) {} + +SkyhookFileFormat::~SkyhookFileFormat() {} + +arrow::Status SkyhookFileFormat::Init() { return impl_->Init(); } + +arrow::Result<std::shared_ptr<arrow::Schema>> SkyhookFileFormat::Inspect( + const arrow::dataset::FileSource& 
source) const { + return impl_->Inspect(source); +} + +arrow::Result<arrow::dataset::ScanTaskIterator> SkyhookFileFormat::ScanFile( + const std::shared_ptr<arrow::dataset::ScanOptions>& options, + const std::shared_ptr<arrow::dataset::FileFragment>& file) const { + return impl_->ScanFile(std::move(options), std::move(file)); Review comment: nit: there's no point in calling `std::move` on const references. `std::move` on a `const&` yields a `const&&`, which cannot be moved from, so it is effectively a no-op here and just adds noise. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
