HappenLee commented on code in PR #9541: URL: https://github.com/apache/incubator-doris/pull/9541#discussion_r880007777
########## be/src/exec/arrow/arrow_reader.cpp: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "exec/arrow/arrow_reader.h" + +#include <arrow/array.h> +#include <arrow/status.h> +#include <time.h> + +#include "common/logging.h" +#include "exec/file_reader.h" +#include "gen_cpp/PaloBrokerService_types.h" +#include "gen_cpp/TPaloBrokerService.h" +#include "runtime/broker_mgr.h" +#include "runtime/client_cache.h" +#include "runtime/descriptors.h" +#include "runtime/exec_env.h" +#include "runtime/mem_pool.h" +#include "runtime/tuple.h" +#include "util/thrift_util.h" + +namespace doris { + +// Broker + +ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) + : _batch_size(batch_size), _num_of_columns_from_file(num_of_columns_from_file) { + _arrow_file = std::shared_ptr<ArrowFile>(new ArrowFile(file_reader)); + _rb_reader = nullptr; + _total_groups = 0; + _current_group = 0; +} + +ArrowReaderWrap::~ArrowReaderWrap() { + close(); +} + +void ArrowReaderWrap::close() { + arrow::Status st = _arrow_file->Close(); + if (!st.ok()) { + LOG(WARNING) << "close file error: " << st.ToString(); + } +} + +Status 
ArrowReaderWrap::column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs) { + _include_column_ids.clear(); + for (int i = 0; i < _num_of_columns_from_file; i++) { + auto slot_desc = tuple_slot_descs.at(i); + // Get the Column Reader for the boolean column + auto iter = _map_column.find(slot_desc->col_name()); + if (iter != _map_column.end()) { + _include_column_ids.emplace_back(iter->second); + } else { + std::stringstream str_error; + str_error << "Invalid Column Name:" << slot_desc->col_name(); + LOG(WARNING) << str_error.str(); + return Status::InvalidArgument(str_error.str()); + } + } + return Status::OK(); +} + +ArrowFile::ArrowFile(FileReader* file) : _file(file) {} + +ArrowFile::~ArrowFile() { + arrow::Status st = Close(); + if (!st.ok()) { + LOG(WARNING) << "close file error: " << st.ToString(); + } +} + +arrow::Status ArrowFile::Close() { + if (_file != nullptr) { + _file->close(); + delete _file; + _file = nullptr; + } + return arrow::Status::OK(); +} + +bool ArrowFile::closed() const { + if (_file != nullptr) { + return _file->closed(); + } else { + return true; + } +} + +arrow::Result<int64_t> ArrowFile::Read(int64_t nbytes, void* buffer) { + return ReadAt(_pos, nbytes, buffer); +} + +arrow::Result<int64_t> ArrowFile::ReadAt(int64_t position, int64_t nbytes, void* out) { + int64_t reads = 0; + int64_t bytes_read = 0; + _pos = position; + while (nbytes > 0) { + Status result = _file->readat(_pos, nbytes, &reads, out); + if (!result.ok()) { + bytes_read = 0; Review Comment: useless code ########## be/src/exec/arrow/arrow_reader.cpp: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "exec/arrow/arrow_reader.h" + +#include <arrow/array.h> +#include <arrow/status.h> +#include <time.h> + +#include "common/logging.h" +#include "exec/file_reader.h" +#include "gen_cpp/PaloBrokerService_types.h" +#include "gen_cpp/TPaloBrokerService.h" +#include "runtime/broker_mgr.h" +#include "runtime/client_cache.h" +#include "runtime/descriptors.h" +#include "runtime/exec_env.h" +#include "runtime/mem_pool.h" +#include "runtime/tuple.h" +#include "util/thrift_util.h" + +namespace doris { + +// Broker + +ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) + : _batch_size(batch_size), _num_of_columns_from_file(num_of_columns_from_file) { + _arrow_file = std::shared_ptr<ArrowFile>(new ArrowFile(file_reader)); + _rb_reader = nullptr; + _total_groups = 0; + _current_group = 0; +} + +ArrowReaderWrap::~ArrowReaderWrap() { + close(); +} + +void ArrowReaderWrap::close() { + arrow::Status st = _arrow_file->Close(); + if (!st.ok()) { + LOG(WARNING) << "close file error: " << st.ToString(); + } +} + +Status ArrowReaderWrap::column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs) { + _include_column_ids.clear(); Review Comment: Add DCHECK(_num_of_columns_from_file <= tuple_slot_descs.size()); otherwise the `at` call throws `std::out_of_range` once `i` reaches `tuple_slot_descs.size()`. ########## be/src/exec/arrow/parquet_reader.h: ########## @@ -55,44 +56,27 @@ class SlotDescriptor; class MemPool; class FileReader; -class ParquetFile : public arrow::io::RandomAccessFile { -public: - ParquetFile(FileReader* file); - ~ParquetFile() 
override; - arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override; - arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override; - arrow::Result<int64_t> GetSize() override; - arrow::Status Seek(int64_t position) override; - arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override; - arrow::Result<int64_t> Tell() const override; - arrow::Status Close() override; - bool closed() const override; - -private: - FileReader* _file; - int64_t _pos = 0; -}; - // Reader of broker parquet file -class ParquetReaderWrap { +class ParquetReaderWrap : public ArrowReaderWrap { public: - ParquetReaderWrap(FileReader* file_reader, int32_t num_of_columns_from_file); - virtual ~ParquetReaderWrap(); + // batch_size is not use here + ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file); + virtual ~ParquetReaderWrap() {} // Read Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs, - MemPool* mem_pool, bool* eof); - void close(); - Status size(int64_t* size); - Status init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs, - const std::string& timezone); + MemPool* mem_pool, bool* eof) override; + Status size(int64_t* size) override; + Status init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs, + const std::string& timezone) override; Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, - const std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof); + const std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof) override; + void close() override; private: void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, int32_t len); - Status column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs); Status set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc); Status read_record_batch(const std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof); Review Comment: 
the `tuple_slot_descs` parameter is useless here, delete it ########## be/src/exec/arrow/arrow_reader.cpp: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "exec/arrow/arrow_reader.h" + +#include <arrow/array.h> +#include <arrow/status.h> +#include <time.h> + +#include "common/logging.h" +#include "exec/file_reader.h" +#include "gen_cpp/PaloBrokerService_types.h" +#include "gen_cpp/TPaloBrokerService.h" +#include "runtime/broker_mgr.h" +#include "runtime/client_cache.h" +#include "runtime/descriptors.h" +#include "runtime/exec_env.h" +#include "runtime/mem_pool.h" +#include "runtime/tuple.h" +#include "util/thrift_util.h" + +namespace doris { + +// Broker + +ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) + : _batch_size(batch_size), _num_of_columns_from_file(num_of_columns_from_file) { + _arrow_file = std::shared_ptr<ArrowFile>(new ArrowFile(file_reader)); + _rb_reader = nullptr; + _total_groups = 0; + _current_group = 0; +} + +ArrowReaderWrap::~ArrowReaderWrap() { + close(); +} + +void ArrowReaderWrap::close() { + arrow::Status st = _arrow_file->Close(); + if (!st.ok()) { + LOG(WARNING) << "close file error: " << 
st.ToString(); + } +} + +Status ArrowReaderWrap::column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs) { + _include_column_ids.clear(); + for (int i = 0; i < _num_of_columns_from_file; i++) { + auto slot_desc = tuple_slot_descs.at(i); + // Get the Column Reader for the boolean column + auto iter = _map_column.find(slot_desc->col_name()); + if (iter != _map_column.end()) { + _include_column_ids.emplace_back(iter->second); + } else { + std::stringstream str_error; + str_error << "Invalid Column Name:" << slot_desc->col_name(); + LOG(WARNING) << str_error.str(); + return Status::InvalidArgument(str_error.str()); + } + } + return Status::OK(); +} + +ArrowFile::ArrowFile(FileReader* file) : _file(file) {} + +ArrowFile::~ArrowFile() { + arrow::Status st = Close(); + if (!st.ok()) { + LOG(WARNING) << "close file error: " << st.ToString(); + } +} + +arrow::Status ArrowFile::Close() { + if (_file != nullptr) { + _file->close(); + delete _file; + _file = nullptr; + } + return arrow::Status::OK(); +} + +bool ArrowFile::closed() const { + if (_file != nullptr) { + return _file->closed(); + } else { + return true; + } +} + +arrow::Result<int64_t> ArrowFile::Read(int64_t nbytes, void* buffer) { + return ReadAt(_pos, nbytes, buffer); +} + +arrow::Result<int64_t> ArrowFile::ReadAt(int64_t position, int64_t nbytes, void* out) { + int64_t reads = 0; + int64_t bytes_read = 0; + _pos = position; + while (nbytes > 0) { + Status result = _file->readat(_pos, nbytes, &reads, out); + if (!result.ok()) { + bytes_read = 0; Review Comment: useless code ########## be/src/exec/arrow/parquet_reader.h: ########## @@ -55,44 +56,27 @@ class SlotDescriptor; class MemPool; class FileReader; -class ParquetFile : public arrow::io::RandomAccessFile { -public: - ParquetFile(FileReader* file); - ~ParquetFile() override; - arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override; - arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override; - 
arrow::Result<int64_t> GetSize() override; - arrow::Status Seek(int64_t position) override; - arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override; - arrow::Result<int64_t> Tell() const override; - arrow::Status Close() override; - bool closed() const override; - -private: - FileReader* _file; - int64_t _pos = 0; -}; - // Reader of broker parquet file -class ParquetReaderWrap { +class ParquetReaderWrap : public ArrowReaderWrap { public: - ParquetReaderWrap(FileReader* file_reader, int32_t num_of_columns_from_file); - virtual ~ParquetReaderWrap(); + // batch_size is not use here + ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file); + virtual ~ParquetReaderWrap() {} // Read Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs, - MemPool* mem_pool, bool* eof); - void close(); - Status size(int64_t* size); - Status init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs, - const std::string& timezone); + MemPool* mem_pool, bool* eof) override; + Status size(int64_t* size) override; + Status init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs, + const std::string& timezone) override; Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, - const std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof); + const std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof) override; + void close() override; private: void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, int32_t len); - Status column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs); Status set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc); Status read_record_batch(const std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof); Review Comment: the `tuple_slot_descs` parameter is useless here, delete it -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
