kou commented on code in PR #36344:
URL: https://github.com/apache/arrow/pull/36344#discussion_r1246022396


##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public 
RecordBatchStreamReader {
     return Status::OK();
   }
 
-  Status ReadInitialDictionaries() {
-    // We must receive all dictionaries before reconstructing the
-    // first record batch. Subsequent dictionary deltas modify the memo
-    std::unique_ptr<Message> message;
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream 
with
-    // those found in the schema
-    const auto num_dicts = dictionary_memo_.fields().num_dicts();
-    for (int i = 0; i < num_dicts; ++i) {
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-      if (!message) {
-        if (i == 0) {
-          /// ARROW-6006: If we fail to find any dictionaries in the stream, 
then
-          /// it may be that the stream has a schema but no actual data. In 
such
-          /// case we communicate that we were unable to find the dictionaries
-          /// (but there was no failure otherwise), so the caller can decide 
what
-          /// to do
-          empty_stream_ = true;
-          break;
-        } else {
-          // ARROW-6126, the stream terminated before receiving the expected
-          // number of dictionaries
-          return Status::Invalid("IPC stream ended without reading the 
expected number (",
-                                 num_dicts, ") of dictionaries");
-        }
-      }
+  std::shared_ptr<Listener> listener_;
+  const IpcReadOptions options_;
+  State state_;
+  std::vector<bool> field_inclusion_mask_;
+  int num_required_initial_dictionaries_;
+  int num_read_initial_dictionaries_;
+  DictionaryMemo dictionary_memo_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<Schema> out_schema_;
+  ReadStats stats_;
+  bool swap_endian_;
+};
 
-      if (message->type() != MessageType::DICTIONARY_BATCH) {
-        return Status::Invalid("IPC stream did not have the expected number 
(", num_dicts,
-                               ") of dictionaries at the start of the stream");
-      }
-      RETURN_NOT_OK(ReadDictionary(*message));
+// ----------------------------------------------------------------------
+// RecordBatchStreamReader implementation
+
+class RecordBatchStreamReaderImpl : public RecordBatchStreamReader,
+                                    public StreamDecoderInternal {
+ public:
+  RecordBatchStreamReaderImpl(std::unique_ptr<MessageReader> message_reader,
+                              const IpcReadOptions& options)
+      : RecordBatchStreamReader(),
+        StreamDecoderInternal(std::make_shared<CollectListener>(), options),
+        message_reader_(std::move(message_reader)) {}
+
+  Status Init() {
+    // Read schema
+    ARROW_ASSIGN_OR_RAISE(auto message, message_reader_->ReadNextMessage());
+    if (!message) {
+      return Status::Invalid("Tried reading schema message, was null or length 
0");
     }
+    return OnMessageDecoded(std::move(message));
+  }
 
-    have_read_initial_dictionaries_ = true;
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+    ARROW_ASSIGN_OR_RAISE(auto batch_with_metadata, ReadNext());
+    *batch = std::move(batch_with_metadata.batch);
     return Status::OK();
   }
 
-  std::unique_ptr<MessageReader> message_reader_;
-  IpcReadOptions options_;
-  std::vector<bool> field_inclusion_mask_;
-
-  bool have_read_initial_dictionaries_ = false;
-
-  // Flag to set in case where we fail to observe all dictionaries in a stream,
-  // and so the reader should not attempt to parse any messages
-  bool empty_stream_ = false;
+  Result<RecordBatchWithMetadata> ReadNext() override {
+    auto collect_listener = static_cast<CollectListener*>(raw_listener());

Review Comment:
   Good catch!



##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -301,9 +317,39 @@ class ARROW_EXPORT CollectListener : public Listener {
     return record_batches_;
   }
 
+  /// \return the all decoded metadatas
+  std::vector<std::shared_ptr<KeyValueMetadata>> metadatas() const { return 
metadatas_; }

Review Comment:
   Ah, I just used the same signature as `record_batches()` but we should use 
const-ref.
   I can't remember why I didn't use const-ref for `record_batches()` but I 
just doesn't notice it...
   
   I also use const-ref for `record_batches()` too. It breaks a backward 
compatibility but I hope that nobody doesn't use this.



##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public 
RecordBatchStreamReader {
     return Status::OK();
   }
 
-  Status ReadInitialDictionaries() {
-    // We must receive all dictionaries before reconstructing the
-    // first record batch. Subsequent dictionary deltas modify the memo
-    std::unique_ptr<Message> message;
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream 
with
-    // those found in the schema
-    const auto num_dicts = dictionary_memo_.fields().num_dicts();
-    for (int i = 0; i < num_dicts; ++i) {
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-      if (!message) {
-        if (i == 0) {
-          /// ARROW-6006: If we fail to find any dictionaries in the stream, 
then
-          /// it may be that the stream has a schema but no actual data. In 
such
-          /// case we communicate that we were unable to find the dictionaries
-          /// (but there was no failure otherwise), so the caller can decide 
what
-          /// to do
-          empty_stream_ = true;
-          break;
-        } else {
-          // ARROW-6126, the stream terminated before receiving the expected
-          // number of dictionaries
-          return Status::Invalid("IPC stream ended without reading the 
expected number (",
-                                 num_dicts, ") of dictionaries");
-        }
-      }
+  std::shared_ptr<Listener> listener_;
+  const IpcReadOptions options_;
+  State state_;
+  std::vector<bool> field_inclusion_mask_;
+  int num_required_initial_dictionaries_;
+  int num_read_initial_dictionaries_;
+  DictionaryMemo dictionary_memo_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<Schema> out_schema_;
+  ReadStats stats_;
+  bool swap_endian_;
+};
 
-      if (message->type() != MessageType::DICTIONARY_BATCH) {
-        return Status::Invalid("IPC stream did not have the expected number 
(", num_dicts,
-                               ") of dictionaries at the start of the stream");
-      }
-      RETURN_NOT_OK(ReadDictionary(*message));
+// ----------------------------------------------------------------------
+// RecordBatchStreamReader implementation
+
+class RecordBatchStreamReaderImpl : public RecordBatchStreamReader,
+                                    public StreamDecoderInternal {
+ public:
+  RecordBatchStreamReaderImpl(std::unique_ptr<MessageReader> message_reader,
+                              const IpcReadOptions& options)
+      : RecordBatchStreamReader(),
+        StreamDecoderInternal(std::make_shared<CollectListener>(), options),
+        message_reader_(std::move(message_reader)) {}
+
+  Status Init() {
+    // Read schema
+    ARROW_ASSIGN_OR_RAISE(auto message, message_reader_->ReadNextMessage());
+    if (!message) {
+      return Status::Invalid("Tried reading schema message, was null or length 
0");
     }
+    return OnMessageDecoded(std::move(message));
+  }
 
-    have_read_initial_dictionaries_ = true;
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+    ARROW_ASSIGN_OR_RAISE(auto batch_with_metadata, ReadNext());
+    *batch = std::move(batch_with_metadata.batch);
     return Status::OK();
   }
 
-  std::unique_ptr<MessageReader> message_reader_;
-  IpcReadOptions options_;
-  std::vector<bool> field_inclusion_mask_;
-
-  bool have_read_initial_dictionaries_ = false;
-
-  // Flag to set in case where we fail to observe all dictionaries in a stream,
-  // and so the reader should not attempt to parse any messages
-  bool empty_stream_ = false;
+  Result<RecordBatchWithMetadata> ReadNext() override {
+    auto collect_listener = static_cast<CollectListener*>(raw_listener());
+    while (collect_listener->num_record_batches() == 0) {

Review Comment:
   Ah, it's better.



##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -301,9 +317,39 @@ class ARROW_EXPORT CollectListener : public Listener {
     return record_batches_;
   }
 
+  /// \return the all decoded metadatas
+  std::vector<std::shared_ptr<KeyValueMetadata>> metadatas() const { return 
metadatas_; }
+
+  /// \return the number of collected record batches
+  size_t num_record_batches() const { return record_batches_.size(); }

Review Comment:
   Ah, yes. Other places use `int64_t` or `int` for the number of record 
batches.



##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -251,7 +251,8 @@ class ARROW_EXPORT Listener {
   /// \see StreamDecoder
   virtual Status OnEOS();
 
-  /// \brief Called when a record batch is decoded.
+  /// \brief Called when a record batch is decoded and
+  /// OnReocrdBatchDecoded() isn't overrided.

Review Comment:
   Oh...



##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public 
RecordBatchStreamReader {
     return Status::OK();
   }
 
-  Status ReadInitialDictionaries() {
-    // We must receive all dictionaries before reconstructing the
-    // first record batch. Subsequent dictionary deltas modify the memo
-    std::unique_ptr<Message> message;
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream 
with
-    // those found in the schema
-    const auto num_dicts = dictionary_memo_.fields().num_dicts();
-    for (int i = 0; i < num_dicts; ++i) {
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-      if (!message) {
-        if (i == 0) {
-          /// ARROW-6006: If we fail to find any dictionaries in the stream, 
then
-          /// it may be that the stream has a schema but no actual data. In 
such
-          /// case we communicate that we were unable to find the dictionaries
-          /// (but there was no failure otherwise), so the caller can decide 
what
-          /// to do
-          empty_stream_ = true;
-          break;
-        } else {
-          // ARROW-6126, the stream terminated before receiving the expected
-          // number of dictionaries
-          return Status::Invalid("IPC stream ended without reading the 
expected number (",
-                                 num_dicts, ") of dictionaries");
-        }
-      }
+  std::shared_ptr<Listener> listener_;
+  const IpcReadOptions options_;
+  State state_;
+  std::vector<bool> field_inclusion_mask_;
+  int num_required_initial_dictionaries_;
+  int num_read_initial_dictionaries_;
+  DictionaryMemo dictionary_memo_;
+  std::shared_ptr<Schema> schema_;

Review Comment:
   
https://github.com/apache/arrow/pull/36344/files#diff-e992169684aea9845ac776ada4cbb2b5dc711b49e5a3fbc6046c92299e1aefceR940
 and 
https://github.com/apache/arrow/pull/36344/files#diff-e992169684aea9845ac776ada4cbb2b5dc711b49e5a3fbc6046c92299e1aefceR954
 .
   (I understand that this PR's diff is difficult to review...)
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to