[
https://issues.apache.org/jira/browse/ARROW-1920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292565#comment-16292565
]
ASF GitHub Bot commented on ARROW-1920:
---------------------------------------
xhochy commented on a change in pull request #1418: ARROW-1920 [C++/Python] Add
ORC Reader
URL: https://github.com/apache/arrow/pull/1418#discussion_r157206300
##########
File path: cpp/src/arrow/adapters/orc/adapter.cc
##########
@@ -0,0 +1,698 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <exception>
+#include <list>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "arrow/adapters/orc/adapter.h"
+#include "arrow/buffer.h"
+#include "arrow/builder.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/status.h"
+#include "arrow/table_builder.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit-util.h"
+#include "arrow/util/decimal.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+#include "orc/OrcFile.hh"
+
+// alias to not interfere with nested orc namespace
+namespace liborc = orc;
+
+namespace arrow {
+namespace adapters {
+namespace orc {
+
+// Evaluate an expression yielding a Status and, if it is not OK, throw a
+// liborc::ParseError whose message is "Arrow error: " followed by the status
+// text. Intended for code that must report failure to liborc by exception
+// because it cannot return a Status (e.g. liborc::InputStream overrides).
+#define ORC_THROW_NOT_OK(s)                                       \
+  do {                                                            \
+    const Status _st = (s);                                       \
+    if (!_st.ok()) {                                              \
+      throw liborc::ParseError("Arrow error: " + _st.ToString()); \
+    }                                                             \
+  } while (0)
+
+// Adapter exposing an Arrow io::ReadableFileInterface through liborc's
+// InputStream interface, so that liborc can read ORC data via Arrow I/O.
+//
+// The InputStream methods cannot return a Status, so Arrow errors are
+// rethrown as liborc::ParseError via ORC_THROW_NOT_OK; a short read is
+// reported the same way.
+class ArrowInputFile : public liborc::InputStream {
+ public:
+  explicit ArrowInputFile(const std::shared_ptr<io::ReadableFileInterface>& file)
+      : file_(file) {}
+
+  // Size of the wrapped file, as reported by file_->GetSize().
+  uint64_t getLength() const override {
+    int64_t file_size = 0;
+    ORC_THROW_NOT_OK(file_->GetSize(&file_size));
+    return static_cast<uint64_t>(file_size);
+  }
+
+  // Read-size hint reported to liborc: a fixed 128 KiB.
+  uint64_t getNaturalReadSize() const override { return kNaturalReadSize; }
+
+  // Read exactly `length` bytes starting at `offset` into `buf`; anything
+  // other than a full read throws liborc::ParseError.
+  void read(void* buf, uint64_t length, uint64_t offset) override {
+    int64_t nread = 0;
+    ORC_THROW_NOT_OK(file_->ReadAt(static_cast<int64_t>(offset),
+                                   static_cast<int64_t>(length), &nread, buf));
+    if (static_cast<uint64_t>(nread) != length) {
+      throw liborc::ParseError("Short read from arrow input file");
+    }
+  }
+
+  // Constant stream name reported to liborc.
+  const std::string& getName() const override {
+    static const std::string kStreamName("ArrowInputFile");
+    return kStreamName;
+  }
+
+ private:
+  static constexpr uint64_t kNaturalReadSize = 128 * 1024;
+
+  std::shared_ptr<io::ReadableFileInterface> file_;
+};
+
+// Per-stripe values copied from liborc::StripeInformation by Impl::Init().
+// NOTE: this is a plain aggregate built with brace initialization in
+// {offset, length, num_rows} order, so keep the field order stable.
+struct StripeInformation {
+ uint64_t offset;    // liborc StripeInformation::getOffset()
+ uint64_t length;    // liborc StripeInformation::getLength()
+ uint64_t num_rows;  // liborc StripeInformation::getNumberOfRows()
+};
+
+// Convert a liborc type description into the equivalent Arrow DataType.
+//
+// \param[in] type ORC type to convert; a null pointer yields arrow::null()
+// \param[out] out receives the converted Arrow type on success
+// \return Status::Invalid if a LIST does not have exactly one child, a MAP
+//   does not have exactly two children, or the type kind is not recognised;
+//   errors from converting nested children are propagated; otherwise OK.
+//
+// Nested types are converted recursively: an ORC MAP becomes
+// list<struct<key, value>>, and child i of an ORC UNION becomes a union field
+// named "_union_<i>" with type code i.
+Status get_dtype(const liborc::Type* type, std::shared_ptr<DataType>* out) {
+  if (type == nullptr) {
+    *out = null();
+    return Status::OK();
+  }
+  const liborc::TypeKind kind = type->getKind();
+  switch (kind) {
+    case liborc::BOOLEAN:
+      *out = boolean();
+      break;
+    case liborc::BYTE:
+      *out = int8();
+      break;
+    case liborc::SHORT:
+      *out = int16();
+      break;
+    case liborc::INT:
+      *out = int32();
+      break;
+    case liborc::LONG:
+      *out = int64();
+      break;
+    case liborc::FLOAT:
+      *out = float32();
+      break;
+    case liborc::DOUBLE:
+      *out = float64();
+      break;
+    case liborc::VARCHAR:
+    case liborc::STRING:
+      *out = utf8();
+      break;
+    case liborc::BINARY:
+      *out = binary();
+      break;
+    case liborc::CHAR:
+      // Arrow takes an int32_t width; cast explicitly instead of narrowing
+      // liborc's wider length type implicitly.
+      *out = fixed_size_binary(static_cast<int32_t>(type->getMaximumLength()));
+      break;
+    case liborc::TIMESTAMP:
+      *out = timestamp(TimeUnit::NANO);
+      break;
+    case liborc::DATE:
+      *out = date64();
+      break;
+    case liborc::DECIMAL: {
+      if (type->getPrecision() == 0) {
+        // In HIVE 0.11/0.12 precision is set as 0, but means max precision
+        *out = decimal(38, 6);
+      } else {
+        *out = decimal(static_cast<int32_t>(type->getPrecision()),
+                       static_cast<int32_t>(type->getScale()));
+      }
+      break;
+    }
+    case liborc::LIST: {
+      if (type->getSubtypeCount() != 1) {
+        return Status::Invalid("Invalid Orc List type");
+      }
+      std::shared_ptr<DataType> elemtype;
+      RETURN_NOT_OK(get_dtype(type->getSubtype(0), &elemtype));
+      *out = list(elemtype);
+      break;
+    }
+    case liborc::MAP: {
+      if (type->getSubtypeCount() != 2) {
+        return Status::Invalid("Invalid Orc Map type");
+      }
+      std::shared_ptr<DataType> keytype;
+      std::shared_ptr<DataType> valtype;
+      RETURN_NOT_OK(get_dtype(type->getSubtype(0), &keytype));
+      RETURN_NOT_OK(get_dtype(type->getSubtype(1), &valtype));
+      // Spell out the vector type: `auto x = {...}` deduces an
+      // std::initializer_list and relies on an implicit conversion here.
+      const std::vector<std::shared_ptr<Field>> fields = {field("key", keytype),
+                                                          field("value", valtype)};
+      *out = list(struct_(fields));
+      break;
+    }
+    case liborc::STRUCT: {
+      const uint64_t size = type->getSubtypeCount();
+      std::vector<std::shared_ptr<Field>> fields;
+      fields.reserve(size);
+      for (uint64_t child = 0; child < size; ++child) {
+        std::shared_ptr<DataType> elemtype;
+        RETURN_NOT_OK(get_dtype(type->getSubtype(child), &elemtype));
+        fields.push_back(field(type->getFieldName(child), elemtype));
+      }
+      *out = struct_(fields);
+      break;
+    }
+    case liborc::UNION: {
+      const uint64_t size = type->getSubtypeCount();
+      std::vector<std::shared_ptr<Field>> fields;
+      std::vector<uint8_t> type_codes;
+      fields.reserve(size);
+      type_codes.reserve(size);
+      for (uint64_t child = 0; child < size; ++child) {
+        std::shared_ptr<DataType> elemtype;
+        RETURN_NOT_OK(get_dtype(type->getSubtype(child), &elemtype));
+        fields.push_back(field("_union_" + std::to_string(child), elemtype));
+        // Child index i is used directly as the (uint8_t) union type code.
+        type_codes.push_back(static_cast<uint8_t>(child));
+      }
+      *out = union_(fields, type_codes);
+      break;
+    }
+    default: {
+      std::stringstream ss;
+      ss << "Unknown Orc type kind: " << kind;
+      return Status::Invalid(ss.str());
+    }
+  }
+  return Status::OK();
+}
+
+class ORCFileReader::Impl {
+ public:
+  // Take ownership of an already-created liborc reader and remember the
+  // caller-supplied memory pool.
+  explicit Impl(MemoryPool* pool, std::unique_ptr<liborc::Reader> reader)
+      : pool_(pool),
+        reader_(std::move(reader)) {}
+
+  // Create an Impl that reads ORC data from `file`.
+  //
+  // `file` is wrapped in an ArrowInputFile and handed to liborc. Any exception
+  // thrown while creating the liborc reader is returned as Status::IOError.
+  // On success, `*impl` is set and its Init() status is returned.
+  static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file,
+                     MemoryPool* pool, std::unique_ptr<Impl>* impl) {
+    std::unique_ptr<ArrowInputFile> io_wrapper(new ArrowInputFile(file));
+    liborc::ReaderOptions options;
+    std::unique_ptr<liborc::Reader> liborc_reader;
+    try {
+      liborc_reader = liborc::createReader(std::move(io_wrapper), options);
+    } catch (const liborc::ParseError& e) {
+      return Status::IOError(e.what());
+    } catch (const std::exception& e) {
+      // Don't let any other liborc exception escape a Status-returning API.
+      return Status::IOError(e.what());
+    }
+    impl->reset(new Impl(pool, std::move(liborc_reader)));
+    return (*impl)->Init();
+  }
+
+  // Cache offset, length and row count of every stripe in stripes_.
+  // Returns Status::IOError if liborc throws while reading stripe metadata.
+  Status Init() {
+    try {
+      // Unsigned throughout: the stripe count and getStripe() index are
+      // unsigned, and an `int` counter could overflow on huge files.
+      const uint64_t nstripes = reader_->getNumberOfStripes();
+      stripes_.resize(nstripes);
+      for (uint64_t i = 0; i < nstripes; ++i) {
+        std::unique_ptr<liborc::StripeInformation> stripe = reader_->getStripe(i);
+        stripes_[i] = StripeInformation{stripe->getOffset(), stripe->getLength(),
+                                        stripe->getNumberOfRows()};
+      }
+    } catch (const liborc::ParseError& e) {
+      return Status::IOError(e.what());
+    } catch (const std::exception& e) {
+      return Status::IOError(e.what());
+    }
+    return Status::OK();
+  }
+
+  // Number of stripes cached by Init().
+  uint64_t NumberOfStripes() { return static_cast<uint64_t>(stripes_.size()); }
+
+  // Total row count of the file, as reported by liborc.
+  uint64_t NumberOfRows() {
+    const uint64_t total_rows = reader_->getNumberOfRows();
+    return total_rows;
+  }
+
+  // Build the Arrow schema for the file from liborc's root type.
+  Status ReadSchema(std::shared_ptr<Schema>* out) {
+    return type_to_schema(reader_->getType(), out);
+  }
+
+  // Build an Arrow schema from a top-level ORC STRUCT type.
+  //
+  // Produces one field per struct child (converted with get_dtype) and
+  // attaches the reader's metadata key/value pairs when there are any.
+  // Returns Status::NotImplemented if `type` is not a STRUCT; errors from
+  // get_dtype are propagated.
+  Status type_to_schema(const liborc::Type& type, std::shared_ptr<Schema>* out) {
+    if (type.getKind() != liborc::STRUCT) {
+      return Status::NotImplemented(
+          "Only ORC files with a top-level struct "
+          "can be handled");
+    }
+
+    const uint64_t nchildren = type.getSubtypeCount();
+    std::vector<std::shared_ptr<Field>> schema_fields;
+    schema_fields.reserve(nchildren);
+    for (uint64_t i = 0; i < nchildren; ++i) {
+      std::shared_ptr<DataType> child_type;
+      RETURN_NOT_OK(get_dtype(type.getSubtype(i), &child_type));
+      schema_fields.push_back(field(type.getFieldName(i), child_type));
+    }
+
+    const std::list<std::string> keys = reader_->getMetadataKeys();
+    std::shared_ptr<KeyValueMetadata> metadata;
+    if (!keys.empty()) {
+      metadata = std::make_shared<KeyValueMetadata>();
+      for (const std::string& key : keys) {
+        metadata->Append(key, reader_->getMetadataValue(key));
+      }
+    }
+
+    *out = std::make_shared<Schema>(schema_fields, metadata);
+    return Status::OK();
+  }
+
+  // Read every row of the file, using default row-reader options.
+  Status Read(std::shared_ptr<RecordBatch>* out) {
+    return read_batch(liborc::RowReaderOptions(), NumberOfRows(), out);
+  }
+
+  // Read every row of the file, selecting what to read by passing
+  // `include_indices` to RowReaderOptions::includeTypes().
+  Status Read(const std::list<uint64_t>& include_indices,
+              std::shared_ptr<RecordBatch>* out) {
+    liborc::RowReaderOptions row_opts;
+    row_opts.includeTypes(include_indices);
+    const uint64_t total_rows = NumberOfRows();
+    return read_batch(row_opts, total_rows, out);
+  }
+
+  // Read all rows of stripe number `stripe`.
+  // Returns Status::Invalid (from select_stripe) if `stripe` is out of range.
+  Status ReadStripe(uint64_t stripe, std::shared_ptr<RecordBatch>* out) {
+    liborc::RowReaderOptions stripe_opts;
+    RETURN_NOT_OK(select_stripe(&stripe_opts, stripe));
+    // Safe to index: select_stripe has validated `stripe`.
+    const uint64_t stripe_rows = stripes_[stripe].num_rows;
+    return read_batch(stripe_opts, stripe_rows, out);
+  }
+
+  // Read the rows of stripe number `stripe`, selecting what to read by
+  // passing `include_indices` to RowReaderOptions::includeTypes().
+  // Returns Status::Invalid (from select_stripe) if `stripe` is out of range.
+  Status ReadStripe(uint64_t stripe, const std::list<uint64_t>& include_indices,
+                    std::shared_ptr<RecordBatch>* out) {
+    liborc::RowReaderOptions stripe_opts;
+    RETURN_NOT_OK(select_stripe(&stripe_opts, stripe));
+    stripe_opts.includeTypes(include_indices);
+    // Safe to index: select_stripe has validated `stripe`.
+    const uint64_t stripe_rows = stripes_[stripe].num_rows;
+    return read_batch(stripe_opts, stripe_rows, out);
+  }
+
+  // Restrict `opts` to stripe number `stripe` by passing its cached offset
+  // and length to RowReaderOptions::range().
+  // Returns Status::Invalid if `stripe` is not a valid stripe index.
+  Status select_stripe(liborc::RowReaderOptions* opts, uint64_t stripe) {
+    if (stripe < stripes_.size()) {
+      const StripeInformation& info = stripes_[stripe];
+      opts->range(info.offset, info.length);
+      return Status::OK();
+    }
+    return Status::Invalid("Out of bounds stripe: " + std::to_string(stripe));
+  }
+
+ Status read_batch(const liborc::RowReaderOptions& opts, uint64_t nrows,
+ std::shared_ptr<RecordBatch>* out) {
+ std::unique_ptr<liborc::RowReader> rowreader;
+ std::unique_ptr<liborc::ColumnVectorBatch> batch;
+ try {
+ rowreader = reader_->createRowReader(opts);
+ batch = rowreader->createRowBatch(std::min(nrows, (uint64_t)1000));
Review comment:
It would be nice to refactor this into a named constant declared somewhere,
e.g. `constexpr uint64_t kReadRowsBatch = 1000`. One approach we used in
parquet-cpp was to implement a specialization in the parquet-cpp code that
could read directly into pre-allocated memory. I don't expect this to be
implemented in this PR, but it could be a worthwhile optimisation if this
code proves to be too slow (that change gave us a 30% performance improvement
in parquet_arrow).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Add support for reading ORC files
> ---------------------------------
>
> Key: ARROW-1920
> URL: https://issues.apache.org/jira/browse/ARROW-1920
> Project: Apache Arrow
> Issue Type: New Feature
> Components: C++, Python
> Reporter: Jim Crist
> Labels: pull-request-available
>
> It would be nice to be able to read ORC files in pyarrow, similar to the
> existing Parquet support.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)