lidavidm commented on a change in pull request #10913:
URL: https://github.com/apache/arrow/pull/10913#discussion_r692111675



##########
File path: cpp/src/skyhook/client/file_skyhook.h
##########
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+
+namespace skyhook {
+
+/// \addtogroup dataset-file-formats
+///
+/// @{
+
+/// \struct RadosConnCtx
+/// \brief A struct to hold the parameters required
+/// for connecting to a RADOS cluster.
+struct RadosConnCtx {
+  std::string ceph_config_path;
+  std::string ceph_data_pool;
+  std::string ceph_user_name;
+  std::string ceph_cluster_name;
+  std::string ceph_cls_name;
+
+  RadosConnCtx(const std::string& ceph_config_path, const std::string& 
ceph_data_pool,
+               const std::string& ceph_user_name, const std::string& 
ceph_cluster_name,
+               const std::string& ceph_cls_name)
+      : ceph_config_path(ceph_config_path),
+        ceph_data_pool(ceph_data_pool),
+        ceph_user_name(ceph_user_name),
+        ceph_cluster_name(ceph_cluster_name),
+        ceph_cls_name(ceph_cls_name) {}
+};
+
+/// \class SkyhookFileFormat
+/// \brief A FileFormat implementation that offloads fragment
+/// scan operations to the Ceph OSDs.
+class SkyhookFileFormat : public arrow::dataset::ParquetFileFormat {

Review comment:
       It would help to also link to the Skyhook paper or docs here, since 
developers are much less likely to be familiar with it.

##########
File path: cpp/src/arrow/dataset/dataset.h
##########
@@ -90,6 +90,8 @@ class ARROW_DS_EXPORT Fragment : public 
std::enable_shared_from_this<Fragment> {
 
   virtual ~Fragment() = default;
 
+  bool handles_compute = true;

Review comment:
       Can we document what this flag means?

##########
File path: cpp/src/skyhook/cls/cls_skyhook.cc
##########
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <rados/objclass.h>
+#include <memory>
+
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/result.h"
+#include "arrow/util/compression.h"
+
+CLS_VER(1, 0)
+CLS_NAME(skyhook)
+
+cls_handle_t h_class;
+cls_method_handle_t h_scan_op;
+
+/// \brief Log skyhook errors using RADOS object class SDK's logger.
+void LogSkyhookError(const std::string& msg) { CLS_LOG(0, "error: %s", 
msg.c_str()); }
+
+/// \class RandomAccessObject
+/// \brief An interface to provide a file-like view over RADOS objects.
+class RandomAccessObject : public arrow::io::RandomAccessFile {
+ public:
+  explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) {
+    hctx_ = hctx;
+    content_length_ = file_size;
+    chunks_ = std::vector<ceph::bufferlist*>();
+  }
+
+  ~RandomAccessObject() { Close(); }
+
+  /// Check if the file stream is closed.
+  arrow::Status CheckClosed() const {

Review comment:
       These methods are missing `override` specifiers.

##########
File path: cpp/src/skyhook/client/file_skyhook.cc
##########
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+#include "skyhook/protocol/rados_protocol.h"
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/util/compression.h"
+
+namespace skyhook {
+
+/// A ScanTask to scan a file fragment in Skyhook format.
+class SkyhookScanTask : public arrow::dataset::ScanTask {
+ public:
+  SkyhookScanTask(std::shared_ptr<arrow::dataset::ScanOptions> options,
+                  std::shared_ptr<arrow::dataset::Fragment> fragment,
+                  arrow::dataset::FileSource source,
+                  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa,
+                  skyhook::SkyhookFileType::type file_format,
+                  arrow::compute::Expression partition_expression)
+      : ScanTask(std::move(options), std::move(fragment)),
+        source_(std::move(source)),
+        doa_(std::move(doa)),
+        file_format_(file_format),
+        partition_expression_(partition_expression) {}
+
+  arrow::Result<arrow::RecordBatchIterator> Execute() override {
+    /// Retrieve the size of the file using POSIX `stat`.
+    struct stat st {};
+    RETURN_NOT_OK(doa_->Stat(source_.path(), st));
+
+    /// Create a ScanRequest instance.
+    skyhook::ScanRequest req;
+    req.filter_expression = options_->filter;
+    req.partition_expression = partition_expression_;
+    req.projection_schema = options_->projected_schema;
+    req.dataset_schema = options_->dataset_schema;
+    req.file_size = st.st_size;
+    req.file_format = file_format_;
+
+    /// Serialize the ScanRequest into a ceph bufferlist.
+    ceph::bufferlist request;
+    RETURN_NOT_OK(skyhook::SerializeScanRequest(req, request));
+
+    /// Execute the Ceph object class method `scan_op`.
+    ceph::bufferlist result;
+    RETURN_NOT_OK(doa_->Exec(st.st_ino, "scan_op", request, result));
+
+    /// Read RecordBatches from the result bufferlist. Since, this step might 
use
+    /// threads for decompressing compressed batches, to avoid running into
+    /// [ARROW-12597], we switch off threaded decompression to avoid nested 
threading
+    /// scenarios when scan tasks are executed in parallel by the 
CpuThreadPool.
+    arrow::RecordBatchVector batches;
+    RETURN_NOT_OK(skyhook::DeserializeTable(batches, result, 
!options_->use_threads));
+    return arrow::MakeVectorIterator(batches);
+  }
+
+ protected:
+  arrow::dataset::FileSource source_;
+  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa_;
+  skyhook::SkyhookFileType::type file_format_;
+  arrow::compute::Expression partition_expression_;
+};
+
+class SkyhookFileFormat::Impl {
+ public:
+  Impl(std::shared_ptr<RadosConnCtx> ctx, std::string file_format)
+      : ctx_(std::move(ctx)), file_format_(file_format) {}
+
+  ~Impl() {}
+
+  arrow::Status Init() {
+    /// Connect to the RADOS cluster and instantiate a 
`SkyhookDirectObjectAccess`
+    /// instance.
+    auto connection = std::make_shared<skyhook::rados::RadosConn>(ctx_);
+    RETURN_NOT_OK(connection->Connect());
+    doa_ = std::make_shared<skyhook::SkyhookDirectObjectAccess>(connection);
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<arrow::dataset::ScanTaskIterator> ScanFile(
+      const std::shared_ptr<arrow::dataset::ScanOptions>& options,
+      const std::shared_ptr<arrow::dataset::FileFragment>& file) const {
+    /// Make sure client-side filtering and projection is turned off.
+    file->handles_compute = false;
+
+    /// Convert string file format name to Enum.
+    skyhook::SkyhookFileType::type file_format;
+    if (file_format_ == "parquet") {
+      file_format = skyhook::SkyhookFileType::type::PARQUET;
+    } else if (file_format_ == "ipc") {
+      file_format = skyhook::SkyhookFileType::type::IPC;
+    } else {
+      return arrow::Status::Invalid("Unsupported file format.");

Review comment:
       ```suggestion
         return arrow::Status::Invalid("Unsupported file format ", file_format_);
   ```

##########
File path: cpp/src/skyhook/client/file_skyhook.cc
##########
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+#include "skyhook/protocol/rados_protocol.h"
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/util/compression.h"
+
+namespace skyhook {
+
+/// A ScanTask to scan a file fragment in Skyhook format.
+class SkyhookScanTask : public arrow::dataset::ScanTask {
+ public:
+  SkyhookScanTask(std::shared_ptr<arrow::dataset::ScanOptions> options,
+                  std::shared_ptr<arrow::dataset::Fragment> fragment,
+                  arrow::dataset::FileSource source,
+                  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa,
+                  skyhook::SkyhookFileType::type file_format,
+                  arrow::compute::Expression partition_expression)
+      : ScanTask(std::move(options), std::move(fragment)),
+        source_(std::move(source)),
+        doa_(std::move(doa)),
+        file_format_(file_format),
+        partition_expression_(partition_expression) {}
+
+  arrow::Result<arrow::RecordBatchIterator> Execute() override {
+    /// Retrieve the size of the file using POSIX `stat`.
+    struct stat st {};
+    RETURN_NOT_OK(doa_->Stat(source_.path(), st));
+
+    /// Create a ScanRequest instance.
+    skyhook::ScanRequest req;
+    req.filter_expression = options_->filter;
+    req.partition_expression = partition_expression_;
+    req.projection_schema = options_->projected_schema;
+    req.dataset_schema = options_->dataset_schema;
+    req.file_size = st.st_size;
+    req.file_format = file_format_;
+
+    /// Serialize the ScanRequest into a ceph bufferlist.
+    ceph::bufferlist request;
+    RETURN_NOT_OK(skyhook::SerializeScanRequest(req, request));
+
+    /// Execute the Ceph object class method `scan_op`.
+    ceph::bufferlist result;
+    RETURN_NOT_OK(doa_->Exec(st.st_ino, "scan_op", request, result));
+
+    /// Read RecordBatches from the result bufferlist. Since, this step might 
use
+    /// threads for decompressing compressed batches, to avoid running into
+    /// [ARROW-12597], we switch off threaded decompression to avoid nested 
threading
+    /// scenarios when scan tasks are executed in parallel by the 
CpuThreadPool.
+    arrow::RecordBatchVector batches;
+    RETURN_NOT_OK(skyhook::DeserializeTable(batches, result, 
!options_->use_threads));
+    return arrow::MakeVectorIterator(batches);
+  }
+
+ protected:
+  arrow::dataset::FileSource source_;
+  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa_;
+  skyhook::SkyhookFileType::type file_format_;
+  arrow::compute::Expression partition_expression_;
+};
+
+class SkyhookFileFormat::Impl {
+ public:
+  Impl(std::shared_ptr<RadosConnCtx> ctx, std::string file_format)
+      : ctx_(std::move(ctx)), file_format_(file_format) {}
+
+  ~Impl() {}
+
+  arrow::Status Init() {
+    /// Connect to the RADOS cluster and instantiate a 
`SkyhookDirectObjectAccess`
+    /// instance.
+    auto connection = std::make_shared<skyhook::rados::RadosConn>(ctx_);
+    RETURN_NOT_OK(connection->Connect());
+    doa_ = std::make_shared<skyhook::SkyhookDirectObjectAccess>(connection);
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<arrow::dataset::ScanTaskIterator> ScanFile(
+      const std::shared_ptr<arrow::dataset::ScanOptions>& options,
+      const std::shared_ptr<arrow::dataset::FileFragment>& file) const {
+    /// Make sure client-side filtering and projection is turned off.
+    file->handles_compute = false;
+
+    /// Convert string file format name to Enum.
+    skyhook::SkyhookFileType::type file_format;
+    if (file_format_ == "parquet") {
+      file_format = skyhook::SkyhookFileType::type::PARQUET;
+    } else if (file_format_ == "ipc") {
+      file_format = skyhook::SkyhookFileType::type::IPC;
+    } else {
+      return arrow::Status::Invalid("Unsupported file format.");
+    }
+
+    arrow::dataset::ScanTaskVector v{std::make_shared<SkyhookScanTask>(
+        std::move(options), std::move(file), file->source(), std::move(doa_), 
file_format,
+        file->partition_expression())};
+    return arrow::MakeVectorIterator(v);
+  }
+
+  arrow::Result<std::shared_ptr<arrow::Schema>> Inspect(
+      const arrow::dataset::FileSource& source) const {
+    std::shared_ptr<arrow::dataset::FileFormat> file_format;
+    if (file_format_ == "parquet") {
+      file_format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+    } else if (file_format_ == "ipc") {
+      file_format = std::make_shared<arrow::dataset::IpcFileFormat>();
+    } else {
+      return arrow::Status::Invalid("Unsupported file format.");

Review comment:
       ```suggestion
         return arrow::Status::Invalid("Unsupported file format ", file_format_);
   ```

##########
File path: cpp/src/skyhook/protocol/ScanRequest.fbs
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// EXPERIMENTAL: Metadata for n-dimensional arrays, aka "tensors" or
+/// "ndarrays". Arrow implementations in general are not required to implement
+/// this type

Review comment:
       This comment needs updating: it describes n-dimensional tensor metadata, not the scan request this schema defines.

##########
File path: cpp/src/skyhook/client/file_skyhook.h
##########
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+
+namespace skyhook {
+
+/// \addtogroup dataset-file-formats
+///
+/// @{
+
+/// \struct RadosConnCtx
+/// \brief A struct to hold the parameters required
+/// for connecting to a RADOS cluster.
+struct RadosConnCtx {
+  std::string ceph_config_path;
+  std::string ceph_data_pool;
+  std::string ceph_user_name;
+  std::string ceph_cluster_name;
+  std::string ceph_cls_name;
+
+  RadosConnCtx(const std::string& ceph_config_path, const std::string& 
ceph_data_pool,
+               const std::string& ceph_user_name, const std::string& 
ceph_cluster_name,
+               const std::string& ceph_cls_name)
+      : ceph_config_path(ceph_config_path),
+        ceph_data_pool(ceph_data_pool),
+        ceph_user_name(ceph_user_name),
+        ceph_cluster_name(ceph_cluster_name),
+        ceph_cls_name(ceph_cls_name) {}
+};
+
+/// \class SkyhookFileFormat
+/// \brief A FileFormat implementation that offloads fragment
+/// scan operations to the Ceph OSDs.
+class SkyhookFileFormat : public arrow::dataset::ParquetFileFormat {
+ public:
+  SkyhookFileFormat(std::shared_ptr<RadosConnCtx> ctx, std::string 
file_format);
+  ~SkyhookFileFormat();
+
+  std::string type_name() const override { return "skyhook"; }
+
+  bool splittable() const { return true; }
+
+  bool Equals(const arrow::dataset::FileFormat& other) const override {
+    return type_name() == other.type_name();
+  }
+
+  /// \brief Initialize the SkyhookFileFormat by connecting to RADOS.
+  arrow::Status Init();

Review comment:
       If Init() must be called after construction, it would be good to make 
the constructor private and provide a static Make() factory that calls Init(), 
so that callers cannot forget to call it.

##########
File path: cpp/src/skyhook/protocol/rados_protocol.h
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <rados/librados.hpp>
+
+#include "arrow/status.h"
+
+#include "skyhook/client/file_skyhook.h"
+
+namespace skyhook {
+namespace rados {
+
+/// Wrap Arrow Status with a custom return code.
+class RadosStatus {
+ public:
+  RadosStatus(arrow::Status s, int code) : s_(s), code_(code) {}
+  arrow::Status status() { return s_; }
+  int code() { return code_; }
+
+ private:
+  arrow::Status s_;
+  int code_;
+};
+
+class IoCtxInterface {
+ public:
+  IoCtxInterface() {}
+
+  /// \brief Write data to an object.
+  ///
+  /// \param[in] oid the ID of the object to write.
+  /// \param[in] bl a bufferlist containing the data to write to the object.
+  virtual RadosStatus write_full(const std::string& oid, ceph::bufferlist& bl) 
= 0;
+
+  /// \brief Read a RADOS object.
+  ///
+  /// \param[in] oid the object ID which to read.
+  /// \param[in] bl a bufferlist to hold the contents of the read object.
+  /// \param[in] len the length of data to read from an object.
+  /// \param[in] offset the offset of the object to read from.
+  virtual RadosStatus read(const std::string& oid, ceph::bufferlist& bl, 
size_t len,
+                           uint64_t offset) = 0;
+
+  /// \brief Executes a CLS function.
+  ///
+  /// \param[in] oid the object ID on which to execute the CLS function.
+  /// \param[in] cls the name of the CLS.
+  /// \param[in] method the name of the CLS function.
+  /// \param[in] in a bufferlist to send data to the CLS function.
+  /// \param[in] out a bufferlist to recieve data from the CLS function.
+  virtual RadosStatus exec(const std::string& oid, const char* cls, const 
char* method,
+                           ceph::bufferlist& in, ceph::bufferlist& out) = 0;
+
+  virtual std::vector<std::string> list() = 0;

Review comment:
       nit: what is this a list of? Please document what the returned strings represent.

##########
File path: cpp/src/skyhook/protocol/rados_protocol.h
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <rados/librados.hpp>
+
+#include "arrow/status.h"
+
+#include "skyhook/client/file_skyhook.h"
+
+namespace skyhook {
+namespace rados {
+
+/// Wrap Arrow Status with a custom return code.
+class RadosStatus {
+ public:
+  RadosStatus(arrow::Status s, int code) : s_(s), code_(code) {}
+  arrow::Status status() { return s_; }
+  int code() { return code_; }
+
+ private:
+  arrow::Status s_;
+  int code_;
+};
+
+class IoCtxInterface {

Review comment:
       Is there a need for an interface when there seems to be only one 
possible implementation? The pImpl idiom might be better suited.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to