westonpace commented on a change in pull request #10431: URL: https://github.com/apache/arrow/pull/10431#discussion_r687117076
########## File path: cpp/src/arrow/dataset/file_skyhook.h ########## @@ -0,0 +1,275 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This API is EXPERIMENTAL. +#define _FILE_OFFSET_BITS 64 + +#pragma once + +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> + +#include <functional> +#include <memory> +#include <sstream> +#include <string> +#include <utility> +#include <vector> + +#include "arrow/api.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/dataset/dataset.h" +#include "arrow/dataset/discovery.h" +#include "arrow/dataset/file_base.h" +#include "arrow/dataset/file_parquet.h" +#include "arrow/dataset/rados.h" +#include "arrow/dataset/scanner.h" +#include "arrow/dataset/type_fwd.h" +#include "arrow/dataset/visibility.h" +#include "arrow/filesystem/api.h" +#include "arrow/filesystem/path_util.h" +#include "arrow/io/api.h" +#include "arrow/ipc/api.h" +#include "arrow/util/iterator.h" +#include "arrow/util/macros.h" +#include "parquet/arrow/writer.h" +#include "parquet/exception.h" + +#define SCAN_ERR_CODE 25 +#define SCAN_ERR_MSG "failed to scan file fragment" + +#define SCAN_REQ_DESER_ERR_CODE 26 +#define SCAN_REQ_DESER_ERR_MSG "failed to deserialize scan request" + 
+#define SCAN_RES_SER_ERR_CODE 27 +#define SCAN_RES_SER_ERR_MSG "failed to serialize result table" + +namespace arrow { +namespace dataset { + +enum SkyhookFileType { PARQUET, IPC }; + +/// \addtogroup dataset-file-formats +/// +/// @{ + +namespace connection { +/// \brief An interface for general connections. +class ARROW_DS_EXPORT Connection { + public: + virtual Status Connect() = 0; + + Connection() = default; + virtual ~Connection() = default; +}; + +/// \class RadosConnection +/// \brief An interface to connect to a Rados cluster and hold the connection +/// information for usage in later stages. +class ARROW_DS_EXPORT RadosConnection : public Connection { + public: + struct RadosConnectionCtx { + std::string ceph_config_path; + std::string data_pool; + std::string user_name; + std::string cluster_name; + std::string cls_name; + + RadosConnectionCtx(const std::string& ceph_config_path, const std::string& data_pool, + const std::string& user_name, const std::string& cluster_name, + const std::string& cls_name) + : ceph_config_path(ceph_config_path), + data_pool(data_pool), + user_name(user_name), + cluster_name(cluster_name), + cls_name(cls_name) {} + }; + explicit RadosConnection(const RadosConnectionCtx& ctx) + : Connection(), + ctx(ctx), + rados(new RadosWrapper()), + ioCtx(new IoCtxWrapper()), + connected(false) {} + + ~RadosConnection(); + + /// \brief Connect to the Rados cluster. + /// \return Status. + Status Connect(); + + /// \brief Shutdown the connection to the Rados cluster. + /// \return Status. + Status Shutdown(); + + RadosConnectionCtx ctx; + RadosInterface* rados; + IoCtxInterface* ioCtx; + bool connected; +}; + +} // namespace connection + +/// \class SkyhookDirectObjectAccess +/// \brief Interface for translating the name of a file in CephFS to its +/// corresponding object ID in RADOS assuming 1:1 mapping between a file +/// and its underlying object. 
+class ARROW_DS_EXPORT SkyhookDirectObjectAccess { + public: + explicit SkyhookDirectObjectAccess( + const std::shared_ptr<connection::RadosConnection>& connection) + : connection_(std::move(connection)) {} + + /// \brief Executes the POSIX stat call on a file. + /// \param[in] path Path of the file. + /// \param[out] st Reference to the struct object to store the result. + /// \return Status. + Status Stat(const std::string& path, struct stat& st) { + struct stat file_st; + if (stat(path.c_str(), &file_st) < 0) + return Status::Invalid("stat returned non-zero exit code."); + st = file_st; + return Status::OK(); + } + + // Helper function to convert Inode to ObjectID because Rados calls work with + // ObjectIDs. + std::string ConvertFileInodeToObjectID(uint64_t inode) { + std::stringstream ss; + ss << std::hex << inode; + std::string oid(ss.str() + ".00000000"); + return oid; + } + + /// \brief Executes query on the librados node. It uses the librados::exec API to + /// perform queries on the storage node and stores the result in the output bufferlist. + /// \param[in] inode inode of the file. + /// \param[in] fn The function to be executed by the librados::exec call. + /// \param[in] in The input bufferlist. + /// \param[out] out The output bufferlist. + /// \return Status. 
+ Status Exec(uint64_t inode, const std::string& fn, ceph::bufferlist& in, + ceph::bufferlist& out) { + std::string oid = ConvertFileInodeToObjectID(inode); + int e = connection_->ioCtx->exec(oid.c_str(), connection_->ctx.cls_name.c_str(), + fn.c_str(), in, out); + if (e == SCAN_ERR_CODE) return Status::Invalid(SCAN_ERR_MSG); + if (e == SCAN_REQ_DESER_ERR_CODE) return Status::Invalid(SCAN_REQ_DESER_ERR_MSG); + if (e == SCAN_RES_SER_ERR_CODE) return Status::Invalid(SCAN_RES_SER_ERR_MSG); + return Status::OK(); + } + + protected: + std::shared_ptr<connection::RadosConnection> connection_; +}; + +/// \class SkyhookFileFormat +/// \brief A ParquetFileFormat implementation that offloads the fragment +/// scan operations to the Ceph OSDs +class ARROW_DS_EXPORT SkyhookFileFormat : public ParquetFileFormat { Review comment: > the notion of offloading computation to the storage layer is a more fundamental issue than a specialization of Scanner or FileFormat This is likely true. It would be good to start brainstorming what sorts of concepts we are missing. The mailing list could be a good place for discussion & brainstorming or future PRs could be a good place for more concrete concepts/proposals. At the moment we have (I'll add that these APIs have evolved considerably since the start of the year): * **FileFormat** - A file format converts a stream of bytes (regardless of source) into a stream of record batches. * **Scanner** - A scanner converts a dataset (list of fragments) into a stream of record batches. The scanner also applies filtering & projection. * **Fragment** - An independently scannable unit * **FileFragment** - An independently scannable unit backed by a file, relies on `FileFormat`. The current scanner is also a fragment scheduler. Given a list of fragments it selects N fragments to start scanning and merges (and sometimes sequences) their results. Arrow itself is also in flux with these interfaces. 
The scanner is being split into a scan node which does fragment scheduling and the filtering/projection pieces are being pulled into their own nodes. The downside of skyhook being a scanner that I can see is that skyhook doesn't do any fragment scheduling today, and it would have to reinvent that. On the other hand, maybe skyhook knows more information that would help it do this scheduling. For example, Arrow just sort of picks "8 fragments at a time" as probably good enough (kDefaultFragmentReadahead), which is overkill for HDD, about right for SSD and S3, and underkill for certain EC2/S3 configurations. I have no idea what is appropriate for skyhook (I assume it would be based on the # of nodes in the ceph cluster and maybe the file distribution). This particular fact wouldn't necessarily require an additional scanner implementation but maybe some other fact does (e.g. maybe there is a certain ordering in which fragments should be scheduled). I can understand why one wouldn't want Skyhook to be a Scanner. The downside of skyhook being a file format is that, if it isn't the right fit, it starts to break apart the abstraction. For example, we have to introduce `FileFormat::dataset_schema`, there are potential Liskov violations which could mean the code is more brittle (i.e. we might start using FileFormat differently or elsewhere and the Rados implementation doesn't fit the new usage). Perhaps Skyhook is a good fit for `Fragment`. This would allow Skyhook to take advantage of the Scanner's scheduling but give it more flexibility. Adding `handles_filtering` and `handles_projection` properties to a fragment seems appropriate too. Or, Skyhook could attach a "guarantee" to the returned batch (this concept isn't in scanner yet but it is emerging in the exec plan abstractions) so that future filtering mechanisms would skip it because the guarantee shows it is already filtered. 
I'm also pretty amenable to the logic of "we don't need to get it perfect now" and we can get something working and iterate on it in the future. I appreciate the effort to break things into smaller PRs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
