westonpace commented on a change in pull request #10913:
URL: https://github.com/apache/arrow/pull/10913#discussion_r719776809



##########
File path: cpp/src/skyhook/client/file_skyhook.cc
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+#include "skyhook/protocol/rados_protocol.h"
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/util/compression.h"
+
+namespace skyhook {
+
+/// A ScanTask to scan a file fragment in Skyhook format.
+class SkyhookScanTask : public arrow::dataset::ScanTask {
+ public:
+  SkyhookScanTask(std::shared_ptr<arrow::dataset::ScanOptions> options,
+                  std::shared_ptr<arrow::dataset::Fragment> fragment,
+                  arrow::dataset::FileSource source,
+                  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa,
+                  skyhook::SkyhookFileType::type file_format,
+                  arrow::compute::Expression partition_expression)
+      : ScanTask(std::move(options), std::move(fragment)),
+        source_(std::move(source)),
+        doa_(std::move(doa)),
+        file_format_(file_format),
+        partition_expression_(partition_expression) {}
+
+  arrow::Result<arrow::RecordBatchIterator> Execute() override {
+    /// Retrieve the size of the file using POSIX `stat`.
+    struct stat st {};
+    RETURN_NOT_OK(doa_->Stat(source_.path(), st));
+
+    /// Create a ScanRequest instance.
+    skyhook::ScanRequest req;
+    req.filter_expression = options_->filter;
+    req.partition_expression = partition_expression_;
+    req.projection_schema = options_->projected_schema;
+    req.dataset_schema = options_->dataset_schema;
+    req.file_size = st.st_size;
+    req.file_format = file_format_;
+
+    /// Serialize the ScanRequest into a ceph bufferlist.
+    ceph::bufferlist* request = new ceph::bufferlist();
+    RETURN_NOT_OK(skyhook::SerializeScanRequest(req, request));
+
+    /// Execute the Ceph object class method `scan_op`.
+    ceph::bufferlist result;
+    RETURN_NOT_OK(doa_->Exec(st.st_ino, "scan_op", *request, result));
+
+    /// Read RecordBatches from the result bufferlist. Since, this step might 
use
+    /// threads for decompressing compressed batches, to avoid running into
+    /// [ARROW-12597], we switch off threaded decompression to avoid nested 
threading
+    /// scenarios when scan tasks are executed in parallel by the 
CpuThreadPool.
+    arrow::RecordBatchVector* batches = new arrow::RecordBatchVector();
+    RETURN_NOT_OK(skyhook::DeserializeTable(result, !options_->use_threads, 
batches));
+    return arrow::MakeVectorIterator(*batches);

Review comment:
       ```suggestion
       arrow::RecordBatchVector batches;
       RETURN_NOT_OK(skyhook::DeserializeTable(result, !options_->use_threads, 
&batches));
       return arrow::MakeVectorIterator(std::move(batches));
   ```
   Same comment as for `request` earlier in this function: if this needs to be 
heap-allocated, please explain why.  Keep in mind that there are very few places 
in the code base where we directly use `new` (unless creating a `unique_ptr`).

##########
File path: cpp/src/skyhook/client/file_skyhook.cc
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+#include "skyhook/protocol/rados_protocol.h"
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/util/compression.h"
+
+namespace skyhook {
+
+/// A ScanTask to scan a file fragment in Skyhook format.
+class SkyhookScanTask : public arrow::dataset::ScanTask {
+ public:
+  SkyhookScanTask(std::shared_ptr<arrow::dataset::ScanOptions> options,
+                  std::shared_ptr<arrow::dataset::Fragment> fragment,
+                  arrow::dataset::FileSource source,
+                  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa,
+                  skyhook::SkyhookFileType::type file_format,
+                  arrow::compute::Expression partition_expression)
+      : ScanTask(std::move(options), std::move(fragment)),
+        source_(std::move(source)),
+        doa_(std::move(doa)),
+        file_format_(file_format),
+        partition_expression_(partition_expression) {}
+
+  arrow::Result<arrow::RecordBatchIterator> Execute() override {
+    /// Retrieve the size of the file using POSIX `stat`.
+    struct stat st {};
+    RETURN_NOT_OK(doa_->Stat(source_.path(), st));
+
+    /// Create a ScanRequest instance.
+    skyhook::ScanRequest req;
+    req.filter_expression = options_->filter;
+    req.partition_expression = partition_expression_;
+    req.projection_schema = options_->projected_schema;
+    req.dataset_schema = options_->dataset_schema;
+    req.file_size = st.st_size;
+    req.file_format = file_format_;
+
+    /// Serialize the ScanRequest into a ceph bufferlist.
+    ceph::bufferlist* request = new ceph::bufferlist();
+    RETURN_NOT_OK(skyhook::SerializeScanRequest(req, request));
+
+    /// Execute the Ceph object class method `scan_op`.
+    ceph::bufferlist result;
+    RETURN_NOT_OK(doa_->Exec(st.st_ino, "scan_op", *request, result));

Review comment:
       ```suggestion
       ceph::bufferlist request;
       RETURN_NOT_OK(skyhook::SerializeScanRequest(req, &request));
   
       /// Execute the Ceph object class method `scan_op`.
       ceph::bufferlist result;
       RETURN_NOT_OK(doa_->Exec(st.st_ino, "scan_op", request, result));
   ```
   Does `request` need to be heap-allocated?  Is there no way to use smart 
pointers here?  If it has to be heap-allocated, then add a comment explaining 
why.

##########
File path: cpp/src/skyhook/client/file_skyhook.h
##########
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+
+namespace skyhook {
+
+/// \addtogroup dataset-file-formats
+///
+/// @{
+
+/// \struct RadosConnCtx
+/// \brief A struct to hold the parameters required
+/// for connecting to a RADOS cluster.
+struct RadosConnCtx {
+  std::string ceph_config_path;
+  std::string ceph_data_pool;
+  std::string ceph_user_name;
+  std::string ceph_cluster_name;
+  std::string ceph_cls_name;
+
+  RadosConnCtx(const std::string& ceph_config_path, const std::string& 
ceph_data_pool,
+               const std::string& ceph_user_name, const std::string& 
ceph_cluster_name,
+               const std::string& ceph_cls_name)
+      : ceph_config_path(ceph_config_path),
+        ceph_data_pool(ceph_data_pool),
+        ceph_user_name(ceph_user_name),
+        ceph_cluster_name(ceph_cluster_name),
+        ceph_cls_name(ceph_cls_name) {}
+};
+
+/// \class SkyhookFileFormat
+/// \brief A FileFormat implementation that offloads fragment
+/// scan operations to the Ceph OSDs. For more details, see the
+/// Skyhook paper, https://arxiv.org/pdf/2105.09894.pdf.
+class SkyhookFileFormat : public arrow::dataset::ParquetFileFormat {
+ public:
+  static arrow::Result<std::shared_ptr<SkyhookFileFormat>> Make(
+      std::shared_ptr<RadosConnCtx> ctx, std::string file_format);
+  SkyhookFileFormat(std::shared_ptr<RadosConnCtx> ctx, std::string 
file_format);
+
+  ~SkyhookFileFormat();
+
+  std::string type_name() const override { return "skyhook"; }
+
+  bool splittable() const { return true; }
+

Review comment:
       ```suggestion
   ```
   This method is no longer needed.

##########
File path: cpp/src/skyhook/client/file_skyhook.cc
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+#include "skyhook/protocol/rados_protocol.h"
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/util/compression.h"
+
+namespace skyhook {
+
+/// A ScanTask to scan a file fragment in Skyhook format.
+class SkyhookScanTask : public arrow::dataset::ScanTask {
+ public:
+  SkyhookScanTask(std::shared_ptr<arrow::dataset::ScanOptions> options,
+                  std::shared_ptr<arrow::dataset::Fragment> fragment,
+                  arrow::dataset::FileSource source,
+                  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa,
+                  skyhook::SkyhookFileType::type file_format,
+                  arrow::compute::Expression partition_expression)
+      : ScanTask(std::move(options), std::move(fragment)),
+        source_(std::move(source)),
+        doa_(std::move(doa)),
+        file_format_(file_format),
+        partition_expression_(partition_expression) {}
+
+  arrow::Result<arrow::RecordBatchIterator> Execute() override {
+    /// Retrieve the size of the file using POSIX `stat`.
+    struct stat st {};
+    RETURN_NOT_OK(doa_->Stat(source_.path(), st));
+
+    /// Create a ScanRequest instance.
+    skyhook::ScanRequest req;
+    req.filter_expression = options_->filter;
+    req.partition_expression = partition_expression_;
+    req.projection_schema = options_->projected_schema;
+    req.dataset_schema = options_->dataset_schema;
+    req.file_size = st.st_size;
+    req.file_format = file_format_;
+
+    /// Serialize the ScanRequest into a ceph bufferlist.
+    ceph::bufferlist* request = new ceph::bufferlist();
+    RETURN_NOT_OK(skyhook::SerializeScanRequest(req, request));
+
+    /// Execute the Ceph object class method `scan_op`.
+    ceph::bufferlist result;
+    RETURN_NOT_OK(doa_->Exec(st.st_ino, "scan_op", *request, result));
+
+    /// Read RecordBatches from the result bufferlist. Since, this step might 
use
+    /// threads for decompressing compressed batches, to avoid running into
+    /// [ARROW-12597], we switch off threaded decompression to avoid nested 
threading
+    /// scenarios when scan tasks are executed in parallel by the 
CpuThreadPool.
+    arrow::RecordBatchVector* batches = new arrow::RecordBatchVector();
+    RETURN_NOT_OK(skyhook::DeserializeTable(result, !options_->use_threads, 
batches));
+    return arrow::MakeVectorIterator(*batches);
+  }
+
+ protected:
+  arrow::dataset::FileSource source_;
+  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa_;
+  skyhook::SkyhookFileType::type file_format_;
+  arrow::compute::Expression partition_expression_;
+};
+
+class SkyhookFileFormat::Impl {
+ public:
+  Impl(std::shared_ptr<RadosConnCtx> ctx, std::string file_format)
+      : ctx_(std::move(ctx)), file_format_(file_format) {}
+
+  ~Impl() {}
+
+  arrow::Status Init() {
+    /// Connect to the RADOS cluster and instantiate a 
`SkyhookDirectObjectAccess`
+    /// instance.
+    auto connection = std::make_shared<skyhook::rados::RadosConn>(ctx_);
+    RETURN_NOT_OK(connection->Connect());
+    doa_ = std::make_shared<skyhook::SkyhookDirectObjectAccess>(connection);
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<arrow::dataset::ScanTaskIterator> ScanFile(
+      const std::shared_ptr<arrow::dataset::ScanOptions>& options,
+      const std::shared_ptr<arrow::dataset::FileFragment>& file) const {
+    /// Make sure client-side filtering and projection is turned off.
+    file->apply_compute = false;
+
+    /// Convert string file format name to Enum.
+    skyhook::SkyhookFileType::type file_format;
+    if (file_format_ == "parquet") {
+      file_format = skyhook::SkyhookFileType::type::PARQUET;
+    } else if (file_format_ == "ipc") {
+      file_format = skyhook::SkyhookFileType::type::IPC;
+    } else {
+      return arrow::Status::Invalid("Unsupported file format ", file_format_);
+    }
+
+    arrow::dataset::ScanTaskVector v{std::make_shared<SkyhookScanTask>(
+        std::move(options), std::move(file), file->source(), doa_, file_format,
+        file->partition_expression())};
+    return arrow::MakeVectorIterator(v);
+  }
+
+  arrow::Result<std::shared_ptr<arrow::Schema>> Inspect(
+      const arrow::dataset::FileSource& source) const {
+    std::shared_ptr<arrow::dataset::FileFormat> file_format;
+    /// Convert string file format name to Arrow FileFormat.
+    if (file_format_ == "parquet") {
+      file_format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+    } else if (file_format_ == "ipc") {
+      file_format = std::make_shared<arrow::dataset::IpcFileFormat>();
+    } else {
+      return arrow::Status::Invalid("Unsupported file format ", file_format_);
+    }
+    std::shared_ptr<arrow::Schema> schema;
+    ARROW_ASSIGN_OR_RAISE(schema, file_format->Inspect(source));
+    return schema;
+  }
+
+ private:
+  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa_;
+  std::shared_ptr<RadosConnCtx> ctx_;
+  std::string file_format_;
+};
+
+arrow::Result<std::shared_ptr<SkyhookFileFormat>> SkyhookFileFormat::Make(
+    std::shared_ptr<RadosConnCtx> ctx, std::string file_format) {
+  auto format = std::make_shared<SkyhookFileFormat>(std::move(ctx), 
file_format);

Review comment:
       ```suggestion
     auto format = std::make_shared<SkyhookFileFormat>(std::move(ctx), 
std::move(file_format));
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook_test.cc
##########
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+std::shared_ptr<skyhook::SkyhookFileFormat> GetSkyhookFormat() {
+  // The constants below should match the parameters with
+  // which the Ceph cluster is configured in integration_skyhook.sh.
+  // Currently, all the default values have been used.
+  std::string ceph_config_path = "/etc/ceph/ceph.conf";
+  std::string ceph_data_pool = "cephfs_data";
+  std::string ceph_user_name = "client.admin";
+  std::string ceph_cluster_name = "ceph";
+  std::string ceph_cls_name = "skyhook";
+  std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
+      std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
+                                              ceph_user_name, 
ceph_cluster_name,
+                                              ceph_cls_name);
+  EXPECT_OK_AND_ASSIGN(auto format,
+                       skyhook::SkyhookFileFormat::Make(rados_ctx, "parquet"));
+  return format;
+}
+
+std::shared_ptr<arrow::dataset::ParquetFileFormat> GetParquetFormat() {
+  return std::make_shared<arrow::dataset::ParquetFileFormat>();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromDirectory(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string dir) {
+  arrow::fs::FileSelector s;
+  s.base_dir = dir;

Review comment:
       ```suggestion
     s.base_dir = std::move(dir);
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook_test.cc
##########
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+std::shared_ptr<skyhook::SkyhookFileFormat> GetSkyhookFormat() {
+  // The constants below should match the parameters with
+  // which the Ceph cluster is configured in integration_skyhook.sh.
+  // Currently, all the default values have been used.
+  std::string ceph_config_path = "/etc/ceph/ceph.conf";
+  std::string ceph_data_pool = "cephfs_data";
+  std::string ceph_user_name = "client.admin";
+  std::string ceph_cluster_name = "ceph";
+  std::string ceph_cls_name = "skyhook";
+  std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
+      std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
+                                              ceph_user_name, 
ceph_cluster_name,
+                                              ceph_cls_name);
+  EXPECT_OK_AND_ASSIGN(auto format,
+                       skyhook::SkyhookFileFormat::Make(rados_ctx, "parquet"));
+  return format;
+}
+
+std::shared_ptr<arrow::dataset::ParquetFileFormat> GetParquetFormat() {
+  return std::make_shared<arrow::dataset::ParquetFileFormat>();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromDirectory(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string dir) {
+  arrow::fs::FileSelector s;
+  s.base_dir = dir;
+  s.recursive = true;
+
+  arrow::dataset::FileSystemFactoryOptions options;
+  options.partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
+      arrow::schema({arrow::field("payment_type", arrow::int32()),
+                     arrow::field("VendorID", arrow::int32())}));
+  EXPECT_OK_AND_ASSIGN(auto factory, 
arrow::dataset::FileSystemDatasetFactory::Make(
+                                         fs, s, format, options));

Review comment:
       ```suggestion
                                            std::move(fs), s, 
std::move(format), options));
   ```

##########
File path: cpp/src/skyhook/protocol/skyhook_protocol_test.cc
##########
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/test_util.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+
+std::shared_ptr<arrow::Table> CreateTable() {
+  auto schema = arrow::schema({
+      {arrow::field("a", arrow::uint8())},
+      {arrow::field("b", arrow::uint32())},
+  });
+
+  std::shared_ptr<arrow::Table> table;
+  return TableFromJSON(schema, {R"([{"a": null, "b": 5},
+                                     {"a": 1,    "b": 3},
+                                     {"a": 3,    "b": null},
+                                     {"a": null, "b": null},
+                                     {"a": 2,    "b": 5},
+                                     {"a": 1,    "b": 5}
+                                    ])"});
+}
+
+TEST(TestSkyhookProtocol, SerDeserScanRequest) {
+  ceph::bufferlist* bl = new ceph::bufferlist();
+  skyhook::ScanRequest req;
+  req.filter_expression = arrow::compute::literal(true);
+  req.partition_expression = arrow::compute::literal(false);
+  req.projection_schema = arrow::schema({arrow::field("a", arrow::int64())});
+  req.dataset_schema = arrow::schema({arrow::field("a", arrow::int64())});
+  req.file_size = 1000000;
+  req.file_format = skyhook::SkyhookFileType::type::IPC;
+  ASSERT_OK(skyhook::SerializeScanRequest(req, bl));
+
+  skyhook::ScanRequest* req_ = new skyhook::ScanRequest();
+  ASSERT_OK(skyhook::DeserializeScanRequest(*bl, req_));
+  ASSERT_TRUE(req.filter_expression.Equals(req_->filter_expression));
+  ASSERT_TRUE(req.partition_expression.Equals(req_->partition_expression));
+  ASSERT_TRUE(req.projection_schema->Equals(req_->projection_schema));
+  ASSERT_TRUE(req.dataset_schema->Equals(req_->dataset_schema));
+  ASSERT_EQ(req.file_size, req_->file_size);
+  ASSERT_EQ(req.file_format, req_->file_format);

Review comment:
       ```suggestion
     ASSERT_TRUE(req.filter_expression.Equals(req_.filter_expression));
     ASSERT_TRUE(req.partition_expression.Equals(req_.partition_expression));
     ASSERT_TRUE(req.projection_schema->Equals(req_.projection_schema));
     ASSERT_TRUE(req.dataset_schema->Equals(req_.dataset_schema));
     ASSERT_EQ(req.file_size, req_.file_size);
     ASSERT_EQ(req.file_format, req_.file_format);
   ```

##########
File path: cpp/src/skyhook/client/file_skyhook.cc
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+#include "skyhook/protocol/rados_protocol.h"
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/util/compression.h"
+
+namespace skyhook {
+
+/// A ScanTask to scan a file fragment in Skyhook format.
+class SkyhookScanTask : public arrow::dataset::ScanTask {
+ public:
+  SkyhookScanTask(std::shared_ptr<arrow::dataset::ScanOptions> options,
+                  std::shared_ptr<arrow::dataset::Fragment> fragment,
+                  arrow::dataset::FileSource source,
+                  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa,
+                  skyhook::SkyhookFileType::type file_format,
+                  arrow::compute::Expression partition_expression)
+      : ScanTask(std::move(options), std::move(fragment)),
+        source_(std::move(source)),
+        doa_(std::move(doa)),
+        file_format_(file_format),
+        partition_expression_(partition_expression) {}
+
+  arrow::Result<arrow::RecordBatchIterator> Execute() override {
+    /// Retrieve the size of the file using POSIX `stat`.
+    struct stat st {};
+    RETURN_NOT_OK(doa_->Stat(source_.path(), st));
+
+    /// Create a ScanRequest instance.
+    skyhook::ScanRequest req;
+    req.filter_expression = options_->filter;
+    req.partition_expression = partition_expression_;
+    req.projection_schema = options_->projected_schema;
+    req.dataset_schema = options_->dataset_schema;
+    req.file_size = st.st_size;
+    req.file_format = file_format_;
+
+    /// Serialize the ScanRequest into a ceph bufferlist.
+    ceph::bufferlist* request = new ceph::bufferlist();
+    RETURN_NOT_OK(skyhook::SerializeScanRequest(req, request));
+
+    /// Execute the Ceph object class method `scan_op`.
+    ceph::bufferlist result;
+    RETURN_NOT_OK(doa_->Exec(st.st_ino, "scan_op", *request, result));
+
+    /// Read RecordBatches from the result bufferlist. Since, this step might 
use
+    /// threads for decompressing compressed batches, to avoid running into
+    /// [ARROW-12597], we switch off threaded decompression to avoid nested 
threading
+    /// scenarios when scan tasks are executed in parallel by the 
CpuThreadPool.
+    arrow::RecordBatchVector* batches = new arrow::RecordBatchVector();
+    RETURN_NOT_OK(skyhook::DeserializeTable(result, !options_->use_threads, 
batches));
+    return arrow::MakeVectorIterator(*batches);
+  }
+
+ protected:
+  arrow::dataset::FileSource source_;
+  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa_;
+  skyhook::SkyhookFileType::type file_format_;
+  arrow::compute::Expression partition_expression_;
+};
+
+class SkyhookFileFormat::Impl {
+ public:
+  Impl(std::shared_ptr<RadosConnCtx> ctx, std::string file_format)
+      : ctx_(std::move(ctx)), file_format_(file_format) {}
+
+  ~Impl() {}

Review comment:
       ```suggestion
     ~Impl() = default;
   ```

##########
File path: cpp/src/skyhook/client/file_skyhook.cc
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+#include "skyhook/protocol/rados_protocol.h"
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/util/compression.h"
+
+namespace skyhook {
+
+/// A ScanTask to scan a file fragment in Skyhook format.
+class SkyhookScanTask : public arrow::dataset::ScanTask {
+ public:
+  /// \brief Create a scan task for one file fragment.
+  ///
+  /// \param options scan options forwarded to the ScanTask base class
+  /// \param fragment fragment forwarded to the ScanTask base class
+  /// \param source file source whose path is stat-ed in Execute()
+  /// \param doa handle used to stat the file and to run the `scan_op` method
+  /// \param file_format format tag sent along with the scan request
+  /// \param partition_expression partition expression sent along with the request
+  SkyhookScanTask(std::shared_ptr<arrow::dataset::ScanOptions> options,
+                  std::shared_ptr<arrow::dataset::Fragment> fragment,
+                  arrow::dataset::FileSource source,
+                  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa,
+                  skyhook::SkyhookFileType::type file_format,
+                  arrow::compute::Expression partition_expression)
+      : ScanTask(std::move(options), std::move(fragment)),
+        source_(std::move(source)),
+        doa_(std::move(doa)),
+        file_format_(file_format),
+        partition_expression_(std::move(partition_expression)) {}
+
+  /// \brief Scan the fragment by offloading the work to the Ceph object class.
+  ///
+  /// Stats the file through the direct object access handle, serializes a
+  /// ScanRequest built from the scan options, invokes the `scan_op` object
+  /// class method and deserializes the reply into record batches.
+  ///
+  /// \return an iterator over the deserialized record batches, or the first
+  /// non-OK status reported by one of the steps above.
+  arrow::Result<arrow::RecordBatchIterator> Execute() override {
+    /// Retrieve the size of the file using POSIX `stat`.
+    struct stat st {};
+    RETURN_NOT_OK(doa_->Stat(source_.path(), st));
+
+    /// Create a ScanRequest instance.
+    skyhook::ScanRequest req;
+    req.filter_expression = options_->filter;
+    req.partition_expression = partition_expression_;
+    req.projection_schema = options_->projected_schema;
+    req.dataset_schema = options_->dataset_schema;
+    req.file_size = st.st_size;
+    req.file_format = file_format_;
+
+    /// Serialize the ScanRequest into a ceph bufferlist. The bufferlist is a
+    /// local object so that it is released on every return path, including the
+    /// early returns taken by RETURN_NOT_OK.
+    ceph::bufferlist request;
+    RETURN_NOT_OK(skyhook::SerializeScanRequest(req, &request));
+
+    /// Execute the Ceph object class method `scan_op`.
+    ceph::bufferlist result;
+    RETURN_NOT_OK(doa_->Exec(st.st_ino, "scan_op", request, result));
+
+    /// Read RecordBatches from the result bufferlist. This step might use
+    /// threads for decompressing compressed batches; to avoid running into
+    /// [ARROW-12597], threaded decompression is switched off to avoid nested
+    /// threading scenarios when scan tasks are executed in parallel by the
+    /// CpuThreadPool, hence the negated `use_threads` flag.
+    arrow::RecordBatchVector batches;
+    RETURN_NOT_OK(skyhook::DeserializeTable(result, !options_->use_threads, &batches));
+    /// Hand the vector over to the iterator instead of copying it.
+    return arrow::MakeVectorIterator(std::move(batches));
+  }
+
+ protected:
+  /// File to scan; only its path is used by Execute().
+  arrow::dataset::FileSource source_;
+  /// Handle used for the `Stat` and `Exec` calls.
+  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa_;
+  /// Format tag copied into every scan request.
+  skyhook::SkyhookFileType::type file_format_;
+  /// Partition expression copied into every scan request.
+  arrow::compute::Expression partition_expression_;
+};
+
+class SkyhookFileFormat::Impl {
+ public:
+  Impl(std::shared_ptr<RadosConnCtx> ctx, std::string file_format)
+      : ctx_(std::move(ctx)), file_format_(file_format) {}
+
+  ~Impl() {}
+
+  arrow::Status Init() {
+    /// Connect to the RADOS cluster and instantiate a 
`SkyhookDirectObjectAccess`
+    /// instance.
+    auto connection = std::make_shared<skyhook::rados::RadosConn>(ctx_);
+    RETURN_NOT_OK(connection->Connect());
+    doa_ = std::make_shared<skyhook::SkyhookDirectObjectAccess>(connection);
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<arrow::dataset::ScanTaskIterator> ScanFile(
+      const std::shared_ptr<arrow::dataset::ScanOptions>& options,
+      const std::shared_ptr<arrow::dataset::FileFragment>& file) const {
+    /// Make sure client-side filtering and projection is turned off.
+    file->apply_compute = false;
+
+    /// Convert string file format name to Enum.
+    skyhook::SkyhookFileType::type file_format;
+    if (file_format_ == "parquet") {
+      file_format = skyhook::SkyhookFileType::type::PARQUET;
+    } else if (file_format_ == "ipc") {
+      file_format = skyhook::SkyhookFileType::type::IPC;
+    } else {
+      return arrow::Status::Invalid("Unsupported file format ", file_format_);
+    }
+
+    arrow::dataset::ScanTaskVector v{std::make_shared<SkyhookScanTask>(
+        std::move(options), std::move(file), file->source(), doa_, file_format,

Review comment:
       ```suggestion
           options, file, file->source(), doa_, file_format,
   ```

##########
File path: cpp/src/skyhook/client/file_skyhook.h
##########
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+
+namespace skyhook {
+
+/// \addtogroup dataset-file-formats
+///
+/// @{
+
+/// \struct RadosConnCtx
+/// \brief A struct to hold the parameters required
+/// for connecting to a RADOS cluster.
+struct RadosConnCtx {
+  std::string ceph_config_path;
+  std::string ceph_data_pool;
+  std::string ceph_user_name;
+  std::string ceph_cluster_name;
+  std::string ceph_cls_name;
+
+  /// \brief Build a connection context from its five string settings.
+  ///
+  /// The arguments are taken by value and moved into the members of the same
+  /// name, so callers passing temporaries do not pay for an extra copy.
+  RadosConnCtx(std::string ceph_config_path, std::string ceph_data_pool,
+               std::string ceph_user_name, std::string ceph_cluster_name,
+               std::string ceph_cls_name)
+      : ceph_config_path(std::move(ceph_config_path)),
+        ceph_data_pool(std::move(ceph_data_pool)),
+        ceph_user_name(std::move(ceph_user_name)),
+        ceph_cluster_name(std::move(ceph_cluster_name)),
+        ceph_cls_name(std::move(ceph_cls_name)) {}
+};
+
+/// \class SkyhookFileFormat
+/// \brief A FileFormat implementation that offloads fragment
+/// scan operations to the Ceph OSDs. For more details, see the
+/// Skyhook paper, https://arxiv.org/pdf/2105.09894.pdf.
+class SkyhookFileFormat : public arrow::dataset::ParquetFileFormat {
+ public:
+  static arrow::Result<std::shared_ptr<SkyhookFileFormat>> Make(
+      std::shared_ptr<RadosConnCtx> ctx, std::string file_format);
+  SkyhookFileFormat(std::shared_ptr<RadosConnCtx> ctx, std::string 
file_format);
+
+  ~SkyhookFileFormat();

Review comment:
       ```suggestion
     ~SkyhookFileFormat() override;
   ```

##########
File path: cpp/src/skyhook/client/file_skyhook.h
##########
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+
+namespace skyhook {
+
+/// \addtogroup dataset-file-formats
+///
+/// @{
+
+/// \struct RadosConnCtx
+/// \brief A struct to hold the parameters required
+/// for connecting to a RADOS cluster.
+struct RadosConnCtx {
+  std::string ceph_config_path;
+  std::string ceph_data_pool;
+  std::string ceph_user_name;
+  std::string ceph_cluster_name;
+  std::string ceph_cls_name;
+
+  /// \brief Build a connection context from its five string settings.
+  ///
+  /// The arguments are taken by value and moved into the members of the same
+  /// name, so callers passing temporaries do not pay for an extra copy.
+  RadosConnCtx(std::string ceph_config_path, std::string ceph_data_pool,
+               std::string ceph_user_name, std::string ceph_cluster_name,
+               std::string ceph_cls_name)
+      : ceph_config_path(std::move(ceph_config_path)),
+        ceph_data_pool(std::move(ceph_data_pool)),
+        ceph_user_name(std::move(ceph_user_name)),
+        ceph_cluster_name(std::move(ceph_cluster_name)),
+        ceph_cls_name(std::move(ceph_cls_name)) {}
+};

Review comment:
       ```suggestion
     RadosConnCtx(std::string ceph_config_path, std::string ceph_data_pool,
                  std::string ceph_user_name, std::string ceph_cluster_name,
                  std::string ceph_cls_name)
         : ceph_config_path(std::move(ceph_config_path)),
           ceph_data_pool(std::move(ceph_data_pool)),
           ceph_user_name(std::move(ceph_user_name)),
           ceph_cluster_name(std::move(ceph_cluster_name)),
           ceph_cls_name(std::move(ceph_cls_name)) {}
   };
   ```

##########
File path: cpp/src/skyhook/client/file_skyhook.cc
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+#include "skyhook/protocol/rados_protocol.h"
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/util/compression.h"
+
+namespace skyhook {
+
+/// A ScanTask to scan a file fragment in Skyhook format.
+class SkyhookScanTask : public arrow::dataset::ScanTask {
+ public:
+  /// \brief Create a scan task for one file fragment.
+  ///
+  /// \param options scan options forwarded to the ScanTask base class
+  /// \param fragment fragment forwarded to the ScanTask base class
+  /// \param source file source whose path is stat-ed in Execute()
+  /// \param doa handle used to stat the file and to run the `scan_op` method
+  /// \param file_format format tag sent along with the scan request
+  /// \param partition_expression partition expression sent along with the request
+  SkyhookScanTask(std::shared_ptr<arrow::dataset::ScanOptions> options,
+                  std::shared_ptr<arrow::dataset::Fragment> fragment,
+                  arrow::dataset::FileSource source,
+                  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa,
+                  skyhook::SkyhookFileType::type file_format,
+                  arrow::compute::Expression partition_expression)
+      : ScanTask(std::move(options), std::move(fragment)),
+        source_(std::move(source)),
+        doa_(std::move(doa)),
+        file_format_(file_format),
+        partition_expression_(std::move(partition_expression)) {}
+
+  /// \brief Scan the fragment by offloading the work to the Ceph object class.
+  ///
+  /// Stats the file through the direct object access handle, serializes a
+  /// ScanRequest built from the scan options, invokes the `scan_op` object
+  /// class method and deserializes the reply into record batches.
+  ///
+  /// \return an iterator over the deserialized record batches, or the first
+  /// non-OK status reported by one of the steps above.
+  arrow::Result<arrow::RecordBatchIterator> Execute() override {
+    /// Retrieve the size of the file using POSIX `stat`.
+    struct stat st {};
+    RETURN_NOT_OK(doa_->Stat(source_.path(), st));
+
+    /// Create a ScanRequest instance.
+    skyhook::ScanRequest req;
+    req.filter_expression = options_->filter;
+    req.partition_expression = partition_expression_;
+    req.projection_schema = options_->projected_schema;
+    req.dataset_schema = options_->dataset_schema;
+    req.file_size = st.st_size;
+    req.file_format = file_format_;
+
+    /// Serialize the ScanRequest into a ceph bufferlist. The bufferlist is a
+    /// local object so that it is released on every return path, including the
+    /// early returns taken by RETURN_NOT_OK.
+    ceph::bufferlist request;
+    RETURN_NOT_OK(skyhook::SerializeScanRequest(req, &request));
+
+    /// Execute the Ceph object class method `scan_op`.
+    ceph::bufferlist result;
+    RETURN_NOT_OK(doa_->Exec(st.st_ino, "scan_op", request, result));
+
+    /// Read RecordBatches from the result bufferlist. This step might use
+    /// threads for decompressing compressed batches; to avoid running into
+    /// [ARROW-12597], threaded decompression is switched off to avoid nested
+    /// threading scenarios when scan tasks are executed in parallel by the
+    /// CpuThreadPool, hence the negated `use_threads` flag.
+    arrow::RecordBatchVector batches;
+    RETURN_NOT_OK(skyhook::DeserializeTable(result, !options_->use_threads, &batches));
+    /// Hand the vector over to the iterator instead of copying it.
+    return arrow::MakeVectorIterator(std::move(batches));
+  }
+
+ protected:
+  /// File to scan; only its path is used by Execute().
+  arrow::dataset::FileSource source_;
+  /// Handle used for the `Stat` and `Exec` calls.
+  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa_;
+  /// Format tag copied into every scan request.
+  skyhook::SkyhookFileType::type file_format_;
+  /// Partition expression copied into every scan request.
+  arrow::compute::Expression partition_expression_;
+};
+
+class SkyhookFileFormat::Impl {
+ public:
+  Impl(std::shared_ptr<RadosConnCtx> ctx, std::string file_format)
+      : ctx_(std::move(ctx)), file_format_(file_format) {}

Review comment:
       ```suggestion
         : ctx_(std::move(ctx)), file_format_(std::move(file_format)) {}
   ```

##########
File path: cpp/src/skyhook/client/file_skyhook.cc
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+#include "skyhook/protocol/rados_protocol.h"
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/util/compression.h"
+
+namespace skyhook {
+
+/// A ScanTask to scan a file fragment in Skyhook format.
+class SkyhookScanTask : public arrow::dataset::ScanTask {
+ public:
+  /// \brief Create a scan task for one file fragment.
+  ///
+  /// \param options scan options forwarded to the ScanTask base class
+  /// \param fragment fragment forwarded to the ScanTask base class
+  /// \param source file source whose path is stat-ed in Execute()
+  /// \param doa handle used to stat the file and to run the `scan_op` method
+  /// \param file_format format tag sent along with the scan request
+  /// \param partition_expression partition expression sent along with the request
+  SkyhookScanTask(std::shared_ptr<arrow::dataset::ScanOptions> options,
+                  std::shared_ptr<arrow::dataset::Fragment> fragment,
+                  arrow::dataset::FileSource source,
+                  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa,
+                  skyhook::SkyhookFileType::type file_format,
+                  arrow::compute::Expression partition_expression)
+      : ScanTask(std::move(options), std::move(fragment)),
+        source_(std::move(source)),
+        doa_(std::move(doa)),
+        file_format_(file_format),
+        partition_expression_(std::move(partition_expression)) {}
+
+  /// \brief Scan the fragment by offloading the work to the Ceph object class.
+  ///
+  /// Stats the file through the direct object access handle, serializes a
+  /// ScanRequest built from the scan options, invokes the `scan_op` object
+  /// class method and deserializes the reply into record batches.
+  ///
+  /// \return an iterator over the deserialized record batches, or the first
+  /// non-OK status reported by one of the steps above.
+  arrow::Result<arrow::RecordBatchIterator> Execute() override {
+    /// Retrieve the size of the file using POSIX `stat`.
+    struct stat st {};
+    RETURN_NOT_OK(doa_->Stat(source_.path(), st));
+
+    /// Create a ScanRequest instance.
+    skyhook::ScanRequest req;
+    req.filter_expression = options_->filter;
+    req.partition_expression = partition_expression_;
+    req.projection_schema = options_->projected_schema;
+    req.dataset_schema = options_->dataset_schema;
+    req.file_size = st.st_size;
+    req.file_format = file_format_;
+
+    /// Serialize the ScanRequest into a ceph bufferlist. The bufferlist is a
+    /// local object so that it is released on every return path, including the
+    /// early returns taken by RETURN_NOT_OK.
+    ceph::bufferlist request;
+    RETURN_NOT_OK(skyhook::SerializeScanRequest(req, &request));
+
+    /// Execute the Ceph object class method `scan_op`.
+    ceph::bufferlist result;
+    RETURN_NOT_OK(doa_->Exec(st.st_ino, "scan_op", request, result));
+
+    /// Read RecordBatches from the result bufferlist. This step might use
+    /// threads for decompressing compressed batches; to avoid running into
+    /// [ARROW-12597], threaded decompression is switched off to avoid nested
+    /// threading scenarios when scan tasks are executed in parallel by the
+    /// CpuThreadPool, hence the negated `use_threads` flag.
+    arrow::RecordBatchVector batches;
+    RETURN_NOT_OK(skyhook::DeserializeTable(result, !options_->use_threads, &batches));
+    /// Hand the vector over to the iterator instead of copying it.
+    return arrow::MakeVectorIterator(std::move(batches));
+  }
+
+ protected:
+  /// File to scan; only its path is used by Execute().
+  arrow::dataset::FileSource source_;
+  /// Handle used for the `Stat` and `Exec` calls.
+  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa_;
+  /// Format tag copied into every scan request.
+  skyhook::SkyhookFileType::type file_format_;
+  /// Partition expression copied into every scan request.
+  arrow::compute::Expression partition_expression_;
+};
+
+class SkyhookFileFormat::Impl {
+ public:
+  /// \brief Store the connection parameters and the name of the wrapped format.
+  ///
+  /// \param ctx parameters used by Init() to connect to the RADOS cluster
+  /// \param file_format name of the underlying format; ScanFile() and Inspect()
+  /// accept "parquet" and "ipc"
+  Impl(std::shared_ptr<RadosConnCtx> ctx, std::string file_format)
+      : ctx_(std::move(ctx)), file_format_(std::move(file_format)) {}
+
+  ~Impl() = default;
+
+  /// \brief Connect to the RADOS cluster and create the object access handle.
+  ///
+  /// \return the status of the connection attempt; `doa_` is only set when the
+  /// connection succeeded.
+  arrow::Status Init() {
+    /// Connect to the RADOS cluster and instantiate a `SkyhookDirectObjectAccess`
+    /// instance.
+    auto connection = std::make_shared<skyhook::rados::RadosConn>(ctx_);
+    RETURN_NOT_OK(connection->Connect());
+    doa_ = std::make_shared<skyhook::SkyhookDirectObjectAccess>(connection);
+    return arrow::Status::OK();
+  }
+
+  /// \brief Create the scan task for a file fragment.
+  ///
+  /// Maps the stored format name to a SkyhookFileType and wraps a single
+  /// SkyhookScanTask in an iterator.
+  ///
+  /// \return Status::Invalid if the stored format name is neither "parquet"
+  /// nor "ipc".
+  arrow::Result<arrow::dataset::ScanTaskIterator> ScanFile(
+      const std::shared_ptr<arrow::dataset::ScanOptions>& options,
+      const std::shared_ptr<arrow::dataset::FileFragment>& file) const {
+    /// Make sure client-side filtering and projection is turned off.
+    file->apply_compute = false;
+
+    /// Convert string file format name to Enum.
+    skyhook::SkyhookFileType::type file_format;
+    if (file_format_ == "parquet") {
+      file_format = skyhook::SkyhookFileType::type::PARQUET;
+    } else if (file_format_ == "ipc") {
+      file_format = skyhook::SkyhookFileType::type::IPC;
+    } else {
+      return arrow::Status::Invalid("Unsupported file format ", file_format_);
+    }
+
+    /// `options` and `file` are const references, so they are passed as-is:
+    /// wrapping them in std::move would still copy and only hide that fact.
+    arrow::dataset::ScanTaskVector v{std::make_shared<SkyhookScanTask>(
+        options, file, file->source(), doa_, file_format,
+        file->partition_expression())};
+    return arrow::MakeVectorIterator(std::move(v));
+  }
+
+  /// \brief Infer the schema of `source` with the wrapped Arrow file format.
+  ///
+  /// \return Status::Invalid if the stored format name is neither "parquet"
+  /// nor "ipc"; otherwise whatever the wrapped format's Inspect() returns.
+  arrow::Result<std::shared_ptr<arrow::Schema>> Inspect(
+      const arrow::dataset::FileSource& source) const {
+    std::shared_ptr<arrow::dataset::FileFormat> file_format;
+    /// Convert string file format name to Arrow FileFormat.
+    if (file_format_ == "parquet") {
+      file_format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+    } else if (file_format_ == "ipc") {
+      file_format = std::make_shared<arrow::dataset::IpcFileFormat>();
+    } else {
+      return arrow::Status::Invalid("Unsupported file format ", file_format_);
+    }
+    return file_format->Inspect(source);
+  }
+
+ private:
+  /// Object access handle; assigned by Init().
+  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa_;
+  /// Connection parameters handed to the RADOS connection in Init().
+  std::shared_ptr<RadosConnCtx> ctx_;
+  /// Name of the wrapped file format ("parquet" or "ipc" are accepted).
+  std::string file_format_;
+};
+
+/// Create a SkyhookFileFormat and run its Init() step.
+///
+/// Returns the initialized format, or the non-OK status reported by Init().
+arrow::Result<std::shared_ptr<SkyhookFileFormat>> SkyhookFileFormat::Make(
+    std::shared_ptr<RadosConnCtx> ctx, std::string file_format) {
+  /// Both arguments are sinks, so they are moved rather than copied.
+  auto format =
+      std::make_shared<SkyhookFileFormat>(std::move(ctx), std::move(file_format));
+  RETURN_NOT_OK(format->Init());
+  return format;
+}
+
+/// Construct the format by creating its Impl; both arguments are moved into
+/// the Impl instead of being copied.
+SkyhookFileFormat::SkyhookFileFormat(std::shared_ptr<RadosConnCtx> ctx,
+                                     std::string file_format)
+    : impl_(new Impl(std::move(ctx), std::move(file_format))) {}

Review comment:
       ```suggestion
       : impl_(new Impl(std::move(ctx), std::move(file_format))) {}
   ```

##########
File path: cpp/src/skyhook/client/file_skyhook.cc
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+#include "skyhook/protocol/rados_protocol.h"
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/util/compression.h"
+
+namespace skyhook {
+
+/// A ScanTask to scan a file fragment in Skyhook format.
+class SkyhookScanTask : public arrow::dataset::ScanTask {
+ public:
+  SkyhookScanTask(std::shared_ptr<arrow::dataset::ScanOptions> options,
+                  std::shared_ptr<arrow::dataset::Fragment> fragment,
+                  arrow::dataset::FileSource source,
+                  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa,
+                  skyhook::SkyhookFileType::type file_format,
+                  arrow::compute::Expression partition_expression)
+      : ScanTask(std::move(options), std::move(fragment)),
+        source_(std::move(source)),
+        doa_(std::move(doa)),
+        file_format_(file_format),
+        partition_expression_(partition_expression) {}

Review comment:
       ```suggestion
           partition_expression_(std::move(partition_expression)) {}
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook.cc
##########
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <rados/objclass.h>
+
+#include <memory>
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/result.h"
+#include "arrow/util/compression.h"

Review comment:
       ```suggestion
   ```
   I don't think `arrow/util/compression.h` is used.

##########
File path: cpp/src/skyhook/cls/cls_skyhook.cc
##########
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <rados/objclass.h>
+
+#include <memory>
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/result.h"
+#include "arrow/util/compression.h"
+
+#include "skyhook/protocol/skyhook_protocol.h"
+
+CLS_VER(1, 0)
+CLS_NAME(skyhook)
+
+cls_handle_t h_class;
+cls_method_handle_t h_scan_op;
+
+/// \brief Log skyhook errors using RADOS object class SDK's logger.
+void LogSkyhookError(const std::string& msg) { CLS_LOG(0, "error: %s", 
msg.c_str()); }
+
+/// \class RandomAccessObject
+/// \brief An interface to provide a file-like view over RADOS objects.
+class RandomAccessObject : public arrow::io::RandomAccessFile {
+ public:
+  explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) {
+    hctx_ = hctx;
+    content_length_ = file_size;
+    chunks_ = std::vector<std::shared_ptr<ceph::bufferlist>>();
+  }
+
+  ~RandomAccessObject() { Close(); }

Review comment:
       ```suggestion
     ~RandomAccessObject() override { DCHECK_OK(Close()); }
   ```
   I'm not sure how you'd want to handle failure on close here.  DCHECK_OK will 
abort if it fails for some reason and is a good failsafe if you don't expect it 
to ever fail.
   
   You'll need to include `arrow/util/logging.h` to use it.

##########
File path: cpp/src/skyhook/client/file_skyhook.cc
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+#include "skyhook/protocol/rados_protocol.h"
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/util/compression.h"
+
+namespace skyhook {
+
+/// A ScanTask to scan a file fragment in Skyhook format.
+class SkyhookScanTask : public arrow::dataset::ScanTask {
+ public:
+  /// \brief Create a scan task for one file fragment.
+  ///
+  /// \param options scan options forwarded to the ScanTask base class
+  /// \param fragment fragment forwarded to the ScanTask base class
+  /// \param source file source whose path is stat-ed in Execute()
+  /// \param doa handle used to stat the file and to run the `scan_op` method
+  /// \param file_format format tag sent along with the scan request
+  /// \param partition_expression partition expression sent along with the request
+  SkyhookScanTask(std::shared_ptr<arrow::dataset::ScanOptions> options,
+                  std::shared_ptr<arrow::dataset::Fragment> fragment,
+                  arrow::dataset::FileSource source,
+                  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa,
+                  skyhook::SkyhookFileType::type file_format,
+                  arrow::compute::Expression partition_expression)
+      : ScanTask(std::move(options), std::move(fragment)),
+        source_(std::move(source)),
+        doa_(std::move(doa)),
+        file_format_(file_format),
+        partition_expression_(std::move(partition_expression)) {}
+
+  /// \brief Scan the fragment by offloading the work to the Ceph object class.
+  ///
+  /// Stats the file through the direct object access handle, serializes a
+  /// ScanRequest built from the scan options, invokes the `scan_op` object
+  /// class method and deserializes the reply into record batches.
+  ///
+  /// \return an iterator over the deserialized record batches, or the first
+  /// non-OK status reported by one of the steps above.
+  arrow::Result<arrow::RecordBatchIterator> Execute() override {
+    /// Retrieve the size of the file using POSIX `stat`.
+    struct stat st {};
+    RETURN_NOT_OK(doa_->Stat(source_.path(), st));
+
+    /// Create a ScanRequest instance.
+    skyhook::ScanRequest req;
+    req.filter_expression = options_->filter;
+    req.partition_expression = partition_expression_;
+    req.projection_schema = options_->projected_schema;
+    req.dataset_schema = options_->dataset_schema;
+    req.file_size = st.st_size;
+    req.file_format = file_format_;
+
+    /// Serialize the ScanRequest into a ceph bufferlist. The bufferlist is a
+    /// local object so that it is released on every return path, including the
+    /// early returns taken by RETURN_NOT_OK.
+    ceph::bufferlist request;
+    RETURN_NOT_OK(skyhook::SerializeScanRequest(req, &request));
+
+    /// Execute the Ceph object class method `scan_op`.
+    ceph::bufferlist result;
+    RETURN_NOT_OK(doa_->Exec(st.st_ino, "scan_op", request, result));
+
+    /// Read RecordBatches from the result bufferlist. This step might use
+    /// threads for decompressing compressed batches; to avoid running into
+    /// [ARROW-12597], threaded decompression is switched off to avoid nested
+    /// threading scenarios when scan tasks are executed in parallel by the
+    /// CpuThreadPool, hence the negated `use_threads` flag.
+    arrow::RecordBatchVector batches;
+    RETURN_NOT_OK(skyhook::DeserializeTable(result, !options_->use_threads, &batches));
+    /// Hand the vector over to the iterator instead of copying it.
+    return arrow::MakeVectorIterator(std::move(batches));
+  }
+
+ protected:
+  /// File to scan; only its path is used by Execute().
+  arrow::dataset::FileSource source_;
+  /// Handle used for the `Stat` and `Exec` calls.
+  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa_;
+  /// Format tag copied into every scan request.
+  skyhook::SkyhookFileType::type file_format_;
+  /// Partition expression copied into every scan request.
+  arrow::compute::Expression partition_expression_;
+};
+
+class SkyhookFileFormat::Impl {
+ public:
+  /// \brief Store the connection parameters and the name of the wrapped format.
+  ///
+  /// \param ctx parameters used by Init() to connect to the RADOS cluster
+  /// \param file_format name of the underlying format; ScanFile() and Inspect()
+  /// accept "parquet" and "ipc"
+  Impl(std::shared_ptr<RadosConnCtx> ctx, std::string file_format)
+      : ctx_(std::move(ctx)), file_format_(std::move(file_format)) {}
+
+  ~Impl() = default;
+
+  /// \brief Connect to the RADOS cluster and create the object access handle.
+  ///
+  /// \return the status of the connection attempt; `doa_` is only set when the
+  /// connection succeeded.
+  arrow::Status Init() {
+    /// Connect to the RADOS cluster and instantiate a `SkyhookDirectObjectAccess`
+    /// instance.
+    auto connection = std::make_shared<skyhook::rados::RadosConn>(ctx_);
+    RETURN_NOT_OK(connection->Connect());
+    doa_ = std::make_shared<skyhook::SkyhookDirectObjectAccess>(connection);
+    return arrow::Status::OK();
+  }
+
+  /// \brief Create the scan task for a file fragment.
+  ///
+  /// Maps the stored format name to a SkyhookFileType and wraps a single
+  /// SkyhookScanTask in an iterator.
+  ///
+  /// \return Status::Invalid if the stored format name is neither "parquet"
+  /// nor "ipc".
+  arrow::Result<arrow::dataset::ScanTaskIterator> ScanFile(
+      const std::shared_ptr<arrow::dataset::ScanOptions>& options,
+      const std::shared_ptr<arrow::dataset::FileFragment>& file) const {
+    /// Make sure client-side filtering and projection is turned off.
+    file->apply_compute = false;
+
+    /// Convert string file format name to Enum.
+    skyhook::SkyhookFileType::type file_format;
+    if (file_format_ == "parquet") {
+      file_format = skyhook::SkyhookFileType::type::PARQUET;
+    } else if (file_format_ == "ipc") {
+      file_format = skyhook::SkyhookFileType::type::IPC;
+    } else {
+      return arrow::Status::Invalid("Unsupported file format ", file_format_);
+    }
+
+    /// `options` and `file` are const references, so they are passed as-is:
+    /// wrapping them in std::move would still copy and only hide that fact.
+    arrow::dataset::ScanTaskVector v{std::make_shared<SkyhookScanTask>(
+        options, file, file->source(), doa_, file_format,
+        file->partition_expression())};
+    return arrow::MakeVectorIterator(std::move(v));
+  }
+
+  /// \brief Infer the schema of `source` with the wrapped Arrow file format.
+  ///
+  /// \return Status::Invalid if the stored format name is neither "parquet"
+  /// nor "ipc"; otherwise whatever the wrapped format's Inspect() returns.
+  arrow::Result<std::shared_ptr<arrow::Schema>> Inspect(
+      const arrow::dataset::FileSource& source) const {
+    std::shared_ptr<arrow::dataset::FileFormat> file_format;
+    /// Convert string file format name to Arrow FileFormat.
+    if (file_format_ == "parquet") {
+      file_format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+    } else if (file_format_ == "ipc") {
+      file_format = std::make_shared<arrow::dataset::IpcFileFormat>();
+    } else {
+      return arrow::Status::Invalid("Unsupported file format ", file_format_);
+    }
+    return file_format->Inspect(source);
+  }
+
+ private:
+  /// Object access handle; assigned by Init().
+  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa_;
+  /// Connection parameters handed to the RADOS connection in Init().
+  std::shared_ptr<RadosConnCtx> ctx_;
+  /// Name of the wrapped file format ("parquet" or "ipc" are accepted).
+  std::string file_format_;
+};
+
+/// Create a SkyhookFileFormat and run its Init() step.
+///
+/// Returns the initialized format, or the non-OK status reported by Init().
+arrow::Result<std::shared_ptr<SkyhookFileFormat>> SkyhookFileFormat::Make(
+    std::shared_ptr<RadosConnCtx> ctx, std::string file_format) {
+  /// Both arguments are sinks, so they are moved rather than copied.
+  auto format =
+      std::make_shared<SkyhookFileFormat>(std::move(ctx), std::move(file_format));
+  RETURN_NOT_OK(format->Init());
+  return format;
+}
+
+/// Construct the format by creating its Impl; both arguments are moved into
+/// the Impl instead of being copied.
+SkyhookFileFormat::SkyhookFileFormat(std::shared_ptr<RadosConnCtx> ctx,
+                                     std::string file_format)
+    : impl_(new Impl(std::move(ctx), std::move(file_format))) {}
+
+/// Defaulted here in the source file, where Impl is a complete type.
+SkyhookFileFormat::~SkyhookFileFormat() = default;

Review comment:
       ```suggestion
   SkyhookFileFormat::~SkyhookFileFormat() = default;
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook.cc
##########
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <rados/objclass.h>
+
+#include <memory>
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/result.h"
+#include "arrow/util/compression.h"
+
+#include "skyhook/protocol/skyhook_protocol.h"
+
+CLS_VER(1, 0)
+CLS_NAME(skyhook)
+
+cls_handle_t h_class;
+cls_method_handle_t h_scan_op;
+
+/// \brief Log skyhook errors using RADOS object class SDK's logger.
+void LogSkyhookError(const std::string& msg) { CLS_LOG(0, "error: %s", 
msg.c_str()); }
+
+/// \class RandomAccessObject
+/// \brief An interface to provide a file-like view over RADOS objects.
+class RandomAccessObject : public arrow::io::RandomAccessFile {
+ public:
+  explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) {
+    hctx_ = hctx;
+    content_length_ = file_size;
+    chunks_ = std::vector<std::shared_ptr<ceph::bufferlist>>();
+  }
+
+  ~RandomAccessObject() { Close(); }
+
+  /// Check if the file stream is closed.
+  arrow::Status CheckClosed() const {
+    if (closed_) {
+      return arrow::Status::Invalid("Operation on closed stream");
+    }
+    return arrow::Status::OK();
+  }
+
+  /// Check if the position of the object is valid.
+  arrow::Status CheckPosition(int64_t position, const char* action) const {
+    if (position < 0) {
+      return arrow::Status::Invalid("Cannot ", action, " from negative 
position");
+    }
+    if (position > content_length_) {
+      return arrow::Status::IOError("Cannot ", action, " past end of file");
+    }
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) 
override {
+    return arrow::Status::NotImplemented(
+        "ReadAt has not been implemented in RandomAccessObject");
+  }
+
+  /// Read a specified number of bytes from a specified position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position,
+                                                       int64_t nbytes) 
override {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "read"));
+
+    // No need to allocate more than the remaining number of bytes
+    nbytes = std::min(nbytes, content_length_ - position);
+
+    if (nbytes > 0) {
+      std::shared_ptr<ceph::bufferlist> bl = 
std::make_shared<ceph::bufferlist>();
+      cls_cxx_read(hctx_, position, nbytes, bl.get());
+      chunks_.push_back(bl);
+      return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), 
bl->length());
+    }
+    return std::make_shared<arrow::Buffer>("");
+  }
+
+  /// Read a specified number of bytes from the current position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override {
+    ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
+    pos_ += buffer->size();
+    return std::move(buffer);
+  }
+
+  /// Read a specified number of bytes from the current position into an 
output stream.
+  arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {
+    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
+    pos_ += bytes_read;
+    return bytes_read;
+  }
+
+  /// Return the size of the file.
+  arrow::Result<int64_t> GetSize() override {
+    RETURN_NOT_OK(CheckClosed());
+    return content_length_;
+  }
+
+  /// Sets the file-pointer offset, measured from the beginning of the
+  /// file, at which the next read or write occurs.
+  arrow::Status Seek(int64_t position) override {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "seek"));
+
+    pos_ = position;
+    return arrow::Status::OK();
+  }
+
+  /// Returns the file-pointer offset.
+  arrow::Result<int64_t> Tell() const override {
+    RETURN_NOT_OK(CheckClosed());
+    return pos_;
+  }
+
+  /// Mark the file as closed.
+  arrow::Status Close() override {
+    closed_ = true;
+    return arrow::Status::OK();
+  }
+
+  bool closed() const override { return closed_; }
+
+ private:
+  cls_method_context_t hctx_;
+  bool closed_ = false;
+  int64_t pos_ = 0;
+  int64_t content_length_ = -1;
+  std::vector<std::shared_ptr<ceph::bufferlist>> chunks_;
+};
+
+/// \brief Driver function to execute the Scan operations.
+/// \param[in] hctx RADOS object context.
+/// \param[in] req The scan request received from the client.
+/// \param[in] format The file format instance to use in the scan.
+/// \param[in] fragment_scan_options The fragment scan options to use to 
customize the
+/// scan.
+/// \return Table.
+arrow::Result<std::shared_ptr<arrow::Table>> DoScan(
+    cls_method_context_t hctx, skyhook::ScanRequest req,
+    std::shared_ptr<arrow::dataset::FileFormat> format,
+    std::shared_ptr<arrow::dataset::FragmentScanOptions> 
fragment_scan_options) {

Review comment:
       ```suggestion
   arrow::Result<std::shared_ptr<arrow::Table>> DoScan(
       cls_method_context_t hctx, const skyhook::ScanRequest& req,
       const std::shared_ptr<arrow::dataset::FileFormat>& format,
       const std::shared_ptr<arrow::dataset::FragmentScanOptions>& 
fragment_scan_options) {
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook_test.cc
##########
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"

Review comment:
       ```suggestion
   ```
   I don't think this include (`arrow/ipc/api.h`) is used, so it can be removed.

##########
File path: cpp/src/skyhook/cls/cls_skyhook_test.cc
##########
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+
+#include "arrow/api.h"

Review comment:
       ```suggestion
   ```
   I don't think this include (`arrow/api.h`) is used, so it can be removed.

##########
File path: cpp/src/skyhook/protocol/skyhook_protocol.h
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "skyhook/protocol/rados_protocol.h"
+
+#include <sys/stat.h>
+#include <sstream>
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+#include "arrow/type.h"
+
+#define SCAN_UNKNOWN_ERR_MSG "something went wrong while scanning file 
fragment"
+#define SCAN_ERR_CODE 25
+#define SCAN_ERR_MSG "failed to scan file fragment"
+#define SCAN_REQ_DESER_ERR_CODE 26
+#define SCAN_REQ_DESER_ERR_MSG "failed to deserialize scan request"
+#define SCAN_RES_SER_ERR_CODE 27
+#define SCAN_RES_SER_ERR_MSG "failed to serialize result table"
+
+namespace skyhook {
+
+/// An enum to represent the different
+/// types of file formats that Skyhook supports.
+struct SkyhookFileType {
+  enum type { PARQUET, IPC };
+};
+
+/// A struct encapsulating all the parameters
+/// required to be serialized in the form of flatbuffers for
+/// sending to the cls.
+struct ScanRequest {
+  arrow::compute::Expression filter_expression;
+  arrow::compute::Expression partition_expression;
+  std::shared_ptr<arrow::Schema> projection_schema;
+  std::shared_ptr<arrow::Schema> dataset_schema;
+  int64_t file_size;
+  SkyhookFileType::type file_format;
+};
+
+/// Utility functions to serialize and deserialize scan requests and result 
Arrow tables.
+arrow::Status SerializeScanRequest(ScanRequest& req, ceph::bufferlist* bl);
+arrow::Status DeserializeScanRequest(ceph::bufferlist& bl, ScanRequest* req);
+arrow::Status SerializeTable(const std::shared_ptr<arrow::Table>& table,
+                             ceph::bufferlist* bl);
+arrow::Status DeserializeTable(ceph::bufferlist& bl, bool use_threads,
+                               arrow::RecordBatchVector* batches);
+
+/// Utility function to invoke a RADOS object class function on a RADOS 
object.
+arrow::Status ExecuteObjectClassFn(const std::shared_ptr<rados::RadosConn>& 
connection,
+                                   const std::string& oid, const std::string& 
fn,
+                                   ceph::bufferlist& in, ceph::bufferlist& 
out);
+
+/// An interface for translating the name of a file in CephFS to its
+/// corresponding object ID in RADOS assuming 1:1 mapping between a file
+/// and its underlying object.
+class SkyhookDirectObjectAccess {
+ public:
+  explicit SkyhookDirectObjectAccess(const std::shared_ptr<rados::RadosConn>& 
connection)
+      : connection_(connection) {}

Review comment:
       ```suggestion
     explicit SkyhookDirectObjectAccess(std::shared_ptr<rados::RadosConn> 
connection)
         : connection_(std::move(connection)) {}
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook_test.cc
##########
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+std::shared_ptr<skyhook::SkyhookFileFormat> GetSkyhookFormat() {
+  // The constants below should match the parameters with
+  // which the Ceph cluster is configured in integration_skyhook.sh.
+  // Currently, all the default values have been used.
+  std::string ceph_config_path = "/etc/ceph/ceph.conf";
+  std::string ceph_data_pool = "cephfs_data";
+  std::string ceph_user_name = "client.admin";
+  std::string ceph_cluster_name = "ceph";
+  std::string ceph_cls_name = "skyhook";
+  std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
+      std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
+                                              ceph_user_name, 
ceph_cluster_name,
+                                              ceph_cls_name);
+  EXPECT_OK_AND_ASSIGN(auto format,
+                       skyhook::SkyhookFileFormat::Make(rados_ctx, "parquet"));
+  return format;
+}
+
+std::shared_ptr<arrow::dataset::ParquetFileFormat> GetParquetFormat() {
+  return std::make_shared<arrow::dataset::ParquetFileFormat>();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromDirectory(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string dir) {
+  arrow::fs::FileSelector s;
+  s.base_dir = dir;
+  s.recursive = true;
+
+  arrow::dataset::FileSystemFactoryOptions options;
+  options.partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
+      arrow::schema({arrow::field("payment_type", arrow::int32()),
+                     arrow::field("VendorID", arrow::int32())}));
+  EXPECT_OK_AND_ASSIGN(auto factory, 
arrow::dataset::FileSystemDatasetFactory::Make(
+                                         fs, s, format, options));
+
+  arrow::dataset::InspectOptions inspect_options;
+  arrow::dataset::FinishOptions finish_options;
+  EXPECT_OK_AND_ASSIGN(auto schema, factory->Inspect(inspect_options));
+  EXPECT_OK_AND_ASSIGN(auto dataset, factory->Finish(finish_options));
+  return dataset;
+}
+
+std::shared_ptr<arrow::fs::FileSystem> GetFileSystemFromUri(const std::string& 
uri,
+                                                            std::string* path) 
{
+  return arrow::fs::FileSystemFromUri(uri, path).ValueOrDie();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromPath(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string path) {
+  EXPECT_OK_AND_ASSIGN(auto info, fs->GetFileInfo(path));
+  return GetDatasetFromDirectory(fs, format, path);
+}
+
+std::shared_ptr<arrow::dataset::Scanner> GetScannerFromDataset(
+    std::shared_ptr<arrow::dataset::Dataset> dataset, std::vector<std::string> 
columns,
+    arrow::compute::Expression filter, bool use_threads) {
+  EXPECT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
+
+  if (!columns.empty()) {
+    ARROW_EXPECT_OK(scanner_builder->Project(columns));

Review comment:
       ```suggestion
       ARROW_EXPECT_OK(scanner_builder->Project(std::move(columns)));
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook.cc
##########
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <rados/objclass.h>
+
+#include <memory>
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/result.h"
+#include "arrow/util/compression.h"
+
+#include "skyhook/protocol/skyhook_protocol.h"
+
+CLS_VER(1, 0)
+CLS_NAME(skyhook)
+
+cls_handle_t h_class;
+cls_method_handle_t h_scan_op;
+
+/// \brief Log skyhook errors using RADOS object class SDK's logger.
+void LogSkyhookError(const std::string& msg) { CLS_LOG(0, "error: %s", 
msg.c_str()); }
+
+/// \class RandomAccessObject
+/// \brief An interface to provide a file-like view over RADOS objects.
+class RandomAccessObject : public arrow::io::RandomAccessFile {
+ public:
+  explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) {
+    hctx_ = hctx;
+    content_length_ = file_size;
+    chunks_ = std::vector<std::shared_ptr<ceph::bufferlist>>();
+  }
+
+  ~RandomAccessObject() { Close(); }
+
+  /// Check if the file stream is closed.
+  arrow::Status CheckClosed() const {
+    if (closed_) {
+      return arrow::Status::Invalid("Operation on closed stream");
+    }
+    return arrow::Status::OK();
+  }
+
+  /// Check if the position of the object is valid.
+  arrow::Status CheckPosition(int64_t position, const char* action) const {
+    if (position < 0) {
+      return arrow::Status::Invalid("Cannot ", action, " from negative 
position");
+    }
+    if (position > content_length_) {
+      return arrow::Status::IOError("Cannot ", action, " past end of file");
+    }
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) 
override {
+    return arrow::Status::NotImplemented(
+        "ReadAt has not been implemented in RandomAccessObject");
+  }
+
+  /// Read a specified number of bytes from a specified position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position,
+                                                       int64_t nbytes) 
override {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "read"));
+
+    // No need to allocate more than the remaining number of bytes
+    nbytes = std::min(nbytes, content_length_ - position);
+
+    if (nbytes > 0) {
+      std::shared_ptr<ceph::bufferlist> bl = 
std::make_shared<ceph::bufferlist>();
+      cls_cxx_read(hctx_, position, nbytes, bl.get());
+      chunks_.push_back(bl);
+      return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), 
bl->length());
+    }
+    return std::make_shared<arrow::Buffer>("");
+  }
+
+  /// Read a specified number of bytes from the current position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override {
+    ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
+    pos_ += buffer->size();
+    return std::move(buffer);
+  }
+
+  /// Read a specified number of bytes from the current position into an 
output stream.
+  arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {
+    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
+    pos_ += bytes_read;
+    return bytes_read;
+  }
+
+  /// Return the size of the file.
+  arrow::Result<int64_t> GetSize() override {
+    RETURN_NOT_OK(CheckClosed());
+    return content_length_;
+  }
+
+  /// Sets the file-pointer offset, measured from the beginning of the
+  /// file, at which the next read or write occurs.
+  arrow::Status Seek(int64_t position) override {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "seek"));
+
+    pos_ = position;
+    return arrow::Status::OK();
+  }
+
+  /// Returns the file-pointer offset.
+  arrow::Result<int64_t> Tell() const override {
+    RETURN_NOT_OK(CheckClosed());
+    return pos_;
+  }
+
+  /// Mark the file as closed.
+  arrow::Status Close() override {
+    closed_ = true;
+    return arrow::Status::OK();
+  }
+
+  bool closed() const override { return closed_; }
+
+ private:
+  cls_method_context_t hctx_;
+  bool closed_ = false;
+  int64_t pos_ = 0;
+  int64_t content_length_ = -1;
+  std::vector<std::shared_ptr<ceph::bufferlist>> chunks_;
+};
+
+/// \brief Driver function to execute the Scan operations.
+/// \param[in] hctx RADOS object context.
+/// \param[in] req The scan request received from the client.
+/// \param[in] format The file format instance to use in the scan.
+/// \param[in] fragment_scan_options The fragment scan options to use to 
customize the
+/// scan.
+/// \return Table.
+arrow::Result<std::shared_ptr<arrow::Table>> DoScan(
+    cls_method_context_t hctx, skyhook::ScanRequest req,
+    std::shared_ptr<arrow::dataset::FileFormat> format,
+    std::shared_ptr<arrow::dataset::FragmentScanOptions> 
fragment_scan_options) {
+  auto file = std::make_shared<RandomAccessObject>(hctx, req.file_size);
+  arrow::dataset::FileSource source(file);
+  ARROW_ASSIGN_OR_RAISE(
+      auto fragment, format->MakeFragment(std::move(source), 
req.partition_expression));
+  auto options = std::make_shared<arrow::dataset::ScanOptions>();
+  auto builder = std::make_shared<arrow::dataset::ScannerBuilder>(
+      req.dataset_schema, std::move(fragment), std::move(options));
+
+  ARROW_RETURN_NOT_OK(builder->Filter(req.filter_expression));
+  ARROW_RETURN_NOT_OK(builder->Project(req.projection_schema->field_names()));
+  ARROW_RETURN_NOT_OK(builder->UseThreads(true));
+  ARROW_RETURN_NOT_OK(builder->FragmentScanOptions(fragment_scan_options));
+
+  ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
+  ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());
+  return table;
+}
+
+/// \brief Scan RADOS objects containing Arrow IPC data.
+/// \param[in] hctx The RADOS object context.
+/// \param[in] req The scan request received from the client.
+/// \return Table.
+static arrow::Result<std::shared_ptr<arrow::Table>> ScanIpcObject(
+    cls_method_context_t hctx, skyhook::ScanRequest req) {
+  auto format = std::make_shared<arrow::dataset::IpcFileFormat>();
+  auto fragment_scan_options = 
std::make_shared<arrow::dataset::IpcFragmentScanOptions>();
+
+  ARROW_ASSIGN_OR_RAISE(auto result_table, DoScan(hctx, req, std::move(format),
+                                                  
std::move(fragment_scan_options)));
+  return result_table;
+}
+
+/// \brief Scan RADOS objects containing Parquet binary data.
+/// \param[in] hctx The RADOS object context.
+/// \param[in] req The scan request received from the client.
+/// \return Table.
+static arrow::Result<std::shared_ptr<arrow::Table>> ScanParquetObject(
+    cls_method_context_t hctx, skyhook::ScanRequest req) {
+  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+  auto fragment_scan_options =
+      std::make_shared<arrow::dataset::ParquetFragmentScanOptions>();
+
+  ARROW_ASSIGN_OR_RAISE(auto result_table, DoScan(hctx, req, std::move(format),
+                                                  
std::move(fragment_scan_options)));
+  return result_table;
+}
+
+/// \brief The scan operation to execute on the Ceph OSD nodes. The scan 
request is
+/// deserialized, the object is scanned, and the resulting table is serialized
+/// and sent back to the client.
+/// \param[in] hctx The RADOS object context.
+/// \param[in] in A bufferlist containing serialized Scan request.
+/// \param[out] out A bufferlist to store the serialized resultant table.
+/// \return Exit code.
+static int scan_op(cls_method_context_t hctx, ceph::bufferlist* in,
+                   ceph::bufferlist* out) {
+  // Components required to construct a File fragment.
+  arrow::Status s;
+  skyhook::ScanRequest* req = new skyhook::ScanRequest();
+
+  // Deserialize the scan request.
+  if (!(s = skyhook::DeserializeScanRequest(*in, req)).ok()) {
+    LogSkyhookError(s.message());
+    return SCAN_REQ_DESER_ERR_CODE;
+  }
+
+  // Scan the object.
+  std::shared_ptr<arrow::Table> table;
+  arrow::Result<std::shared_ptr<arrow::Table>> maybe_table;
+  switch (req->file_format) {
+    case skyhook::SkyhookFileType::type::PARQUET:
+      maybe_table = ScanParquetObject(hctx, *req);
+      if (!maybe_table.ok()) {
+        LogSkyhookError("Could not scan parquet object: " +
+                        maybe_table.status().ToString());
+        return SCAN_ERR_CODE;
+      }
+      table = *maybe_table;
+      break;
+    case skyhook::SkyhookFileType::type::IPC:
+      maybe_table = ScanIpcObject(hctx, *req);
+      if (!maybe_table.ok()) {
+        LogSkyhookError("Could not scan IPC object: " + 
maybe_table.status().ToString());
+        return SCAN_ERR_CODE;
+      }
+      table = *maybe_table;
+      break;
+    default:
+      table = nullptr;
+  }
+  if (!table) {
+    LogSkyhookError("Unsupported file format");
+    return SCAN_ERR_CODE;
+  }
+
+  // Serialize the resultant table to send back to the client.
+  ceph::bufferlist* bl = new ceph::bufferlist();
+  if (!(s = skyhook::SerializeTable(table, bl)).ok()) {
+    LogSkyhookError(s.message());
+    return SCAN_RES_SER_ERR_CODE;
+  }
+
+  *out = *bl;

Review comment:
       ```suggestion
     ceph::bufferlist bl;
     if (!(s = skyhook::SerializeTable(table, &bl)).ok()) {
       LogSkyhookError(s.message());
       return SCAN_RES_SER_ERR_CODE;
     }
   
     *out = std::move(bl);
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook_test.cc
##########
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+std::shared_ptr<skyhook::SkyhookFileFormat> GetSkyhookFormat() {
+  // The constants below should match the parameters with
+  // which the Ceph cluster is configured in integration_skyhook.sh.
+  // Currently, all the default values have been used.
+  std::string ceph_config_path = "/etc/ceph/ceph.conf";
+  std::string ceph_data_pool = "cephfs_data";
+  std::string ceph_user_name = "client.admin";
+  std::string ceph_cluster_name = "ceph";
+  std::string ceph_cls_name = "skyhook";
+  std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
+      std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
+                                              ceph_user_name, 
ceph_cluster_name,
+                                              ceph_cls_name);
+  EXPECT_OK_AND_ASSIGN(auto format,
+                       skyhook::SkyhookFileFormat::Make(rados_ctx, "parquet"));
+  return format;
+}
+
+std::shared_ptr<arrow::dataset::ParquetFileFormat> GetParquetFormat() {
+  return std::make_shared<arrow::dataset::ParquetFileFormat>();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromDirectory(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string dir) {
+  arrow::fs::FileSelector s;
+  s.base_dir = dir;
+  s.recursive = true;
+
+  arrow::dataset::FileSystemFactoryOptions options;
+  options.partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
+      arrow::schema({arrow::field("payment_type", arrow::int32()),
+                     arrow::field("VendorID", arrow::int32())}));
+  EXPECT_OK_AND_ASSIGN(auto factory, 
arrow::dataset::FileSystemDatasetFactory::Make(
+                                         fs, s, format, options));
+
+  arrow::dataset::InspectOptions inspect_options;
+  arrow::dataset::FinishOptions finish_options;
+  EXPECT_OK_AND_ASSIGN(auto schema, factory->Inspect(inspect_options));
+  EXPECT_OK_AND_ASSIGN(auto dataset, factory->Finish(finish_options));
+  return dataset;
+}
+
+std::shared_ptr<arrow::fs::FileSystem> GetFileSystemFromUri(const std::string& 
uri,
+                                                            std::string* path) 
{
+  return arrow::fs::FileSystemFromUri(uri, path).ValueOrDie();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromPath(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string path) {
+  EXPECT_OK_AND_ASSIGN(auto info, fs->GetFileInfo(path));
+  return GetDatasetFromDirectory(fs, format, path);

Review comment:
       ```suggestion
     return GetDatasetFromDirectory(std::move(fs), std::move(format), 
std::move(path));
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook.cc
##########
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <rados/objclass.h>
+
+#include <memory>
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/result.h"
+#include "arrow/util/compression.h"
+
+#include "skyhook/protocol/skyhook_protocol.h"
+
+CLS_VER(1, 0)
+CLS_NAME(skyhook)
+
+cls_handle_t h_class;
+cls_method_handle_t h_scan_op;
+
+/// \brief Log skyhook errors using RADOS object class SDK's logger.
+void LogSkyhookError(const std::string& msg) { CLS_LOG(0, "error: %s", msg.c_str()); }
+
+/// \class RandomAccessObject
+/// \brief An interface to provide a file-like view over RADOS objects.
+class RandomAccessObject : public arrow::io::RandomAccessFile {
+ public:
+  explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) {
+    hctx_ = hctx;
+    content_length_ = file_size;
+    chunks_ = std::vector<std::shared_ptr<ceph::bufferlist>>();
+  }
+
+  ~RandomAccessObject() { Close(); }
+
+  /// Check if the file stream is closed.
+  arrow::Status CheckClosed() const {
+    if (closed_) {
+      return arrow::Status::Invalid("Operation on closed stream");
+    }
+    return arrow::Status::OK();
+  }
+
+  /// Check if the position of the object is valid.
+  arrow::Status CheckPosition(int64_t position, const char* action) const {
+    if (position < 0) {
+      return arrow::Status::Invalid("Cannot ", action, " from negative position");
+    }
+    if (position > content_length_) {
+      return arrow::Status::IOError("Cannot ", action, " past end of file");
+    }
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
+    return arrow::Status::NotImplemented(
+        "ReadAt has not been implemented in RandomAccessObject");
+  }
+
+  /// Read a specified number of bytes from a specified position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position,
+                                                       int64_t nbytes) override {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "read"));
+
+    // No need to allocate more than the remaining number of bytes
+    nbytes = std::min(nbytes, content_length_ - position);
+
+    if (nbytes > 0) {
+      std::shared_ptr<ceph::bufferlist> bl = std::make_shared<ceph::bufferlist>();
+      cls_cxx_read(hctx_, position, nbytes, bl.get());
+      chunks_.push_back(bl);
+      return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), bl->length());
+    }
+    return std::make_shared<arrow::Buffer>("");
+  }
+
+  /// Read a specified number of bytes from the current position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override {
+    ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
+    pos_ += buffer->size();
+    return std::move(buffer);
+  }
+
+  /// Read a specified number of bytes from the current position into an output stream.
+  arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {
+    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
+    pos_ += bytes_read;
+    return bytes_read;
+  }
+
+  /// Return the size of the file.
+  arrow::Result<int64_t> GetSize() override {
+    RETURN_NOT_OK(CheckClosed());
+    return content_length_;
+  }
+
+  /// Sets the file-pointer offset, measured from the beginning of the
+  /// file, at which the next read or write occurs.
+  arrow::Status Seek(int64_t position) override {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "seek"));
+
+    pos_ = position;
+    return arrow::Status::OK();
+  }
+
+  /// Returns the file-pointer offset.
+  arrow::Result<int64_t> Tell() const override {
+    RETURN_NOT_OK(CheckClosed());
+    return pos_;
+  }
+
+  /// Mark the file as closed.
+  arrow::Status Close() override {
+    closed_ = true;
+    return arrow::Status::OK();
+  }
+
+  bool closed() const override { return closed_; }
+
+ private:
+  cls_method_context_t hctx_;
+  bool closed_ = false;
+  int64_t pos_ = 0;
+  int64_t content_length_ = -1;
+  std::vector<std::shared_ptr<ceph::bufferlist>> chunks_;
+};
+
+/// \brief Driver function to execute the Scan operations.
+/// \param[in] hctx RADOS object context.
+/// \param[in] req The scan request received from the client.
+/// \param[in] format The file format instance to use in the scan.
+/// \param[in] fragment_scan_options The fragment scan options to use to customize the
+/// scan.
+/// \return Table.
+arrow::Result<std::shared_ptr<arrow::Table>> DoScan(
+    cls_method_context_t hctx, skyhook::ScanRequest req,
+    std::shared_ptr<arrow::dataset::FileFormat> format,
+    std::shared_ptr<arrow::dataset::FragmentScanOptions> fragment_scan_options) {
+  auto file = std::make_shared<RandomAccessObject>(hctx, req.file_size);
+  arrow::dataset::FileSource source(file);
+  ARROW_ASSIGN_OR_RAISE(
+      auto fragment, format->MakeFragment(std::move(source), req.partition_expression));
+  auto options = std::make_shared<arrow::dataset::ScanOptions>();
+  auto builder = std::make_shared<arrow::dataset::ScannerBuilder>(
+      req.dataset_schema, std::move(fragment), std::move(options));
+
+  ARROW_RETURN_NOT_OK(builder->Filter(req.filter_expression));
+  ARROW_RETURN_NOT_OK(builder->Project(req.projection_schema->field_names()));
+  ARROW_RETURN_NOT_OK(builder->UseThreads(true));
+  ARROW_RETURN_NOT_OK(builder->FragmentScanOptions(fragment_scan_options));
+
+  ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
+  ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());
+  return table;
+}
+
+/// \brief Scan RADOS objects containing Arrow IPC data.
+/// \param[in] hctx The RADOS object context.
+/// \param[in] req The scan request received from the client.
+/// \return Table.
+static arrow::Result<std::shared_ptr<arrow::Table>> ScanIpcObject(
+    cls_method_context_t hctx, skyhook::ScanRequest req) {
+  auto format = std::make_shared<arrow::dataset::IpcFileFormat>();
+  auto fragment_scan_options = std::make_shared<arrow::dataset::IpcFragmentScanOptions>();
+
+  ARROW_ASSIGN_OR_RAISE(auto result_table, DoScan(hctx, req, std::move(format),
+                                                  std::move(fragment_scan_options)));
+  return result_table;
+}
+
+/// \brief Scan RADOS objects containing Parquet binary data.
+/// \param[in] hctx The RADOS object context.
+/// \param[in] req The scan request received from the client.
+/// \return Table.
+static arrow::Result<std::shared_ptr<arrow::Table>> ScanParquetObject(
+    cls_method_context_t hctx, skyhook::ScanRequest req) {
+  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+  auto fragment_scan_options =
+      std::make_shared<arrow::dataset::ParquetFragmentScanOptions>();
+
+  ARROW_ASSIGN_OR_RAISE(auto result_table, DoScan(hctx, req, std::move(format),
+                                                  std::move(fragment_scan_options)));
+  return result_table;
+}
+
+/// \brief The scan operation to execute on the Ceph OSD nodes. The scan request is
+/// deserialized, the object is scanned, and the resulting table is serialized
+/// and sent back to the client.
+/// \param[in] hctx The RADOS object context.
+/// \param[in] in A bufferlist containing serialized Scan request.
+/// \param[out] out A bufferlist to store the serialized resultant table.
+/// \return Exit code.
+static int scan_op(cls_method_context_t hctx, ceph::bufferlist* in,
+                   ceph::bufferlist* out) {
+  // Components required to construct a File fragment.
+  arrow::Status s;
+  skyhook::ScanRequest* req = new skyhook::ScanRequest();
+
+  // Deserialize the scan request.
+  if (!(s = skyhook::DeserializeScanRequest(*in, req)).ok()) {
+    LogSkyhookError(s.message());
+    return SCAN_REQ_DESER_ERR_CODE;
+  }
+
+  // Scan the object.
+  std::shared_ptr<arrow::Table> table;
+  arrow::Result<std::shared_ptr<arrow::Table>> maybe_table;
+  switch (req->file_format) {
+    case skyhook::SkyhookFileType::type::PARQUET:
+      maybe_table = ScanParquetObject(hctx, *req);
+      if (!maybe_table.ok()) {
+        LogSkyhookError("Could not scan parquet object: " +
+                        maybe_table.status().ToString());
+        return SCAN_ERR_CODE;
+      }
+      table = *maybe_table;
+      break;
+    case skyhook::SkyhookFileType::type::IPC:
+      maybe_table = ScanIpcObject(hctx, *req);

Review comment:
       ```suggestion
     skyhook::ScanRequest req;
   
     // Deserialize the scan request.
     if (!(s = skyhook::DeserializeScanRequest(*in, &req)).ok()) {
       LogSkyhookError(s.message());
       return SCAN_REQ_DESER_ERR_CODE;
     }
   
     // Scan the object.
     std::shared_ptr<arrow::Table> table;
     arrow::Result<std::shared_ptr<arrow::Table>> maybe_table;
     switch (req.file_format) {
       case skyhook::SkyhookFileType::type::PARQUET:
         maybe_table = ScanParquetObject(hctx, std::move(req));
         if (!maybe_table.ok()) {
           LogSkyhookError("Could not scan parquet object: " +
                           maybe_table.status().ToString());
           return SCAN_ERR_CODE;
         }
         table = *maybe_table;
         break;
       case skyhook::SkyhookFileType::type::IPC:
         maybe_table = ScanIpcObject(hctx, std::move(req));
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook_test.cc
##########
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+std::shared_ptr<skyhook::SkyhookFileFormat> GetSkyhookFormat() {
+  // The constants below should match the parameters with
+  // which the Ceph cluster is configured in integration_skyhook.sh.
+  // Currently, all the default values have been used.
+  std::string ceph_config_path = "/etc/ceph/ceph.conf";
+  std::string ceph_data_pool = "cephfs_data";
+  std::string ceph_user_name = "client.admin";
+  std::string ceph_cluster_name = "ceph";
+  std::string ceph_cls_name = "skyhook";
+  std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
+      std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
+                                              ceph_user_name, ceph_cluster_name,
+                                              ceph_cls_name);
+  EXPECT_OK_AND_ASSIGN(auto format,
+                       skyhook::SkyhookFileFormat::Make(rados_ctx, "parquet"));
+  return format;
+}
+
+std::shared_ptr<arrow::dataset::ParquetFileFormat> GetParquetFormat() {
+  return std::make_shared<arrow::dataset::ParquetFileFormat>();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromDirectory(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string dir) {
+  arrow::fs::FileSelector s;
+  s.base_dir = dir;
+  s.recursive = true;
+
+  arrow::dataset::FileSystemFactoryOptions options;
+  options.partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
+      arrow::schema({arrow::field("payment_type", arrow::int32()),
+                     arrow::field("VendorID", arrow::int32())}));
+  EXPECT_OK_AND_ASSIGN(auto factory, arrow::dataset::FileSystemDatasetFactory::Make(
+                                         fs, s, format, options));
+
+  arrow::dataset::InspectOptions inspect_options;
+  arrow::dataset::FinishOptions finish_options;
+  EXPECT_OK_AND_ASSIGN(auto schema, factory->Inspect(inspect_options));
+  EXPECT_OK_AND_ASSIGN(auto dataset, factory->Finish(finish_options));
+  return dataset;
+}
+
+std::shared_ptr<arrow::fs::FileSystem> GetFileSystemFromUri(const std::string& uri,
+                                                            std::string* path) {
+  return arrow::fs::FileSystemFromUri(uri, path).ValueOrDie();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromPath(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string path) {
+  EXPECT_OK_AND_ASSIGN(auto info, fs->GetFileInfo(path));
+  return GetDatasetFromDirectory(fs, format, path);
+}
+
+std::shared_ptr<arrow::dataset::Scanner> GetScannerFromDataset(
+    std::shared_ptr<arrow::dataset::Dataset> dataset, std::vector<std::string> columns,

Review comment:
       ```suggestion
       const std::shared_ptr<arrow::dataset::Dataset>& dataset, std::vector<std::string> columns,
   ```

##########
File path: cpp/src/skyhook/protocol/skyhook_protocol_test.cc
##########
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/test_util.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+
+std::shared_ptr<arrow::Table> CreateTable() {
+  auto schema = arrow::schema({
+      {arrow::field("a", arrow::uint8())},
+      {arrow::field("b", arrow::uint32())},
+  });
+
+  std::shared_ptr<arrow::Table> table;
+  return TableFromJSON(schema, {R"([{"a": null, "b": 5},
+                                     {"a": 1,    "b": 3},
+                                     {"a": 3,    "b": null},
+                                     {"a": null, "b": null},
+                                     {"a": 2,    "b": 5},
+                                     {"a": 1,    "b": 5}
+                                    ])"});
+}
+
+TEST(TestSkyhookProtocol, SerDeserScanRequest) {
+  ceph::bufferlist* bl = new ceph::bufferlist();
+  skyhook::ScanRequest req;
+  req.filter_expression = arrow::compute::literal(true);
+  req.partition_expression = arrow::compute::literal(false);
+  req.projection_schema = arrow::schema({arrow::field("a", arrow::int64())});
+  req.dataset_schema = arrow::schema({arrow::field("a", arrow::int64())});
+  req.file_size = 1000000;
+  req.file_format = skyhook::SkyhookFileType::type::IPC;
+  ASSERT_OK(skyhook::SerializeScanRequest(req, bl));
+
+  skyhook::ScanRequest* req_ = new skyhook::ScanRequest();
+  ASSERT_OK(skyhook::DeserializeScanRequest(*bl, req_));

Review comment:
       ```suggestion
     ceph::bufferlist bl;
     skyhook::ScanRequest req;
     req.filter_expression = arrow::compute::literal(true);
     req.partition_expression = arrow::compute::literal(false);
     req.projection_schema = arrow::schema({arrow::field("a", arrow::int64())});
     req.dataset_schema = arrow::schema({arrow::field("a", arrow::int64())});
     req.file_size = 1000000;
     req.file_format = skyhook::SkyhookFileType::type::IPC;
     ASSERT_OK(skyhook::SerializeScanRequest(req, &bl));
   
     skyhook::ScanRequest req_;
     ASSERT_OK(skyhook::DeserializeScanRequest(bl, &req_));
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook_test.cc
##########
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+std::shared_ptr<skyhook::SkyhookFileFormat> GetSkyhookFormat() {
+  // The constants below should match the parameters with
+  // which the Ceph cluster is configured in integration_skyhook.sh.
+  // Currently, all the default values have been used.
+  std::string ceph_config_path = "/etc/ceph/ceph.conf";
+  std::string ceph_data_pool = "cephfs_data";
+  std::string ceph_user_name = "client.admin";
+  std::string ceph_cluster_name = "ceph";
+  std::string ceph_cls_name = "skyhook";
+  std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
+      std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
+                                              ceph_user_name, ceph_cluster_name,
+                                              ceph_cls_name);
+  EXPECT_OK_AND_ASSIGN(auto format,
+                       skyhook::SkyhookFileFormat::Make(rados_ctx, "parquet"));
+  return format;
+}
+
+std::shared_ptr<arrow::dataset::ParquetFileFormat> GetParquetFormat() {
+  return std::make_shared<arrow::dataset::ParquetFileFormat>();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromDirectory(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string dir) {
+  arrow::fs::FileSelector s;
+  s.base_dir = dir;
+  s.recursive = true;
+
+  arrow::dataset::FileSystemFactoryOptions options;
+  options.partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
+      arrow::schema({arrow::field("payment_type", arrow::int32()),
+                     arrow::field("VendorID", arrow::int32())}));
+  EXPECT_OK_AND_ASSIGN(auto factory, arrow::dataset::FileSystemDatasetFactory::Make(
+                                         fs, s, format, options));
+
+  arrow::dataset::InspectOptions inspect_options;
+  arrow::dataset::FinishOptions finish_options;
+  EXPECT_OK_AND_ASSIGN(auto schema, factory->Inspect(inspect_options));
+  EXPECT_OK_AND_ASSIGN(auto dataset, factory->Finish(finish_options));
+  return dataset;
+}
+
+std::shared_ptr<arrow::fs::FileSystem> GetFileSystemFromUri(const std::string& uri,
+                                                            std::string* path) {
+  return arrow::fs::FileSystemFromUri(uri, path).ValueOrDie();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromPath(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string path) {
+  EXPECT_OK_AND_ASSIGN(auto info, fs->GetFileInfo(path));
+  return GetDatasetFromDirectory(fs, format, path);
+}
+
+std::shared_ptr<arrow::dataset::Scanner> GetScannerFromDataset(
+    std::shared_ptr<arrow::dataset::Dataset> dataset, std::vector<std::string> columns,
+    arrow::compute::Expression filter, bool use_threads) {
+  EXPECT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
+
+  if (!columns.empty()) {
+    ARROW_EXPECT_OK(scanner_builder->Project(columns));
+  }
+
+  ARROW_EXPECT_OK(scanner_builder->Filter(filter));

Review comment:
       ```suggestion
     ARROW_EXPECT_OK(scanner_builder->Filter(std::move(filter)));
   ```

##########
File path: cpp/src/skyhook/protocol/skyhook_protocol_test.cc
##########
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/test_util.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+
+std::shared_ptr<arrow::Table> CreateTable() {
+  auto schema = arrow::schema({
+      {arrow::field("a", arrow::uint8())},
+      {arrow::field("b", arrow::uint32())},
+  });
+
+  std::shared_ptr<arrow::Table> table;
+  return TableFromJSON(schema, {R"([{"a": null, "b": 5},
+                                     {"a": 1,    "b": 3},
+                                     {"a": 3,    "b": null},
+                                     {"a": null, "b": null},
+                                     {"a": 2,    "b": 5},
+                                     {"a": 1,    "b": 5}
+                                    ])"});
+}
+
+TEST(TestSkyhookProtocol, SerDeserScanRequest) {
+  ceph::bufferlist* bl = new ceph::bufferlist();
+  skyhook::ScanRequest req;
+  req.filter_expression = arrow::compute::literal(true);
+  req.partition_expression = arrow::compute::literal(false);
+  req.projection_schema = arrow::schema({arrow::field("a", arrow::int64())});
+  req.dataset_schema = arrow::schema({arrow::field("a", arrow::int64())});
+  req.file_size = 1000000;
+  req.file_format = skyhook::SkyhookFileType::type::IPC;
+  ASSERT_OK(skyhook::SerializeScanRequest(req, bl));
+
+  skyhook::ScanRequest* req_ = new skyhook::ScanRequest();
+  ASSERT_OK(skyhook::DeserializeScanRequest(*bl, req_));
+  ASSERT_TRUE(req.filter_expression.Equals(req_->filter_expression));
+  ASSERT_TRUE(req.partition_expression.Equals(req_->partition_expression));
+  ASSERT_TRUE(req.projection_schema->Equals(req_->projection_schema));
+  ASSERT_TRUE(req.dataset_schema->Equals(req_->dataset_schema));
+  ASSERT_EQ(req.file_size, req_->file_size);
+  ASSERT_EQ(req.file_format, req_->file_format);
+}
+
+TEST(TestSkyhookProtocol, SerDeserTable) {
+  std::shared_ptr<arrow::Table> table = CreateTable();
+  ceph::bufferlist* bl = new ceph::bufferlist();
+  ASSERT_OK(skyhook::SerializeTable(table, bl));
+
+  arrow::RecordBatchVector* batches = new arrow::RecordBatchVector();
+  ASSERT_OK(skyhook::DeserializeTable(*bl, false, batches));
+  ASSERT_OK_AND_ASSIGN(auto materialized_table,
+                       arrow::Table::FromRecordBatches(*batches));
+
+  ASSERT_TRUE(table->Equals(*materialized_table));

Review comment:
       ```suggestion
     std::shared_ptr<arrow::Table> table = CreateTable();
     ceph::bufferlist bl;
     ASSERT_OK(skyhook::SerializeTable(table, &bl));
   
     arrow::RecordBatchVector batches;
     ASSERT_OK(skyhook::DeserializeTable(bl, false, &batches));
     ASSERT_OK_AND_ASSIGN(auto materialized_table,
                          arrow::Table::FromRecordBatches(batches));
   
     ASSERT_TRUE(table->Equals(*materialized_table));
   ```

##########
File path: cpp/src/skyhook/protocol/skyhook_protocol.h
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "skyhook/protocol/rados_protocol.h"
+
+#include <sys/stat.h>
+#include <sstream>
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+#include "arrow/type.h"
+
+#define SCAN_UNKNOWN_ERR_MSG "something went wrong while scanning file fragment"
+#define SCAN_ERR_CODE 25
+#define SCAN_ERR_MSG "failed to scan file fragment"
+#define SCAN_REQ_DESER_ERR_CODE 26
+#define SCAN_REQ_DESER_ERR_MSG "failed to deserialize scan request"
+#define SCAN_RES_SER_ERR_CODE 27
+#define SCAN_RES_SER_ERR_MSG "failed to serialize result table"
+
+namespace skyhook {
+
+/// An enum to represent the different
+/// types of file formats that Skyhook supports.
+struct SkyhookFileType {
+  enum type { PARQUET, IPC };
+};
+
+/// A struct encapsulating all the parameters
+/// required to be serialized in the form of flatbuffers for
+/// sending to the cls.
+struct ScanRequest {
+  arrow::compute::Expression filter_expression;
+  arrow::compute::Expression partition_expression;
+  std::shared_ptr<arrow::Schema> projection_schema;
+  std::shared_ptr<arrow::Schema> dataset_schema;
+  int64_t file_size;
+  SkyhookFileType::type file_format;
+};
+
+/// Utility functions to serialize and deserialize scan requests and result Arrow tables.
+arrow::Status SerializeScanRequest(ScanRequest& req, ceph::bufferlist* bl);
+arrow::Status DeserializeScanRequest(ceph::bufferlist& bl, ScanRequest* req);
+arrow::Status SerializeTable(const std::shared_ptr<arrow::Table>& table,
+                             ceph::bufferlist* bl);
+arrow::Status DeserializeTable(ceph::bufferlist& bl, bool use_threads,
+                               arrow::RecordBatchVector* batches);
+
+/// Utility function to invoke a RADOS object class function on an RADOS object.
+arrow::Status ExecuteObjectClassFn(const std::shared_ptr<rados::RadosConn>& connection,
+                                   const std::string& oid, const std::string& fn,
+                                   ceph::bufferlist& in, ceph::bufferlist& out);
+
+/// An interface for translating the name of a file in CephFS to its
+/// corresponding object ID in RADOS assuming 1:1 mapping between a file
+/// and it's underlying object.
+class SkyhookDirectObjectAccess {
+ public:
+  explicit SkyhookDirectObjectAccess(const std::shared_ptr<rados::RadosConn>& connection)
+      : connection_(connection) {}
+
+  ~SkyhookDirectObjectAccess() {}

Review comment:
       ```suggestion
     ~SkyhookDirectObjectAccess() = default;
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to