lidavidm commented on a change in pull request #10913:
URL: https://github.com/apache/arrow/pull/10913#discussion_r704542364



##########
File path: cpp/src/skyhook/protocol/skyhook_protocol.h
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "skyhook/protocol/rados_protocol.h"
+
+#include <sys/stat.h>
+#include <sstream>
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+#include "arrow/type.h"
+
+#define SCAN_UNKNOWN_ERR_MSG "something went wrong while scanning file fragment"
+#define SCAN_ERR_CODE 25
+#define SCAN_ERR_MSG "failed to scan file fragment"
+#define SCAN_REQ_DESER_ERR_CODE 26
+#define SCAN_REQ_DESER_ERR_MSG "failed to deserialize scan request"
+#define SCAN_RES_SER_ERR_CODE 27
+#define SCAN_RES_SER_ERR_MSG "failed to serialize result table"
+
+namespace skyhook {
+
+/// An enum to represent the different
+/// types of file formats that Skyhook supports.
+struct SkyhookFileType {
+  enum type { PARQUET, IPC };
+};
+
+/// A struct encapsulating all the parameters
+/// required to be serialized in the form of flatbuffers for
+/// sending to the cls.
+struct ScanRequest {
+  arrow::compute::Expression filter_expression;
+  arrow::compute::Expression partition_expression;
+  std::shared_ptr<arrow::Schema> projection_schema;
+  std::shared_ptr<arrow::Schema> dataset_schema;
+  int64_t file_size;
+  SkyhookFileType::type file_format;
+};
+
+/// Utility functions to serialize and deserialize scan requests and result Arrow tables.
+arrow::Status SerializeScanRequest(ScanRequest req, ceph::bufferlist& bl);
+arrow::Status DeserializeScanRequest(ScanRequest& req, ceph::bufferlist bl);
+arrow::Status SerializeTable(const std::shared_ptr<arrow::Table>& table,
+                             ceph::bufferlist& bl);
+arrow::Status DeserializeTable(arrow::RecordBatchVector& batches, ceph::bufferlist bl,
+                               bool use_threads);

Review comment:
       Note the Arrow/Google style guide recommends pointers for out parameters 
(so it would be `ceph::bufferlist*`, `ScanRequest*`, etc. in Serialize) and 
recommends they generally be put at the end.

##########
File path: cpp/src/skyhook/protocol/skyhook_protocol.cc
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include <flatbuffers/flatbuffers.h>
+
+#include "ScanRequest_generated.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/result.h"
+#include "arrow/util/io_util.h"
+
+namespace skyhook {
+
+namespace flatbuf = org::apache::arrow::flatbuf;
+
+arrow::Status SerializeScanRequest(ScanRequest req, ceph::bufferlist& bl) {
+  ARROW_ASSIGN_OR_RAISE(auto filter_expression,
+                        arrow::compute::Serialize(req.filter_expression));
+  ARROW_ASSIGN_OR_RAISE(auto partition_expression,
+                        arrow::compute::Serialize(req.partition_expression));
+  ARROW_ASSIGN_OR_RAISE(auto projection_schema,
+                        arrow::ipc::SerializeSchema(*req.projection_schema));
+  ARROW_ASSIGN_OR_RAISE(auto dataset_schema,
+                        arrow::ipc::SerializeSchema(*req.dataset_schema));
+
+  flatbuffers::FlatBufferBuilder builder(1024);
+  auto filter_expression_vector =
+      builder.CreateVector(filter_expression->data(), filter_expression->size());
+  auto partition_expression_vector =
+      builder.CreateVector(partition_expression->data(), partition_expression->size());
+  auto projected_schema_vector =
+      builder.CreateVector(projection_schema->data(), projection_schema->size());
+  auto dataset_schema_vector =
+      builder.CreateVector(dataset_schema->data(), dataset_schema->size());
+
+  auto request = flatbuf::CreateScanRequest(
+      builder, req.file_size, static_cast<int>(req.file_format), filter_expression_vector,
+      partition_expression_vector, dataset_schema_vector, projected_schema_vector);
+  builder.Finish(request);
+  uint8_t* buf = builder.GetBufferPointer();
+  int size = builder.GetSize();
+
+  bl.append((char*)buf, size);
+  return arrow::Status::OK();
+}
+
+arrow::Status DeserializeScanRequest(ScanRequest& req, ceph::bufferlist bl) {
+  auto request = flatbuf::GetScanRequest((uint8_t*)bl.c_str());
+
+  ARROW_ASSIGN_OR_RAISE(auto filter_expression,
+                        arrow::compute::Deserialize(std::make_shared<arrow::Buffer>(
+                            request->filter()->data(), request->filter()->size())));
+  req.filter_expression = filter_expression;
+
+  ARROW_ASSIGN_OR_RAISE(auto partition_expression,
+                        arrow::compute::Deserialize(std::make_shared<arrow::Buffer>(
+                            request->partition()->data(), request->partition()->size())));
+  req.partition_expression = partition_expression;
+
+  arrow::ipc::DictionaryMemo empty_memo;
+  arrow::io::BufferReader projection_schema_reader(request->projection_schema()->data(),
+                                                   request->projection_schema()->size());
+  arrow::io::BufferReader dataset_schema_reader(request->dataset_schema()->data(),
+                                                request->dataset_schema()->size());
+
+  ARROW_ASSIGN_OR_RAISE(req.projection_schema,
+                        arrow::ipc::ReadSchema(&projection_schema_reader, &empty_memo));
+  ARROW_ASSIGN_OR_RAISE(req.dataset_schema,
+                        arrow::ipc::ReadSchema(&dataset_schema_reader, &empty_memo));
+
+  req.file_size = request->file_size();
+  req.file_format = (SkyhookFileType::type)request->file_format();
+  return arrow::Status::OK();
+}
+
+arrow::Status SerializeTable(const std::shared_ptr<arrow::Table>& table,
+                             ceph::bufferlist& bl) {
+  ARROW_ASSIGN_OR_RAISE(auto buffer_output_stream,
+                        arrow::io::BufferOutputStream::Create());
+
+  auto options = arrow::ipc::IpcWriteOptions::Defaults();
+  auto codec = arrow::Compression::LZ4_FRAME;
+
+  ARROW_ASSIGN_OR_RAISE(options.codec, arrow::util::Codec::Create(codec));
+  ARROW_ASSIGN_OR_RAISE(auto writer, arrow::ipc::MakeStreamWriter(
+                                         buffer_output_stream, table->schema(), options));
+
+  ARROW_RETURN_NOT_OK(writer->WriteTable(*table));
+  ARROW_RETURN_NOT_OK(writer->Close());
+
+  ARROW_ASSIGN_OR_RAISE(auto buffer, buffer_output_stream->Finish());
+  bl.append((char*)buffer->data(), buffer->size());

Review comment:
       nit: use reinterpret_cast instead of C-style cast?

##########
File path: ci/scripts/integration_skyhook.sh
##########
@@ -0,0 +1,134 @@
+#!/usr/bin/env bash

Review comment:
       Can we note that fact in the file here in case we need to update or 
debug this script in the future?

##########
File path: cpp/src/skyhook/cls/cls_skyhook.cc
##########
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <rados/objclass.h>
+#include <memory>
+
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/result.h"
+#include "arrow/util/compression.h"
+
+CLS_VER(1, 0)
+CLS_NAME(skyhook)
+
+cls_handle_t h_class;
+cls_method_handle_t h_scan_op;
+
+/// \brief Log skyhook errors using RADOS object class SDK's logger.
+void LogSkyhookError(const std::string& msg) { CLS_LOG(0, "error: %s", msg.c_str()); }
+
+/// \class RandomAccessObject
+/// \brief An interface to provide a file-like view over RADOS objects.
+class RandomAccessObject : public arrow::io::RandomAccessFile {
+ public:
+  explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) {
+    hctx_ = hctx;
+    content_length_ = file_size;
+    chunks_ = std::vector<ceph::bufferlist*>();
+  }
+
+  ~RandomAccessObject() { Close(); }
+
+  /// Check if the file stream is closed.
+  arrow::Status CheckClosed() const {

Review comment:
       I realize CheckClosed is private, but what about ReadAt, etc.?

##########
File path: cpp/src/skyhook/protocol/skyhook_protocol_test.cc
##########
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/test_util.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+
+std::shared_ptr<arrow::Table> CreateTable() {
+  auto schema = arrow::schema({
+      {arrow::field("a", arrow::uint8())},
+      {arrow::field("b", arrow::uint32())},
+  });
+
+  std::shared_ptr<arrow::Table> table;
+  return TableFromJSON(schema, {R"([{"a": null, "b": 5},
+                                     {"a": 1,    "b": 3},
+                                     {"a": 3,    "b": null},
+                                     {"a": null, "b": null},
+                                     {"a": 2,    "b": 5},
+                                     {"a": 1,    "b": 5}
+                                    ])"});
+}
+
+TEST(TestSkyhookProtocol, SerDeserScanRequest) {
+  ceph::bufferlist bl;
+  skyhook::ScanRequest req;
+  req.filter_expression = arrow::compute::literal(true);
+  req.partition_expression = arrow::compute::literal(false);
+  req.projection_schema = arrow::schema({arrow::field("a", arrow::int64())});
+  req.dataset_schema = arrow::schema({arrow::field("a", arrow::int64())});
+  req.file_size = 1000000;
+  req.file_format = skyhook::SkyhookFileType::type::IPC;
+  ASSERT_OK(skyhook::SerializeScanRequest(req, bl));
+
+  skyhook::ScanRequest req_;
+  ASSERT_OK(skyhook::DeserializeScanRequest(req_, bl));
+  ASSERT_EQ(req.filter_expression.Equals(req_.filter_expression), 1);

Review comment:
       You should be able to `ASSERT_EQ(req.filter_expression, 
req_.filter_expression)` or if not that, at least just 
`ASSERT_TRUE(expr1.Equals(expr2))`.

##########
File path: cpp/src/skyhook/cls/cls_skyhook.cc
##########
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <rados/objclass.h>
+#include <memory>
+
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/result.h"
+#include "arrow/util/compression.h"
+
+CLS_VER(1, 0)
+CLS_NAME(skyhook)
+
+cls_handle_t h_class;
+cls_method_handle_t h_scan_op;
+
+/// \brief Log skyhook errors using RADOS object class SDK's logger.
+void LogSkyhookError(const std::string& msg) { CLS_LOG(0, "error: %s", msg.c_str()); }
+
+/// \class RandomAccessObject
+/// \brief An interface to provide a file-like view over RADOS objects.
+class RandomAccessObject : public arrow::io::RandomAccessFile {
+ public:
+  explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) {
+    hctx_ = hctx;
+    content_length_ = file_size;
+    chunks_ = std::vector<ceph::bufferlist*>();
+  }
+
+  ~RandomAccessObject() { Close(); }
+
+  /// Check if the file stream is closed.
+  arrow::Status CheckClosed() const {
+    if (closed_) {
+      return arrow::Status::Invalid("Operation on closed stream");
+    }
+    return arrow::Status::OK();
+  }
+
+  /// Check if the position of the object is valid.
+  arrow::Status CheckPosition(int64_t position, const char* action) const {
+    if (position < 0) {
+      return arrow::Status::Invalid("Cannot ", action, " from negative position");
+    }
+    if (position > content_length_) {
+      return arrow::Status::IOError("Cannot ", action, " past end of file");
+    }
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
+    return arrow::Status::NotImplemented(
+        "ReadAt has not been implemented in RandomAccessObject");
+  }
+
+  /// Read a specified number of bytes from a specified position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "read"));
+
+    // No need to allocate more than the remaining number of bytes
+    nbytes = std::min(nbytes, content_length_ - position);
+
+    if (nbytes > 0) {
+      ceph::bufferlist* bl = new ceph::bufferlist();
+      cls_cxx_read(hctx_, position, nbytes, bl);
+      chunks_.push_back(bl);
+      return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), bl->length());
+    }
+    return std::make_shared<arrow::Buffer>("");
+  }
+
+  /// Read a specified number of bytes from the current position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) {
+    ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
+    pos_ += buffer->size();
+    return std::move(buffer);
+  }
+
+  /// Read a specified number of bytes from the current position into an output stream.
+  arrow::Result<int64_t> Read(int64_t nbytes, void* out) {
+    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
+    pos_ += bytes_read;
+    return bytes_read;
+  }
+
+  /// Return the size of the file.
+  arrow::Result<int64_t> GetSize() {
+    RETURN_NOT_OK(CheckClosed());
+    return content_length_;
+  }
+
+  /// Sets the file-pointer offset, measured from the beginning of the
+  /// file, at which the next read or write occurs.
+  arrow::Status Seek(int64_t position) {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "seek"));
+
+    pos_ = position;
+    return arrow::Status::OK();
+  }
+
+  /// Returns the file-pointer offset.
+  arrow::Result<int64_t> Tell() const {
+    RETURN_NOT_OK(CheckClosed());
+    return pos_;
+  }
+
+  /// Closes the file stream and deletes the chunks and releases the memory
+  /// used by the chunks.
+  arrow::Status Close() {
+    closed_ = true;
+    for (auto chunk : chunks_) {
+      delete chunk;

Review comment:
       At the very least, we should use unique_ptr here instead of new/delete; 
and that way, we don't have to manually delete in Close(), and can get rid of 
the odd destructor.

##########
File path: cpp/src/arrow/dataset/dataset.h
##########
@@ -90,6 +90,8 @@ class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this<Fragment> {
 
   virtual ~Fragment() = default;
 
+  bool handles_compute = true;

Review comment:
       This is still unclear to me without having to go look at the 
implementation - does `true` mean this fragment applies filters itself, or no?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to