kou commented on code in PR #36344:
URL: https://github.com/apache/arrow/pull/36344#discussion_r1246022396
##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public
RecordBatchStreamReader {
return Status::OK();
}
- Status ReadInitialDictionaries() {
- // We must receive all dictionaries before reconstructing the
- // first record batch. Subsequent dictionary deltas modify the memo
- std::unique_ptr<Message> message;
-
- // TODO(wesm): In future, we may want to reconcile the ids in the stream
with
- // those found in the schema
- const auto num_dicts = dictionary_memo_.fields().num_dicts();
- for (int i = 0; i < num_dicts; ++i) {
- ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
- if (!message) {
- if (i == 0) {
- /// ARROW-6006: If we fail to find any dictionaries in the stream,
then
- /// it may be that the stream has a schema but no actual data. In
such
- /// case we communicate that we were unable to find the dictionaries
- /// (but there was no failure otherwise), so the caller can decide
what
- /// to do
- empty_stream_ = true;
- break;
- } else {
- // ARROW-6126, the stream terminated before receiving the expected
- // number of dictionaries
- return Status::Invalid("IPC stream ended without reading the
expected number (",
- num_dicts, ") of dictionaries");
- }
- }
+ std::shared_ptr<Listener> listener_;
+ const IpcReadOptions options_;
+ State state_;
+ std::vector<bool> field_inclusion_mask_;
+ int num_required_initial_dictionaries_;
+ int num_read_initial_dictionaries_;
+ DictionaryMemo dictionary_memo_;
+ std::shared_ptr<Schema> schema_;
+ std::shared_ptr<Schema> out_schema_;
+ ReadStats stats_;
+ bool swap_endian_;
+};
- if (message->type() != MessageType::DICTIONARY_BATCH) {
- return Status::Invalid("IPC stream did not have the expected number
(", num_dicts,
- ") of dictionaries at the start of the stream");
- }
- RETURN_NOT_OK(ReadDictionary(*message));
+// ----------------------------------------------------------------------
+// RecordBatchStreamReader implementation
+
+class RecordBatchStreamReaderImpl : public RecordBatchStreamReader,
+ public StreamDecoderInternal {
+ public:
+ RecordBatchStreamReaderImpl(std::unique_ptr<MessageReader> message_reader,
+ const IpcReadOptions& options)
+ : RecordBatchStreamReader(),
+ StreamDecoderInternal(std::make_shared<CollectListener>(), options),
+ message_reader_(std::move(message_reader)) {}
+
+ Status Init() {
+ // Read schema
+ ARROW_ASSIGN_OR_RAISE(auto message, message_reader_->ReadNextMessage());
+ if (!message) {
+ return Status::Invalid("Tried reading schema message, was null or length
0");
}
+ return OnMessageDecoded(std::move(message));
+ }
- have_read_initial_dictionaries_ = true;
+ Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+ ARROW_ASSIGN_OR_RAISE(auto batch_with_metadata, ReadNext());
+ *batch = std::move(batch_with_metadata.batch);
return Status::OK();
}
- std::unique_ptr<MessageReader> message_reader_;
- IpcReadOptions options_;
- std::vector<bool> field_inclusion_mask_;
-
- bool have_read_initial_dictionaries_ = false;
-
- // Flag to set in case where we fail to observe all dictionaries in a stream,
- // and so the reader should not attempt to parse any messages
- bool empty_stream_ = false;
+ Result<RecordBatchWithMetadata> ReadNext() override {
+ auto collect_listener = static_cast<CollectListener*>(raw_listener());
Review Comment:
Good catch!
##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -301,9 +317,39 @@ class ARROW_EXPORT CollectListener : public Listener {
return record_batches_;
}
+ /// \return the all decoded metadatas
+ std::vector<std::shared_ptr<KeyValueMetadata>> metadatas() const { return
metadatas_; }
Review Comment:
Ah, I just used the same signature as `record_batches()`, but we should use
const-ref.
I can't remember why I didn't use const-ref for `record_batches()`; I just
didn't notice it...
I've changed `record_batches()` to use const-ref too. It breaks backward
compatibility, but I hope that nobody uses this.
##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public
RecordBatchStreamReader {
return Status::OK();
}
- Status ReadInitialDictionaries() {
- // We must receive all dictionaries before reconstructing the
- // first record batch. Subsequent dictionary deltas modify the memo
- std::unique_ptr<Message> message;
-
- // TODO(wesm): In future, we may want to reconcile the ids in the stream
with
- // those found in the schema
- const auto num_dicts = dictionary_memo_.fields().num_dicts();
- for (int i = 0; i < num_dicts; ++i) {
- ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
- if (!message) {
- if (i == 0) {
- /// ARROW-6006: If we fail to find any dictionaries in the stream,
then
- /// it may be that the stream has a schema but no actual data. In
such
- /// case we communicate that we were unable to find the dictionaries
- /// (but there was no failure otherwise), so the caller can decide
what
- /// to do
- empty_stream_ = true;
- break;
- } else {
- // ARROW-6126, the stream terminated before receiving the expected
- // number of dictionaries
- return Status::Invalid("IPC stream ended without reading the
expected number (",
- num_dicts, ") of dictionaries");
- }
- }
+ std::shared_ptr<Listener> listener_;
+ const IpcReadOptions options_;
+ State state_;
+ std::vector<bool> field_inclusion_mask_;
+ int num_required_initial_dictionaries_;
+ int num_read_initial_dictionaries_;
+ DictionaryMemo dictionary_memo_;
+ std::shared_ptr<Schema> schema_;
+ std::shared_ptr<Schema> out_schema_;
+ ReadStats stats_;
+ bool swap_endian_;
+};
- if (message->type() != MessageType::DICTIONARY_BATCH) {
- return Status::Invalid("IPC stream did not have the expected number
(", num_dicts,
- ") of dictionaries at the start of the stream");
- }
- RETURN_NOT_OK(ReadDictionary(*message));
+// ----------------------------------------------------------------------
+// RecordBatchStreamReader implementation
+
+class RecordBatchStreamReaderImpl : public RecordBatchStreamReader,
+ public StreamDecoderInternal {
+ public:
+ RecordBatchStreamReaderImpl(std::unique_ptr<MessageReader> message_reader,
+ const IpcReadOptions& options)
+ : RecordBatchStreamReader(),
+ StreamDecoderInternal(std::make_shared<CollectListener>(), options),
+ message_reader_(std::move(message_reader)) {}
+
+ Status Init() {
+ // Read schema
+ ARROW_ASSIGN_OR_RAISE(auto message, message_reader_->ReadNextMessage());
+ if (!message) {
+ return Status::Invalid("Tried reading schema message, was null or length
0");
}
+ return OnMessageDecoded(std::move(message));
+ }
- have_read_initial_dictionaries_ = true;
+ Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+ ARROW_ASSIGN_OR_RAISE(auto batch_with_metadata, ReadNext());
+ *batch = std::move(batch_with_metadata.batch);
return Status::OK();
}
- std::unique_ptr<MessageReader> message_reader_;
- IpcReadOptions options_;
- std::vector<bool> field_inclusion_mask_;
-
- bool have_read_initial_dictionaries_ = false;
-
- // Flag to set in case where we fail to observe all dictionaries in a stream,
- // and so the reader should not attempt to parse any messages
- bool empty_stream_ = false;
+ Result<RecordBatchWithMetadata> ReadNext() override {
+ auto collect_listener = static_cast<CollectListener*>(raw_listener());
+ while (collect_listener->num_record_batches() == 0) {
Review Comment:
Ah, that's better.
##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -301,9 +317,39 @@ class ARROW_EXPORT CollectListener : public Listener {
return record_batches_;
}
+ /// \return the all decoded metadatas
+ std::vector<std::shared_ptr<KeyValueMetadata>> metadatas() const { return
metadatas_; }
+
+ /// \return the number of collected record batches
+ size_t num_record_batches() const { return record_batches_.size(); }
Review Comment:
Ah, yes. Other places use `int64_t` or `int` for the number of record
batches.
##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -251,7 +251,8 @@ class ARROW_EXPORT Listener {
/// \see StreamDecoder
virtual Status OnEOS();
- /// \brief Called when a record batch is decoded.
+ /// \brief Called when a record batch is decoded and
+ /// OnReocrdBatchDecoded() isn't overrided.
Review Comment:
Oh...
##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public
RecordBatchStreamReader {
return Status::OK();
}
- Status ReadInitialDictionaries() {
- // We must receive all dictionaries before reconstructing the
- // first record batch. Subsequent dictionary deltas modify the memo
- std::unique_ptr<Message> message;
-
- // TODO(wesm): In future, we may want to reconcile the ids in the stream
with
- // those found in the schema
- const auto num_dicts = dictionary_memo_.fields().num_dicts();
- for (int i = 0; i < num_dicts; ++i) {
- ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
- if (!message) {
- if (i == 0) {
- /// ARROW-6006: If we fail to find any dictionaries in the stream,
then
- /// it may be that the stream has a schema but no actual data. In
such
- /// case we communicate that we were unable to find the dictionaries
- /// (but there was no failure otherwise), so the caller can decide
what
- /// to do
- empty_stream_ = true;
- break;
- } else {
- // ARROW-6126, the stream terminated before receiving the expected
- // number of dictionaries
- return Status::Invalid("IPC stream ended without reading the
expected number (",
- num_dicts, ") of dictionaries");
- }
- }
+ std::shared_ptr<Listener> listener_;
+ const IpcReadOptions options_;
+ State state_;
+ std::vector<bool> field_inclusion_mask_;
+ int num_required_initial_dictionaries_;
+ int num_read_initial_dictionaries_;
+ DictionaryMemo dictionary_memo_;
+ std::shared_ptr<Schema> schema_;
Review Comment:
https://github.com/apache/arrow/pull/36344/files#diff-e992169684aea9845ac776ada4cbb2b5dc711b49e5a3fbc6046c92299e1aefceR940
and
https://github.com/apache/arrow/pull/36344/files#diff-e992169684aea9845ac776ada4cbb2b5dc711b49e5a3fbc6046c92299e1aefceR954
.
(I understand that this PR's diff is difficult to review...)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]