This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 1a00fecf7d GH-36674: [C++] Use anonymous namespace in
arrow/ipc/reader.cc (#36937)
1a00fecf7d is described below
commit 1a00fecf7dc1d75bfb5d31b6c5fae4b3d646bf1b
Author: pegasas <[email protected]>
AuthorDate: Wed Aug 9 22:44:34 2023 +0800
GH-36674: [C++] Use anonymous namespace in arrow/ipc/reader.cc (#36937)
### Rationale for this change
The following types and functions should be file-local (internal linkage):
IpcReadContext, BatchDataReadRequest, ArrayLoader,
DecompressBuffer(s), LoadRecordBatch*, GetCompression*,
ReadRecordBatchInternal,
GetInclusionMaskAndOutSchema, UnpackSchemaMessage,
ReadDictionary, AsyncRecordBatchStreamReaderImpl.
### What changes are included in this PR?
Use anonymous namespace around the aforementioned definitions.
### Are these changes tested?
No
### Are there any user-facing changes?
No
Lead-authored-by: pegasas <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/ipc/reader.cc | 81 ++++++++++++++++++++++-----------------------
1 file changed, 40 insertions(+), 41 deletions(-)
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index eadba69f05..125a8f2d41 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -109,8 +109,6 @@ Status InvalidMessageType(MessageType expected, MessageType
actual) {
} \
} while (0)
-} // namespace
-
// ----------------------------------------------------------------------
// Record batch read path
@@ -634,8 +632,7 @@ Status GetCompressionExperimental(const flatbuf::Message*
message,
return Status::OK();
}
-static Status ReadContiguousPayload(io::InputStream* file,
- std::unique_ptr<Message>* message) {
+Status ReadContiguousPayload(io::InputStream* file, std::unique_ptr<Message>*
message) {
ARROW_ASSIGN_OR_RAISE(*message, ReadMessage(file));
if (*message == nullptr) {
return Status::Invalid("Unable to read metadata at offset");
@@ -643,27 +640,6 @@ static Status ReadContiguousPayload(io::InputStream* file,
return Status::OK();
}
-Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
- const std::shared_ptr<Schema>& schema, const DictionaryMemo*
dictionary_memo,
- const IpcReadOptions& options, io::InputStream* file) {
- std::unique_ptr<Message> message;
- RETURN_NOT_OK(ReadContiguousPayload(file, &message));
- CHECK_HAS_BODY(*message);
- ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
- return ReadRecordBatch(*message->metadata(), schema, dictionary_memo,
options,
- reader.get());
-}
-
-Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
- const Message& message, const std::shared_ptr<Schema>& schema,
- const DictionaryMemo* dictionary_memo, const IpcReadOptions& options) {
- CHECK_MESSAGE_TYPE(MessageType::RECORD_BATCH, message.type());
- CHECK_HAS_BODY(message);
- ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message.body()));
- return ReadRecordBatch(*message.metadata(), schema, dictionary_memo, options,
- reader.get());
-}
-
Result<RecordBatchWithMetadata> ReadRecordBatchInternal(
const Buffer& metadata, const std::shared_ptr<Schema>& schema,
const std::vector<bool>& inclusion_mask, IpcReadContext& context,
@@ -764,22 +740,6 @@ Status UnpackSchemaMessage(const Message& message, const
IpcReadOptions& options
out_schema, field_inclusion_mask, swap_endian);
}
-Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
- const Buffer& metadata, const std::shared_ptr<Schema>& schema,
- const DictionaryMemo* dictionary_memo, const IpcReadOptions& options,
- io::RandomAccessFile* file) {
- std::shared_ptr<Schema> out_schema;
- // Empty means do not use
- std::vector<bool> inclusion_mask;
- IpcReadContext context(const_cast<DictionaryMemo*>(dictionary_memo),
options, false);
- RETURN_NOT_OK(GetInclusionMaskAndOutSchema(schema,
context.options.included_fields,
- &inclusion_mask, &out_schema));
- ARROW_ASSIGN_OR_RAISE(
- auto batch_and_custom_metadata,
- ReadRecordBatchInternal(metadata, schema, inclusion_mask, context,
file));
- return batch_and_custom_metadata.batch;
-}
-
Status ReadDictionary(const Buffer& metadata, const IpcReadContext& context,
DictionaryKind* kind, io::RandomAccessFile* file) {
const flatbuf::Message* message = nullptr;
@@ -851,6 +811,45 @@ Status ReadDictionary(const Message& message, const
IpcReadContext& context,
return ReadDictionary(*message.metadata(), context, kind, reader.get());
}
+} // namespace
+
+Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
+ const Buffer& metadata, const std::shared_ptr<Schema>& schema,
+ const DictionaryMemo* dictionary_memo, const IpcReadOptions& options,
+ io::RandomAccessFile* file) {
+ std::shared_ptr<Schema> out_schema;
+ // Empty means do not use
+ std::vector<bool> inclusion_mask;
+ IpcReadContext context(const_cast<DictionaryMemo*>(dictionary_memo),
options, false);
+ RETURN_NOT_OK(GetInclusionMaskAndOutSchema(schema,
context.options.included_fields,
+ &inclusion_mask, &out_schema));
+ ARROW_ASSIGN_OR_RAISE(
+ auto batch_and_custom_metadata,
+ ReadRecordBatchInternal(metadata, schema, inclusion_mask, context,
file));
+ return batch_and_custom_metadata.batch;
+}
+
+Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
+ const std::shared_ptr<Schema>& schema, const DictionaryMemo*
dictionary_memo,
+ const IpcReadOptions& options, io::InputStream* file) {
+ std::unique_ptr<Message> message;
+ RETURN_NOT_OK(ReadContiguousPayload(file, &message));
+ CHECK_HAS_BODY(*message);
+ ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
+ return ReadRecordBatch(*message->metadata(), schema, dictionary_memo,
options,
+ reader.get());
+}
+
+Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
+ const Message& message, const std::shared_ptr<Schema>& schema,
+ const DictionaryMemo* dictionary_memo, const IpcReadOptions& options) {
+ CHECK_MESSAGE_TYPE(MessageType::RECORD_BATCH, message.type());
+ CHECK_HAS_BODY(message);
+ ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message.body()));
+ return ReadRecordBatch(*message.metadata(), schema, dictionary_memo, options,
+ reader.get());
+}
+
// Streaming format decoder
class StreamDecoderInternal : public MessageDecoderListener {
public: