lidavidm commented on a change in pull request #10431:
URL: https://github.com/apache/arrow/pull/10431#discussion_r682627932



##########
File path: cpp/src/arrow/dataset/discovery.cc
##########
@@ -270,7 +270,14 @@ Result<std::shared_ptr<Dataset>> 
FileSystemDatasetFactory::Finish(FinishOptions
   for (const auto& info : files_) {
     auto fixed_path = StripPrefixAndFilename(info.path(), 
options_.partition_base_dir);
     ARROW_ASSIGN_OR_RAISE(auto partition, partitioning->Parse(fixed_path));
-    ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment({info, fs_}, 
partition));
+    std::shared_ptr<FileFragment> fragment;
+    if (format_->type_name() == "skyhook") {
+      ARROW_ASSIGN_OR_RAISE(fragment,
+                            format_->MakeFragment({info, fs_}, partition, 
true, schema));

Review comment:
       Additionally, just a nit here: for literal constant arguments it's generally best
to add an inline comment reminding the reader of the parameter name, e.g.
`MakeFragment(..., /*is_dataset_schema=*/true, schema)`. See
https://google.github.io/styleguide/cppguide.html#Function_Argument_Comments

##########
File path: cpp/src/arrow/dataset/file_skyhook.cc
##########
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "arrow/dataset/file_skyhook.h"
+
+#include <mutex>
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/filesystem/filesystem.h"
+#include "arrow/filesystem/path_util.h"
+#include "arrow/filesystem/util_internal.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/compression.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/file_reader.h"
+
+#include <flatbuffers/flatbuffers.h>

Review comment:
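       nit: third-party headers such as `<flatbuffers/flatbuffers.h>` should be grouped after the related header and the C/C++ system headers, but before this project's own headers (not after them)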
   
   https://google.github.io/styleguide/cppguide.html#Names_and_Order_of_Includes

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -32,6 +32,10 @@
 #include "arrow/dataset/dataset.h"
 #include "arrow/dataset/dataset_internal.h"
 #include "arrow/dataset/scanner_internal.h"
+#include "arrow/io/memory.h"

Review comment:
       nit: are these includes necessary here?

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -104,6 +104,8 @@ struct ARROW_DS_EXPORT ScanOptions {
   ///
   /// Note: The IOContext executor will be ignored if use_threads is set to 
false
   io::IOContext io_context;
+  // Partition expression
+  compute::Expression partition_expression = compute::literal(true);

Review comment:
       Instead of adding this to ScanOptions (which is shared by every file format), it might be better to
thread it through from SkyhookFileFormat into SkyhookScanTask and then into
the serialized scan request.

##########
File path: cpp/src/arrow/dataset/file_skyhook.h
##########
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This API is EXPERIMENTAL.
+#define _FILE_OFFSET_BITS 64
+
+#pragma once
+
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <functional>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/discovery.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/dataset/rados.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/filesystem/path_util.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/macros.h"
+#include "parquet/arrow/writer.h"
+#include "parquet/exception.h"
+
+#define SCAN_ERR_CODE 25
+#define SCAN_ERR_MSG "failed to scan file fragment"
+
+#define SCAN_REQ_DESER_ERR_CODE 26
+#define SCAN_REQ_DESER_ERR_MSG "failed to deserialize scan request"
+
+#define SCAN_RES_SER_ERR_CODE 27
+#define SCAN_RES_SER_ERR_MSG "failed to serialize result table"
+
+namespace arrow {
+namespace dataset {
+
+enum SkyhookFileType { PARQUET, IPC };
+
+/// \addtogroup dataset-file-formats
+///
+/// @{
+
+namespace connection {
+/// \brief An interface for general connections.
+class ARROW_DS_EXPORT Connection {
+ public:
+  virtual Status Connect() = 0;
+
+  Connection() = default;
+  virtual ~Connection() = default;
+};
+
+/// \class RadosConnection
+/// \brief An interface to connect to a Rados cluster and hold the connection
+/// information for usage in later stages.
+class ARROW_DS_EXPORT RadosConnection : public Connection {
+ public:
+  struct RadosConnectionCtx {
+    std::string ceph_config_path;
+    std::string data_pool;
+    std::string user_name;
+    std::string cluster_name;
+    std::string cls_name;
+
+    RadosConnectionCtx(const std::string& ceph_config_path, const std::string& 
data_pool,
+                       const std::string& user_name, const std::string& 
cluster_name,
+                       const std::string& cls_name)
+        : ceph_config_path(ceph_config_path),
+          data_pool(data_pool),
+          user_name(user_name),
+          cluster_name(cluster_name),
+          cls_name(cls_name) {}
+  };
+  explicit RadosConnection(const RadosConnectionCtx& ctx)
+      : Connection(),
+        ctx(ctx),
+        rados(new RadosWrapper()),
+        ioCtx(new IoCtxWrapper()),
+        connected(false) {}
+
+  ~RadosConnection();
+
+  /// \brief Connect to the Rados cluster.
+  /// \return Status.
+  Status Connect();
+
+  /// \brief Shutdown the connection to the Rados cluster.
+  /// \return Status.
+  Status Shutdown();
+
+  RadosConnectionCtx ctx;
+  RadosInterface* rados;
+  IoCtxInterface* ioCtx;
+  bool connected;
+};
+
+}  // namespace connection
+
+/// \class SkyhookDirectObjectAccess
+/// \brief Interface for translating the name of a file in CephFS to its
+/// corresponding object ID in RADOS assuming 1:1 mapping between a file
+/// and its underlying object.
+class ARROW_DS_EXPORT SkyhookDirectObjectAccess {
+ public:
+  explicit SkyhookDirectObjectAccess(
+      const std::shared_ptr<connection::RadosConnection>& connection)
+      : connection_(std::move(connection)) {}
+
+  /// \brief Executes the POSIX stat call on a file.
+  /// \param[in] path Path of the file.
+  /// \param[out] st Refernce to the struct object to store the result.
+  /// \return Status.
+  Status Stat(const std::string& path, struct stat& st) {
+    struct stat file_st;
+    if (stat(path.c_str(), &file_st) < 0)
+      return Status::Invalid("stat returned non-zero exit code.");
+    st = file_st;
+    return Status::OK();
+  }
+
+  // Helper function to convert Inode to ObjectID because Rados calls work with
+  // ObjectIDs.
+  std::string ConvertFileInodeToObjectID(uint64_t inode) {
+    std::stringstream ss;
+    ss << std::hex << inode;
+    std::string oid(ss.str() + ".00000000");
+    return oid;
+  }
+
+  /// \brief Executes query on the librados node. It uses the librados::exec 
API to
+  /// perform queries on the storage node and stores the result in the output 
bufferlist.
+  /// \param[in] inode inode of the file.
+  /// \param[in] fn The function to be executed by the librados::exec call.
+  /// \param[in] in The input bufferlist.
+  /// \param[out] out The output bufferlist.
+  /// \return Status.
+  Status Exec(uint64_t inode, const std::string& fn, ceph::bufferlist& in,
+              ceph::bufferlist& out) {
+    std::string oid = ConvertFileInodeToObjectID(inode);
+    int e = connection_->ioCtx->exec(oid.c_str(), 
connection_->ctx.cls_name.c_str(),
+                                     fn.c_str(), in, out);
+    if (e == SCAN_ERR_CODE) return Status::Invalid(SCAN_ERR_MSG);
+    if (e == SCAN_REQ_DESER_ERR_CODE) return 
Status::Invalid(SCAN_REQ_DESER_ERR_MSG);
+    if (e == SCAN_RES_SER_ERR_CODE) return 
Status::Invalid(SCAN_RES_SER_ERR_MSG);
+    return Status::OK();
+  }
+
+ protected:
+  std::shared_ptr<connection::RadosConnection> connection_;
+};
+
+/// \class SkyhookFileFormat
+/// \brief A ParquetFileFormat implementation that offloads the fragment
+/// scan operations to the Ceph OSDs
+class ARROW_DS_EXPORT SkyhookFileFormat : public ParquetFileFormat {
+ public:
+  SkyhookFileFormat(const std::string& format, const std::string& 
ceph_config_path,
+                    const std::string& data_pool, const std::string& user_name,
+                    const std::string& cluster_name, const std::string& 
cls_name);
+
+  explicit SkyhookFileFormat(const 
std::shared_ptr<connection::RadosConnection>& conn);
+
+  explicit SkyhookFileFormat(std::shared_ptr<SkyhookDirectObjectAccess>& doa)
+      : doa_(std::move(doa)) {}
+
+  std::string type_name() const override { return "skyhook"; }
+
+  bool splittable() const { return true; }
+
+  bool Equals(const FileFormat& other) const override {
+    return type_name() == other.type_name();
+  }
+
+  Result<bool> IsSupported(const FileSource& source) const override { return 
true; }
+
+  /// \brief Return the schema of the file fragment.
+  /// \param[in] source The source of the file fragment.
+  /// \return The schema of the file fragment.
+  Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const 
override;
+
+  /// \brief Scan a file fragment.
+  /// \param[in] options Options to pass.
+  /// \param[in] file The file fragment.
+  /// \return The scanned file fragment.
+  Result<ScanTaskIterator> ScanFile(
+      const std::shared_ptr<ScanOptions>& options,
+      const std::shared_ptr<FileFragment>& file) const override;
+
+  Result<std::shared_ptr<FileWriter>> MakeWriter(
+      std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> 
schema,
+      std::shared_ptr<FileWriteOptions> options) const {
+    return Status::NotImplemented("Use the Python API");
+  }
+
+  std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override { return 
NULLPTR; }
+
+ protected:
+  std::shared_ptr<SkyhookDirectObjectAccess> doa_;
+  std::string fragment_format_;
+};
+
+/// \brief Serialize scan request to a bufferlist.
+/// \param[in] options The scan options to use to build a ScanRequest.
+/// \param[in] file_format The underlying file format to use.
+/// \param[in] file_size The size of the file fragment.
+/// \param[out] bl Output bufferlist.
+/// \return Status.
+ARROW_DS_EXPORT Status SerializeScanRequest(std::shared_ptr<ScanOptions>& 
options,
+                                            int& file_format, int64_t& 
file_size,
+                                            ceph::bufferlist& bl);
+
+/// \brief Deserialize scan request from bufferlist.
+/// \param[out] filter The filter expression to apply.
+/// \param[out] partition The partition expression to use.
+/// \param[out] projected_schema The schema to project the filtered record 
batches.
+/// \param[out] dataset_schema The dataset schema to use.
+/// \param[out] file_size The size of the file fragment.
+/// \param[out] file_format The underlying file format to use.
+/// \param[in] bl Input Ceph bufferlist.
+/// \return Status.
+ARROW_DS_EXPORT Status DeserializeScanRequest(compute::Expression* filter,
+                                              compute::Expression* partition,
+                                              std::shared_ptr<Schema>* 
projected_schema,
+                                              std::shared_ptr<Schema>* 
dataset_schema,
+                                              int64_t& file_size, int& 
file_format,
+                                              ceph::bufferlist& bl);
+
+/// \brief Serialize the result Table to a bufferlist.
+/// \param[in] table The table to serialize.
+/// \param[in] aggressive If true, use ZSTD compression instead of LZ4.

Review comment:
       Is there a reason to hardcode ZSTD/LZ4 instead of accepting an Arrow
codec instance (`arrow::util::Codec`) or a codec type (`arrow::Compression::type`)?

##########
File path: cpp/src/arrow/dataset/file_skyhook_test.cc
##########
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_skyhook.h"
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/dataset/test_util.h"
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+namespace arrow {
+namespace dataset {
+
+std::shared_ptr<arrow::Table> CreateTable() {
+  auto schema =

Review comment:
       You might find helpers like TableFromJSON useful: 
https://github.com/apache/arrow/blob/e4ba2f28f79fd5bcc4bf466c4b0ee75a0bf2c375/cpp/src/arrow/testing/gtest_util.h#L321-L323

##########
File path: cpp/src/arrow/dataset/file_skyhook.h
##########
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This API is EXPERIMENTAL.
+#define _FILE_OFFSET_BITS 64
+
+#pragma once
+
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <functional>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/discovery.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/dataset/rados.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/filesystem/path_util.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/macros.h"
+#include "parquet/arrow/writer.h"
+#include "parquet/exception.h"
+
+#define SCAN_ERR_CODE 25
+#define SCAN_ERR_MSG "failed to scan file fragment"
+
+#define SCAN_REQ_DESER_ERR_CODE 26
+#define SCAN_REQ_DESER_ERR_MSG "failed to deserialize scan request"
+
+#define SCAN_RES_SER_ERR_CODE 27
+#define SCAN_RES_SER_ERR_MSG "failed to serialize result table"
+
+namespace arrow {
+namespace dataset {
+
+enum SkyhookFileType { PARQUET, IPC };
+
+/// \addtogroup dataset-file-formats
+///
+/// @{
+
+namespace connection {

Review comment:
       I would expect these connection classes to live in something like `arrow::rados` (under a
separate source path), by analogy with `arrow::s3` or `arrow::hdfs`, rather than being
defined inline in this header.

##########
File path: cpp/src/arrow/dataset/file_skyhook.h
##########
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This API is EXPERIMENTAL.
+#define _FILE_OFFSET_BITS 64
+
+#pragma once
+
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <functional>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/discovery.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/dataset/rados.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/filesystem/path_util.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/macros.h"
+#include "parquet/arrow/writer.h"
+#include "parquet/exception.h"
+
+#define SCAN_ERR_CODE 25
+#define SCAN_ERR_MSG "failed to scan file fragment"
+
+#define SCAN_REQ_DESER_ERR_CODE 26
+#define SCAN_REQ_DESER_ERR_MSG "failed to deserialize scan request"
+
+#define SCAN_RES_SER_ERR_CODE 27
+#define SCAN_RES_SER_ERR_MSG "failed to serialize result table"
+
+namespace arrow {
+namespace dataset {
+
+enum SkyhookFileType { PARQUET, IPC };

Review comment:
       Could this be an `enum class`? Alternatively, Arrow commonly uses this pattern:
   
   ```cpp
   struct SkyhookFileType {
     enum type {
       PARQUET,
       IPC
     };
   };
   ```

##########
File path: cpp/src/arrow/dataset/scanner_internal.h
##########
@@ -185,6 +185,12 @@ inline Result<ScanTaskIterator> GetScanTaskIterator(
   auto fn = [options](std::shared_ptr<Fragment> fragment) -> 
Result<ScanTaskIterator> {
     ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment->Scan(options));
 
+    if (fragment->type_name() == "skyhook") {

Review comment:
       Or rather, we shouldn't skip filtering and projection here, right? Even if
they're pushed down (as Parquet can already do), we should keep the rest of
the pipeline uniform. (At least projection should be cheap; it may be
understandable to skip filtering entirely if the format guarantees that the
filter is completely pushed down.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to