westonpace commented on a change in pull request #10662:
URL: https://github.com/apache/arrow/pull/10662#discussion_r707619719
##########
File path: cpp/src/arrow/csv/parser.cc
##########
@@ -100,6 +100,47 @@ class PresizedDataWriter {
int64_t saved_parsed_size_;
};
+/// \brief Utility class for writing one type into a buffer
+/// \tparam Type of the value stored in this buffer
+template <typename Type>
+class TypedBuffer {
+ public:
+ explicit TypedBuffer(MemoryPool* pool, int64_t capacity)
+ : size_(0), capacity_(capacity), mark_(-1) {
+ buffer_ = *AllocateResizableBuffer(capacity_ * sizeof(Type), pool);
+ typed_buffer_ = reinterpret_cast<Type*>(buffer_->mutable_data());
+ }
+
+ void Append(const Type& value) {
Review comment:
This should return `Status`. OOM is a real consideration and it should
be, ideally, handled gracefully.
##########
File path: cpp/src/arrow/csv/parser.cc
##########
@@ -100,6 +100,47 @@ class PresizedDataWriter {
int64_t saved_parsed_size_;
};
+/// \brief Utility class for writing one type into a buffer
+/// \tparam Type of the value stored in this buffer
+template <typename Type>
+class TypedBuffer {
+ public:
+ explicit TypedBuffer(MemoryPool* pool, int64_t capacity)
+ : size_(0), capacity_(capacity), mark_(-1) {
+ buffer_ = *AllocateResizableBuffer(capacity_ * sizeof(Type), pool);
+ typed_buffer_ = reinterpret_cast<Type*>(buffer_->mutable_data());
+ }
+
+ void Append(const Type& value) {
+ if (ARROW_PREDICT_FALSE(size_ == capacity_)) {
+ capacity_ *= 2;
+ ARROW_CHECK_OK(buffer_->Resize(capacity_ * sizeof(Type)));
Review comment:
`RETURN_NOT_OK`
##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -391,17 +391,20 @@ namespace {
class BlockParsingOperator {
public:
BlockParsingOperator(io::IOContext io_context, ParseOptions parse_options,
- int num_csv_cols, int64_t first_row)
+ int num_csv_cols, int64_t first_row, int64_t offset)
: io_context_(io_context),
parse_options_(parse_options),
num_csv_cols_(num_csv_cols),
count_rows_(first_row >= 0),
- num_rows_seen_(first_row) {}
+ num_rows_seen_(first_row),
+ next_block_offset_(offset) {}
Result<ParsedBlock> operator()(const CSVBlock& block) {
constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
- auto parser = std::make_shared<BlockParser>(
- io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_,
max_num_rows);
+ next_block_offset_ += block.bytes_skipped;
Review comment:
At the moment streaming parsing is always serial but that is due to a
bug/limitation (ARROW-13155). Eventually, `operator()` will be called multiple
times at once (across multiple blocks). I think this breaks both
`num_rows_seen_` and `next_block_offset_` but we can fix it then.
##########
File path: cpp/src/arrow/csv/parser.cc
##########
@@ -100,6 +100,47 @@ class PresizedDataWriter {
int64_t saved_parsed_size_;
};
+/// \brief Utility class for writing one type into a buffer
+/// \tparam Type of the value stored in this buffer
+template <typename Type>
+class TypedBuffer {
+ public:
+ explicit TypedBuffer(MemoryPool* pool, int64_t capacity)
+ : size_(0), capacity_(capacity), mark_(-1) {
+ buffer_ = *AllocateResizableBuffer(capacity_ * sizeof(Type), pool);
+ typed_buffer_ = reinterpret_cast<Type*>(buffer_->mutable_data());
+ }
+
+ void Append(const Type& value) {
Review comment:
Ah, I see now this is just how it was before. I don't think you need to
fix it in this PR then. @pitrou do you know if this was done for performance
reasons?
##########
File path: cpp/src/arrow/csv/parser.cc
##########
@@ -100,6 +100,47 @@ class PresizedDataWriter {
int64_t saved_parsed_size_;
};
+/// \brief Utility class for writing one type into a buffer
+/// \tparam Type of the value stored in this buffer
+template <typename Type>
+class TypedBuffer {
Review comment:
I'm not sure I like the name. `TypedXyz` makes me think of "types" which,
in Arrow, usually means `DataType`. So I would expect something called a
`TypedBuffer` to be similar to an array. Also, `Buffer` I would think of as
something that has a fixed size. Maybe `ResettableBufferBuilder` or
`ResettableBuilder`? Then the name is pretty similar to the builders we have,
but the functionality is pretty similar too, so I think that fits.
##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -742,6 +751,25 @@ class ReaderMixin {
return ParseResult{std::move(parser), static_cast<int64_t>(parsed_size)};
}
+ /**
+ * @brief add the `size()` of all buffers in `block` to `parser_offset_`
+ * @return the parser offset which should be used by the caller
+ */
+ int64_t UpdateParserOffset(const CSVBlock& block) {
+ parser_offset_ += block.bytes_skipped;
+ auto result = parser_offset_;
Review comment:
It's been a hard habit for me to break but `result` is not a good
variable name. Maybe `updated_offset` or something like that?
##########
File path: cpp/src/arrow/csv/parser.cc
##########
@@ -100,6 +100,47 @@ class PresizedDataWriter {
int64_t saved_parsed_size_;
};
+/// \brief Utility class for writing one type into a buffer
+/// \tparam Type of the value stored in this buffer
+template <typename Type>
+class TypedBuffer {
+ public:
+ explicit TypedBuffer(MemoryPool* pool, int64_t capacity)
+ : size_(0), capacity_(capacity), mark_(-1) {
+ buffer_ = *AllocateResizableBuffer(capacity_ * sizeof(Type), pool);
+ typed_buffer_ = reinterpret_cast<Type*>(buffer_->mutable_data());
+ }
+
+ void Append(const Type& value) {
+ if (ARROW_PREDICT_FALSE(size_ == capacity_)) {
+ capacity_ *= 2;
+ ARROW_CHECK_OK(buffer_->Resize(capacity_ * sizeof(Type)));
+ typed_buffer_ = reinterpret_cast<Type*>(buffer_->mutable_data());
+ }
+ typed_buffer_[size_++] = value;
+ }
+
+ Status Shrink() {
+ ARROW_CHECK_OK(buffer_->Resize(size_ * sizeof(Type)));
+ return arrow::Status::OK();
Review comment:
I'm quite certain that `PoolBuffer` will reallocate if a resize shrinks
the capacity enough to fit into a smaller power of 2. As such, you should
probably be reassigning `typed_buffer_` here. Although, in usage, it appears
you only call `Shrink` when you're finished so maybe just name it `Finish` and
have it return the `buffer` and get rid of `buffer()`?
##########
File path: cpp/src/arrow/csv/parser.cc
##########
@@ -100,6 +100,47 @@ class PresizedDataWriter {
int64_t saved_parsed_size_;
};
+/// \brief Utility class for writing one type into a buffer
+/// \tparam Type of the value stored in this buffer
+template <typename Type>
+class TypedBuffer {
+ public:
+ explicit TypedBuffer(MemoryPool* pool, int64_t capacity)
+ : size_(0), capacity_(capacity), mark_(-1) {
+ buffer_ = *AllocateResizableBuffer(capacity_ * sizeof(Type), pool);
+ typed_buffer_ = reinterpret_cast<Type*>(buffer_->mutable_data());
+ }
+
+ void Append(const Type& value) {
+ if (ARROW_PREDICT_FALSE(size_ == capacity_)) {
+ capacity_ *= 2;
+ ARROW_CHECK_OK(buffer_->Resize(capacity_ * sizeof(Type)));
+ typed_buffer_ = reinterpret_cast<Type*>(buffer_->mutable_data());
+ }
+ typed_buffer_[size_++] = value;
+ }
+
+ Status Shrink() {
+ ARROW_CHECK_OK(buffer_->Resize(size_ * sizeof(Type)));
Review comment:
`RETURN_NOT_OK`. I could maybe be convinced this would never fail, since
you should be going to a smaller size, but since you're returning `Status`
anyway it's probably better to be consistent, and who knows how allocators
choose to work.
##########
File path: cpp/src/arrow/csv/parser.cc
##########
@@ -100,6 +100,47 @@ class PresizedDataWriter {
int64_t saved_parsed_size_;
};
+/// \brief Utility class for writing one type into a buffer
+/// \tparam Type of the value stored in this buffer
+template <typename Type>
+class TypedBuffer {
+ public:
+ explicit TypedBuffer(MemoryPool* pool, int64_t capacity)
+ : size_(0), capacity_(capacity), mark_(-1) {
+ buffer_ = *AllocateResizableBuffer(capacity_ * sizeof(Type), pool);
+ typed_buffer_ = reinterpret_cast<Type*>(buffer_->mutable_data());
+ }
+
+ void Append(const Type& value) {
Review comment:
This should be `void Append(Type value)` to be consistent with other
usage (`ParsedValueDesc` is passed by value).
##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -742,6 +751,25 @@ class ReaderMixin {
return ParseResult{std::move(parser), static_cast<int64_t>(parsed_size)};
}
+ /**
+ * @brief add the `size()` of all buffers in `block` to `parser_offset_`
+ * @return the parser offset which should be used by the caller
Review comment:
Use `\brief` and `\return` and `///` comments
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]