Repository: arrow Updated Branches: refs/heads/master ef3b6b344 -> 16c97592b
ARROW-577: [C++] Use private implementation pattern in ipc::StreamWriter and ipc::FileWriter. This patch also includes some code reorganization -- I moved the reader and writer classes to their own headers/compilation units. I also moved the stream-to-file and file-to-stream executables to arrow/ipc. Author: Wes McKinney <[email protected]> Closes #351 from wesm/ARROW-577 and squashes the following commits: 98c32d2 [Wes McKinney] Only build file/stream utils if ARROW_BUILD_UTILITIES is on c5fa43f [Wes McKinney] Refactor to make stream and file writer implementation details private in public ABI Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/16c97592 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/16c97592 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/16c97592 Branch: refs/heads/master Commit: 16c97592bf948c32a8dae9441ace078422d642dd Parents: ef3b6b3 Author: Wes McKinney <[email protected]> Authored: Sun Feb 26 19:22:15 2017 -0500 Committer: Wes McKinney <[email protected]> Committed: Sun Feb 26 19:22:15 2017 -0500 ---------------------------------------------------------------------- cpp/src/arrow/ipc/CMakeLists.txt | 32 +- cpp/src/arrow/ipc/api.h | 27 ++ cpp/src/arrow/ipc/file-to-stream.cc | 60 ++++ cpp/src/arrow/ipc/file.cc | 297 ------------------- cpp/src/arrow/ipc/file.h | 111 ------- cpp/src/arrow/ipc/ipc-file-test.cc | 4 +- cpp/src/arrow/ipc/json-integration-test.cc | 3 +- cpp/src/arrow/ipc/metadata.h | 12 + cpp/src/arrow/ipc/reader.cc | 369 ++++++++++++++++++++++++ cpp/src/arrow/ipc/reader.h | 112 +++++++ cpp/src/arrow/ipc/stream-to-file.cc | 58 ++++ cpp/src/arrow/ipc/stream.cc | 310 -------------------- cpp/src/arrow/ipc/stream.h | 137 --------- cpp/src/arrow/ipc/writer.cc | 287 ++++++++++++++++++ cpp/src/arrow/ipc/writer.h | 92 ++++++ cpp/src/arrow/util/CMakeLists.txt | 22 -- cpp/src/arrow/util/file-to-stream.cc | 60 ---- cpp/src/arrow/util/stream-to-file.cc | 58 
---- python/pyarrow/includes/libarrow_ipc.pxd | 5 +- 19 files changed, 1050 insertions(+), 1006 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt index e7a3fdb..08da0a1 100644 --- a/cpp/src/arrow/ipc/CMakeLists.txt +++ b/cpp/src/arrow/ipc/CMakeLists.txt @@ -30,12 +30,12 @@ set(ARROW_IPC_TEST_LINK_LIBS set(ARROW_IPC_SRCS adapter.cc - file.cc json.cc json-internal.cc metadata.cc metadata-internal.cc - stream.cc + reader.cc + writer.cc ) if(NOT APPLE) @@ -138,10 +138,11 @@ add_dependencies(arrow_ipc_objlib metadata_fbs) # Headers: top level install(FILES adapter.h - file.h + api.h json.h metadata.h - stream.h + reader.h + writer.h DESTINATION include/arrow/ipc) # pkg-config support @@ -151,3 +152,26 @@ configure_file(arrow-ipc.pc.in install( FILES "${CMAKE_CURRENT_BINARY_DIR}/arrow-ipc.pc" DESTINATION "lib/pkgconfig/") + + +set(UTIL_LINK_LIBS + arrow_ipc_static + arrow_io_static + arrow_static + boost_filesystem_static + boost_system_static + dl) + +if (NOT APPLE) + set(UTIL_LINK_LIBS + ${UTIL_LINK_LIBS} + boost_filesystem_static + boost_system_static) +endif() + +if (ARROW_BUILD_UTILITIES) + add_executable(file-to-stream file-to-stream.cc) + target_link_libraries(file-to-stream ${UTIL_LINK_LIBS}) + add_executable(stream-to-file stream-to-file.cc) + target_link_libraries(stream-to-file ${UTIL_LINK_LIBS}) +endif() http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/api.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/api.h b/cpp/src/arrow/ipc/api.h new file mode 100644 index 0000000..cb85421 --- /dev/null +++ b/cpp/src/arrow/ipc/api.h @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// 
or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_IPC_API_H +#define ARROW_IPC_API_H + +#include "arrow/ipc/adapter.h" +#include "arrow/ipc/json.h" +#include "arrow/ipc/metadata.h" +#include "arrow/ipc/reader.h" +#include "arrow/ipc/writer.h" + +#endif // ARROW_IPC_API_H http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/file-to-stream.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/file-to-stream.cc b/cpp/src/arrow/ipc/file-to-stream.cc new file mode 100644 index 0000000..8161b19 --- /dev/null +++ b/cpp/src/arrow/ipc/file-to-stream.cc @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/io/file.h" +#include "arrow/ipc/reader.h" +#include "arrow/ipc/writer.h" +#include "arrow/status.h" +#include <iostream> + +#include "arrow/util/io-util.h" + +namespace arrow { + +// Reads a file on the file system and prints to stdout the stream version of it. +Status ConvertToStream(const char* path) { + std::shared_ptr<io::ReadableFile> in_file; + std::shared_ptr<ipc::FileReader> reader; + + RETURN_NOT_OK(io::ReadableFile::Open(path, &in_file)); + RETURN_NOT_OK(ipc::FileReader::Open(in_file, &reader)); + + io::StdoutStream sink; + std::shared_ptr<ipc::StreamWriter> writer; + RETURN_NOT_OK(ipc::StreamWriter::Open(&sink, reader->schema(), &writer)); + for (int i = 0; i < reader->num_record_batches(); ++i) { + std::shared_ptr<RecordBatch> chunk; + RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk)); + RETURN_NOT_OK(writer->WriteRecordBatch(*chunk)); + } + return writer->Close(); +} + +} // namespace arrow + +int main(int argc, char** argv) { + if (argc != 2) { + std::cerr << "Usage: file-to-stream <input arrow file>" << std::endl; + return 1; + } + arrow::Status status = arrow::ConvertToStream(argv[1]); + if (!status.ok()) { + std::cerr << "Could not convert to stream: " << status.ToString() << std::endl; + return 1; + } + return 0; +} http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/file.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc deleted file mode 100644 index c1d483f..0000000 --- a/cpp/src/arrow/ipc/file.cc 
+++ /dev/null @@ -1,297 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "arrow/ipc/file.h" - -#include <cstdint> -#include <cstring> -#include <sstream> -#include <vector> - -#include "arrow/buffer.h" -#include "arrow/io/interfaces.h" -#include "arrow/io/memory.h" -#include "arrow/ipc/adapter.h" -#include "arrow/ipc/metadata-internal.h" -#include "arrow/ipc/metadata.h" -#include "arrow/ipc/util.h" -#include "arrow/status.h" -#include "arrow/util/logging.h" - -namespace arrow { -namespace ipc { - -static constexpr const char* kArrowMagicBytes = "ARROW1"; - -static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>> -FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) { - std::vector<flatbuf::Block> fb_blocks; - - for (const FileBlock& block : blocks) { - fb_blocks.emplace_back(block.offset, block.metadata_length, block.body_length); - } - - return fbb.CreateVectorOfStructs(fb_blocks); -} - -Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries, - const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo, - io::OutputStream* out) { - FBB fbb; - - flatbuffers::Offset<flatbuf::Schema> fb_schema; - 
RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema)); - - auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries); - auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches); - - auto footer = flatbuf::CreateFooter( - fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches); - - fbb.Finish(footer); - - int32_t size = fbb.GetSize(); - - return out->Write(fbb.GetBufferPointer(), size); -} - -static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { - return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength()); -} - -// ---------------------------------------------------------------------- -// File writer implementation - -FileWriter::FileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema) - : StreamWriter(sink, schema) {} - -Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, - std::shared_ptr<FileWriter>* out) { - *out = std::shared_ptr<FileWriter>(new FileWriter(sink, schema)); // ctor is private - RETURN_NOT_OK((*out)->UpdatePosition()); - return Status::OK(); -} - -Status FileWriter::Start() { - RETURN_NOT_OK(WriteAligned( - reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes))); - - // We write the schema at the start of the file (and the end). 
This also - // writes all the dictionaries at the beginning of the file - return StreamWriter::Start(); -} - -Status FileWriter::Close() { - // Write metadata - int64_t initial_position = position_; - RETURN_NOT_OK(WriteFileFooter( - *schema_, dictionaries_, record_batches_, dictionary_memo_.get(), sink_)); - RETURN_NOT_OK(UpdatePosition()); - - // Write footer length - int32_t footer_length = position_ - initial_position; - - if (footer_length <= 0) { return Status::Invalid("Invalid file footer"); } - - RETURN_NOT_OK(Write(reinterpret_cast<const uint8_t*>(&footer_length), sizeof(int32_t))); - - // Write magic bytes to end file - return Write( - reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes)); -} - -// ---------------------------------------------------------------------- -// Reader implementation - -class FileReader::FileReaderImpl { - public: - FileReaderImpl() { dictionary_memo_ = std::make_shared<DictionaryMemo>(); } - - Status ReadFooter() { - int magic_size = static_cast<int>(strlen(kArrowMagicBytes)); - - if (footer_offset_ <= magic_size * 2 + 4) { - std::stringstream ss; - ss << "File is too small: " << footer_offset_; - return Status::Invalid(ss.str()); - } - - std::shared_ptr<Buffer> buffer; - int file_end_size = magic_size + sizeof(int32_t); - RETURN_NOT_OK(file_->ReadAt(footer_offset_ - file_end_size, file_end_size, &buffer)); - - if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) { - return Status::Invalid("Not an Arrow file"); - } - - int32_t footer_length = *reinterpret_cast<const int32_t*>(buffer->data()); - - if (footer_length <= 0 || footer_length + magic_size * 2 + 4 > footer_offset_) { - return Status::Invalid("File is smaller than indicated metadata size"); - } - - // Now read the footer - RETURN_NOT_OK(file_->ReadAt( - footer_offset_ - footer_length - file_end_size, footer_length, &footer_buffer_)); - - // TODO(wesm): Verify the footer - footer_ = 
flatbuf::GetFooter(footer_buffer_->data()); - schema_metadata_.reset(new SchemaMetadata(nullptr, footer_->schema())); - - return Status::OK(); - } - - int num_dictionaries() const { return footer_->dictionaries()->size(); } - - int num_record_batches() const { return footer_->recordBatches()->size(); } - - MetadataVersion::type version() const { - switch (footer_->version()) { - case flatbuf::MetadataVersion_V1: - return MetadataVersion::V1; - case flatbuf::MetadataVersion_V2: - return MetadataVersion::V2; - // Add cases as other versions become available - default: - return MetadataVersion::V2; - } - } - - FileBlock record_batch(int i) const { - return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i)); - } - - FileBlock dictionary(int i) const { - return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i)); - } - - const SchemaMetadata& schema_metadata() const { return *schema_metadata_; } - - Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) { - DCHECK_GE(i, 0); - DCHECK_LT(i, num_record_batches()); - FileBlock block = record_batch(i); - - std::shared_ptr<Message> message; - RETURN_NOT_OK( - ReadMessage(block.offset, block.metadata_length, file_.get(), &message)); - auto metadata = std::make_shared<RecordBatchMetadata>(message); - - // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see - // ARROW-384). 
- std::shared_ptr<Buffer> buffer_block; - RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block)); - io::BufferReader reader(buffer_block); - - return ReadRecordBatch(*metadata, schema_, &reader, batch); - } - - Status ReadSchema() { - RETURN_NOT_OK(schema_metadata_->GetDictionaryTypes(&dictionary_fields_)); - - // Read all the dictionaries - for (int i = 0; i < num_dictionaries(); ++i) { - FileBlock block = dictionary(i); - std::shared_ptr<Message> message; - RETURN_NOT_OK( - ReadMessage(block.offset, block.metadata_length, file_.get(), &message)); - - // TODO(wesm): ARROW-577: This code is duplicated, can be fixed with a more - // invasive refactor - DictionaryBatchMetadata metadata(message); - - // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see - // ARROW-384). - std::shared_ptr<Buffer> buffer_block; - RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block)); - io::BufferReader reader(buffer_block); - - std::shared_ptr<Array> dictionary; - RETURN_NOT_OK(ReadDictionary(metadata, dictionary_fields_, &reader, &dictionary)); - RETURN_NOT_OK(dictionary_memo_->AddDictionary(metadata.id(), dictionary)); - } - - // Get the schema - return schema_metadata_->GetSchema(*dictionary_memo_, &schema_); - } - - Status Open( - const std::shared_ptr<io::ReadableFileInterface>& file, int64_t footer_offset) { - file_ = file; - footer_offset_ = footer_offset; - RETURN_NOT_OK(ReadFooter()); - return ReadSchema(); - } - - std::shared_ptr<Schema> schema() const { return schema_; } - - private: - std::shared_ptr<io::ReadableFileInterface> file_; - - // The location where the Arrow file layout ends. May be the end of the file - // or some other location if embedded in a larger file. 
- int64_t footer_offset_; - - // Footer metadata - std::shared_ptr<Buffer> footer_buffer_; - const flatbuf::Footer* footer_; - std::unique_ptr<SchemaMetadata> schema_metadata_; - - DictionaryTypeMap dictionary_fields_; - std::shared_ptr<DictionaryMemo> dictionary_memo_; - - // Reconstructed schema, including any read dictionaries - std::shared_ptr<Schema> schema_; -}; - -FileReader::FileReader() { - impl_.reset(new FileReaderImpl()); -} - -FileReader::~FileReader() {} - -Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file, - std::shared_ptr<FileReader>* reader) { - int64_t footer_offset; - RETURN_NOT_OK(file->GetSize(&footer_offset)); - return Open(file, footer_offset, reader); -} - -Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file, - int64_t footer_offset, std::shared_ptr<FileReader>* reader) { - *reader = std::shared_ptr<FileReader>(new FileReader()); - return (*reader)->impl_->Open(file, footer_offset); -} - -std::shared_ptr<Schema> FileReader::schema() const { - return impl_->schema(); -} - -int FileReader::num_record_batches() const { - return impl_->num_record_batches(); -} - -MetadataVersion::type FileReader::version() const { - return impl_->version(); -} - -Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) { - return impl_->GetRecordBatch(i, batch); -} - -} // namespace ipc -} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/file.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/file.h b/cpp/src/arrow/ipc/file.h deleted file mode 100644 index 524766c..0000000 --- a/cpp/src/arrow/ipc/file.h +++ /dev/null @@ -1,111 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Implement Arrow file layout for IPC/RPC purposes and short-lived storage - -#ifndef ARROW_IPC_FILE_H -#define ARROW_IPC_FILE_H - -#include <cstdint> -#include <memory> -#include <vector> - -#include "arrow/ipc/metadata.h" -#include "arrow/ipc/stream.h" -#include "arrow/util/visibility.h" - -namespace arrow { - -class Buffer; -class RecordBatch; -class Schema; -class Status; - -namespace io { - -class OutputStream; -class ReadableFileInterface; - -} // namespace io - -namespace ipc { - -Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries, - const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo, - io::OutputStream* out); - -class ARROW_EXPORT FileWriter : public StreamWriter { - public: - static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, - std::shared_ptr<FileWriter>* out); - - using StreamWriter::WriteRecordBatch; - Status Close() override; - - private: - FileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema); - - Status Start() override; -}; - -class ARROW_EXPORT FileReader { - public: - ~FileReader(); - - // Open a file-like object that is assumed to be self-contained; i.e., the - // end of the file interface is the end of the Arrow file. 
Note that there - // can be any amount of data preceding the Arrow-formatted data, because we - // need only locate the end of the Arrow file stream to discover the metadata - // and then proceed to read the data into memory. - static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file, - std::shared_ptr<FileReader>* reader); - - // If the file is embedded within some larger file or memory region, you can - // pass the absolute memory offset to the end of the file (which contains the - // metadata footer). The metadata must have been written with memory offsets - // relative to the start of the containing file - // - // @param file: the data source - // @param footer_offset: the position of the end of the Arrow "file" - static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file, - int64_t footer_offset, std::shared_ptr<FileReader>* reader); - - /// The schema includes any dictionaries - std::shared_ptr<Schema> schema() const; - - int num_record_batches() const; - - MetadataVersion::type version() const; - - // Read a record batch from the file. Does not copy memory if the input - // source supports zero-copy. - // - // TODO(wesm): Make the copy/zero-copy behavior configurable (e.g. 
provide an - // "always copy" option) - Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch); - - private: - FileReader(); - - class ARROW_NO_EXPORT FileReaderImpl; - std::unique_ptr<FileReaderImpl> impl_; -}; - -} // namespace ipc -} // namespace arrow - -#endif // ARROW_IPC_FILE_H http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/ipc-file-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc index 4b82aab..e58f2cf 100644 --- a/cpp/src/arrow/ipc/ipc-file-test.cc +++ b/cpp/src/arrow/ipc/ipc-file-test.cc @@ -28,10 +28,10 @@ #include "arrow/io/memory.h" #include "arrow/io/test-common.h" #include "arrow/ipc/adapter.h" -#include "arrow/ipc/file.h" -#include "arrow/ipc/stream.h" +#include "arrow/ipc/reader.h" #include "arrow/ipc/test-common.h" #include "arrow/ipc/util.h" +#include "arrow/ipc/writer.h" #include "arrow/buffer.h" #include "arrow/memory_pool.h" http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/json-integration-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc index 95bc742..c16074e 100644 --- a/cpp/src/arrow/ipc/json-integration-test.cc +++ b/cpp/src/arrow/ipc/json-integration-test.cc @@ -29,8 +29,9 @@ #include <boost/filesystem.hpp> // NOLINT #include "arrow/io/file.h" -#include "arrow/ipc/file.h" #include "arrow/ipc/json.h" +#include "arrow/ipc/reader.h" +#include "arrow/ipc/writer.h" #include "arrow/pretty_print.h" #include "arrow/schema.h" #include "arrow/status.h" http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/metadata.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h index 0091067..f12529b 100644 --- 
a/cpp/src/arrow/ipc/metadata.h +++ b/cpp/src/arrow/ipc/metadata.h @@ -50,6 +50,18 @@ struct MetadataVersion { enum type { V1, V2 }; }; +static constexpr const char* kArrowMagicBytes = "ARROW1"; + +struct ARROW_EXPORT FileBlock { + FileBlock() {} + FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length) + : offset(offset), metadata_length(metadata_length), body_length(body_length) {} + + int64_t offset; + int32_t metadata_length; + int64_t body_length; +}; + //---------------------------------------------------------------------- using DictionaryMap = std::unordered_map<int64_t, std::shared_ptr<Array>>; http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/reader.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc new file mode 100644 index 0000000..1a9af7d --- /dev/null +++ b/cpp/src/arrow/ipc/reader.cc @@ -0,0 +1,369 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/ipc/reader.h" + +#include <cstdint> +#include <cstring> +#include <sstream> +#include <string> +#include <vector> + +#include "arrow/buffer.h" +#include "arrow/io/interfaces.h" +#include "arrow/io/memory.h" +#include "arrow/ipc/adapter.h" +#include "arrow/ipc/metadata-internal.h" +#include "arrow/ipc/metadata.h" +#include "arrow/ipc/util.h" +#include "arrow/status.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace ipc { + +// ---------------------------------------------------------------------- +// StreamReader implementation + +static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { + return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength()); +} + +static inline std::string message_type_name(Message::Type type) { + switch (type) { + case Message::SCHEMA: + return "schema"; + case Message::RECORD_BATCH: + return "record batch"; + case Message::DICTIONARY_BATCH: + return "dictionary"; + default: + break; + } + return "unknown"; +} + +class StreamReader::StreamReaderImpl { + public: + StreamReaderImpl() {} + ~StreamReaderImpl() {} + + Status Open(const std::shared_ptr<io::InputStream>& stream) { + stream_ = stream; + return ReadSchema(); + } + + Status ReadNextMessage(Message::Type expected_type, std::shared_ptr<Message>* message) { + std::shared_ptr<Buffer> buffer; + RETURN_NOT_OK(stream_->Read(sizeof(int32_t), &buffer)); + + if (buffer->size() != sizeof(int32_t)) { + *message = nullptr; + return Status::OK(); + } + + int32_t message_length = *reinterpret_cast<const int32_t*>(buffer->data()); + + RETURN_NOT_OK(stream_->Read(message_length, &buffer)); + if (buffer->size() != message_length) { + return Status::IOError("Unexpected end of stream trying to read message"); + } + + RETURN_NOT_OK(Message::Open(buffer, 0, message)); + + if ((*message)->type() != expected_type) { + std::stringstream ss; + ss << "Message not expected type: " << message_type_name(expected_type) + << ", was: " << 
(*message)->type(); + return Status::IOError(ss.str()); + } + return Status::OK(); + } + + Status ReadExact(int64_t size, std::shared_ptr<Buffer>* buffer) { + RETURN_NOT_OK(stream_->Read(size, buffer)); + + if ((*buffer)->size() < size) { + return Status::IOError("Unexpected EOS when reading buffer"); + } + return Status::OK(); + } + + Status ReadNextDictionary() { + std::shared_ptr<Message> message; + RETURN_NOT_OK(ReadNextMessage(Message::DICTIONARY_BATCH, &message)); + + DictionaryBatchMetadata metadata(message); + + std::shared_ptr<Buffer> batch_body; + RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body)) + io::BufferReader reader(batch_body); + + std::shared_ptr<Array> dictionary; + RETURN_NOT_OK(ReadDictionary(metadata, dictionary_types_, &reader, &dictionary)); + return dictionary_memo_.AddDictionary(metadata.id(), dictionary); + } + + Status ReadSchema() { + std::shared_ptr<Message> message; + RETURN_NOT_OK(ReadNextMessage(Message::SCHEMA, &message)); + + SchemaMetadata schema_meta(message); + RETURN_NOT_OK(schema_meta.GetDictionaryTypes(&dictionary_types_)); + + // TODO(wesm): In future, we may want to reconcile the ids in the stream with + // those found in the schema + int num_dictionaries = static_cast<int>(dictionary_types_.size()); + for (int i = 0; i < num_dictionaries; ++i) { + RETURN_NOT_OK(ReadNextDictionary()); + } + + return schema_meta.GetSchema(dictionary_memo_, &schema_); + } + + Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) { + std::shared_ptr<Message> message; + RETURN_NOT_OK(ReadNextMessage(Message::RECORD_BATCH, &message)); + + if (message == nullptr) { + // End of stream + *batch = nullptr; + return Status::OK(); + } + + RecordBatchMetadata batch_metadata(message); + + std::shared_ptr<Buffer> batch_body; + RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body)); + io::BufferReader reader(batch_body); + return ReadRecordBatch(batch_metadata, schema_, &reader, batch); + } + + std::shared_ptr<Schema> schema() 
const { return schema_; } + + private: + // dictionary_id -> type + DictionaryTypeMap dictionary_types_; + + DictionaryMemo dictionary_memo_; + + std::shared_ptr<io::InputStream> stream_; + std::shared_ptr<Schema> schema_; +}; + +StreamReader::StreamReader() { + impl_.reset(new StreamReaderImpl()); +} + +StreamReader::~StreamReader() {} + +Status StreamReader::Open(const std::shared_ptr<io::InputStream>& stream, + std::shared_ptr<StreamReader>* reader) { + // Private ctor + *reader = std::shared_ptr<StreamReader>(new StreamReader()); + return (*reader)->impl_->Open(stream); +} + +std::shared_ptr<Schema> StreamReader::schema() const { + return impl_->schema(); +} + +Status StreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) { + return impl_->GetNextRecordBatch(batch); +} + +// ---------------------------------------------------------------------- +// Reader implementation + +class FileReader::FileReaderImpl { + public: + FileReaderImpl() { dictionary_memo_ = std::make_shared<DictionaryMemo>(); } + + Status ReadFooter() { + int magic_size = static_cast<int>(strlen(kArrowMagicBytes)); + + if (footer_offset_ <= magic_size * 2 + 4) { + std::stringstream ss; + ss << "File is too small: " << footer_offset_; + return Status::Invalid(ss.str()); + } + + std::shared_ptr<Buffer> buffer; + int file_end_size = magic_size + sizeof(int32_t); + RETURN_NOT_OK(file_->ReadAt(footer_offset_ - file_end_size, file_end_size, &buffer)); + + if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) { + return Status::Invalid("Not an Arrow file"); + } + + int32_t footer_length = *reinterpret_cast<const int32_t*>(buffer->data()); + + if (footer_length <= 0 || footer_length + magic_size * 2 + 4 > footer_offset_) { + return Status::Invalid("File is smaller than indicated metadata size"); + } + + // Now read the footer + RETURN_NOT_OK(file_->ReadAt( + footer_offset_ - footer_length - file_end_size, footer_length, &footer_buffer_)); + + // TODO(wesm): Verify 
the footer + footer_ = flatbuf::GetFooter(footer_buffer_->data()); + schema_metadata_.reset(new SchemaMetadata(nullptr, footer_->schema())); + + return Status::OK(); + } + + int num_dictionaries() const { return footer_->dictionaries()->size(); } + + int num_record_batches() const { return footer_->recordBatches()->size(); } + + MetadataVersion::type version() const { + switch (footer_->version()) { + case flatbuf::MetadataVersion_V1: + return MetadataVersion::V1; + case flatbuf::MetadataVersion_V2: + return MetadataVersion::V2; + // Add cases as other versions become available + default: + return MetadataVersion::V2; + } + } + + FileBlock record_batch(int i) const { + return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i)); + } + + FileBlock dictionary(int i) const { + return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i)); + } + + const SchemaMetadata& schema_metadata() const { return *schema_metadata_; } + + Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) { + DCHECK_GE(i, 0); + DCHECK_LT(i, num_record_batches()); + FileBlock block = record_batch(i); + + std::shared_ptr<Message> message; + RETURN_NOT_OK( + ReadMessage(block.offset, block.metadata_length, file_.get(), &message)); + auto metadata = std::make_shared<RecordBatchMetadata>(message); + + // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see + // ARROW-384). 
+ std::shared_ptr<Buffer> buffer_block; + RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block)); + io::BufferReader reader(buffer_block); + + return ReadRecordBatch(*metadata, schema_, &reader, batch); + } + + Status ReadSchema() { + RETURN_NOT_OK(schema_metadata_->GetDictionaryTypes(&dictionary_fields_)); + + // Read all the dictionaries + for (int i = 0; i < num_dictionaries(); ++i) { + FileBlock block = dictionary(i); + std::shared_ptr<Message> message; + RETURN_NOT_OK( + ReadMessage(block.offset, block.metadata_length, file_.get(), &message)); + + // TODO(wesm): ARROW-577: This code is duplicated, can be fixed with a more + // invasive refactor + DictionaryBatchMetadata metadata(message); + + // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see + // ARROW-384). + std::shared_ptr<Buffer> buffer_block; + RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block)); + io::BufferReader reader(buffer_block); + + std::shared_ptr<Array> dictionary; + RETURN_NOT_OK(ReadDictionary(metadata, dictionary_fields_, &reader, &dictionary)); + RETURN_NOT_OK(dictionary_memo_->AddDictionary(metadata.id(), dictionary)); + } + + // Get the schema + return schema_metadata_->GetSchema(*dictionary_memo_, &schema_); + } + + Status Open( + const std::shared_ptr<io::ReadableFileInterface>& file, int64_t footer_offset) { + file_ = file; + footer_offset_ = footer_offset; + RETURN_NOT_OK(ReadFooter()); + return ReadSchema(); + } + + std::shared_ptr<Schema> schema() const { return schema_; } + + private: + std::shared_ptr<io::ReadableFileInterface> file_; + + // The location where the Arrow file layout ends. May be the end of the file + // or some other location if embedded in a larger file. 
+ int64_t footer_offset_; + + // Footer metadata + std::shared_ptr<Buffer> footer_buffer_; + const flatbuf::Footer* footer_; + std::unique_ptr<SchemaMetadata> schema_metadata_; + + DictionaryTypeMap dictionary_fields_; + std::shared_ptr<DictionaryMemo> dictionary_memo_; + + // Reconstructed schema, including any read dictionaries + std::shared_ptr<Schema> schema_; +}; + +FileReader::FileReader() { + impl_.reset(new FileReaderImpl()); +} + +FileReader::~FileReader() {} + +Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file, + std::shared_ptr<FileReader>* reader) { + int64_t footer_offset; + RETURN_NOT_OK(file->GetSize(&footer_offset)); + return Open(file, footer_offset, reader); +} + +Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file, + int64_t footer_offset, std::shared_ptr<FileReader>* reader) { + *reader = std::shared_ptr<FileReader>(new FileReader()); + return (*reader)->impl_->Open(file, footer_offset); +} + +std::shared_ptr<Schema> FileReader::schema() const { + return impl_->schema(); +} + +int FileReader::num_record_batches() const { + return impl_->num_record_batches(); +} + +MetadataVersion::type FileReader::version() const { + return impl_->version(); +} + +Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) { + return impl_->GetRecordBatch(i, batch); +} + +} // namespace ipc +} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/reader.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h new file mode 100644 index 0000000..6f143e1 --- /dev/null +++ b/cpp/src/arrow/ipc/reader.h @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Implement Arrow file layout for IPC/RPC purposes and short-lived storage + +#ifndef ARROW_IPC_FILE_H +#define ARROW_IPC_FILE_H + +#include <cstdint> +#include <memory> +#include <vector> + +#include "arrow/ipc/metadata.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class Buffer; +class RecordBatch; +class Schema; +class Status; + +namespace io { + +class InputStream; +class ReadableFileInterface; + +} // namespace io + +namespace ipc { + +class ARROW_EXPORT StreamReader { + public: + ~StreamReader(); + + // Open an stream. + static Status Open(const std::shared_ptr<io::InputStream>& stream, + std::shared_ptr<StreamReader>* reader); + + std::shared_ptr<Schema> schema() const; + + // Returned batch is nullptr when end of stream reached + Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch); + + private: + StreamReader(); + + class ARROW_NO_EXPORT StreamReaderImpl; + std::unique_ptr<StreamReaderImpl> impl_; +}; + +class ARROW_EXPORT FileReader { + public: + ~FileReader(); + + // Open a file-like object that is assumed to be self-contained; i.e., the + // end of the file interface is the end of the Arrow file. Note that there + // can be any amount of data preceding the Arrow-formatted data, because we + // need only locate the end of the Arrow file stream to discover the metadata + // and then proceed to read the data into memory. 
+ static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file, + std::shared_ptr<FileReader>* reader); + + // If the file is embedded within some larger file or memory region, you can + // pass the absolute memory offset to the end of the file (which contains the + // metadata footer). The metadata must have been written with memory offsets + // relative to the start of the containing file + // + // @param file: the data source + // @param footer_offset: the position of the end of the Arrow "file" + static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file, + int64_t footer_offset, std::shared_ptr<FileReader>* reader); + + /// The schema includes any dictionaries + std::shared_ptr<Schema> schema() const; + + int num_record_batches() const; + + MetadataVersion::type version() const; + + // Read a record batch from the file. Does not copy memory if the input + // source supports zero-copy. + // + // TODO(wesm): Make the copy/zero-copy behavior configurable (e.g. provide an + // "always copy" option) + Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch); + + private: + FileReader(); + + class ARROW_NO_EXPORT FileReaderImpl; + std::unique_ptr<FileReaderImpl> impl_; +}; + +} // namespace ipc +} // namespace arrow + +#endif // ARROW_IPC_FILE_H http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/stream-to-file.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/stream-to-file.cc b/cpp/src/arrow/ipc/stream-to-file.cc new file mode 100644 index 0000000..ec0ac43 --- /dev/null +++ b/cpp/src/arrow/ipc/stream-to-file.cc @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/io/file.h" +#include "arrow/ipc/reader.h" +#include "arrow/ipc/writer.h" +#include "arrow/status.h" +#include <iostream> + +#include "arrow/util/io-util.h" + +namespace arrow { + +// Converts a stream from stdin to a file written to standard out. +// A typical usage would be: +// $ <program that produces streaming output> | stream-to-file > file.arrow +Status ConvertToFile() { + std::shared_ptr<io::InputStream> input(new io::StdinStream); + std::shared_ptr<ipc::StreamReader> reader; + RETURN_NOT_OK(ipc::StreamReader::Open(input, &reader)); + + io::StdoutStream sink; + std::shared_ptr<ipc::FileWriter> writer; + RETURN_NOT_OK(ipc::FileWriter::Open(&sink, reader->schema(), &writer)); + + std::shared_ptr<RecordBatch> batch; + while (true) { + RETURN_NOT_OK(reader->GetNextRecordBatch(&batch)); + if (batch == nullptr) break; + RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); + } + return writer->Close(); +} + +} // namespace arrow + +int main(int argc, char** argv) { + arrow::Status status = arrow::ConvertToFile(); + if (!status.ok()) { + std::cerr << "Could not convert to file: " << status.ToString() << std::endl; + return 1; + } + return 0; +} http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/stream.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/stream.cc b/cpp/src/arrow/ipc/stream.cc 
deleted file mode 100644 index 7f5c993..0000000 --- a/cpp/src/arrow/ipc/stream.cc +++ /dev/null @@ -1,310 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "arrow/ipc/stream.h" - -#include <cstdint> -#include <cstring> -#include <sstream> -#include <string> -#include <vector> - -#include "arrow/buffer.h" -#include "arrow/io/interfaces.h" -#include "arrow/io/memory.h" -#include "arrow/ipc/adapter.h" -#include "arrow/ipc/metadata-internal.h" -#include "arrow/ipc/metadata.h" -#include "arrow/ipc/util.h" -#include "arrow/memory_pool.h" -#include "arrow/schema.h" -#include "arrow/status.h" -#include "arrow/table.h" -#include "arrow/util/logging.h" - -namespace arrow { -namespace ipc { - -// ---------------------------------------------------------------------- -// Stream writer implementation - -StreamWriter::StreamWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema) - : sink_(sink), - schema_(schema), - dictionary_memo_(std::make_shared<DictionaryMemo>()), - pool_(default_memory_pool()), - position_(-1), - started_(false) {} - -Status StreamWriter::UpdatePosition() { - return sink_->Tell(&position_); -} - -Status StreamWriter::Write(const uint8_t* data, int64_t nbytes) { - 
RETURN_NOT_OK(sink_->Write(data, nbytes)); - position_ += nbytes; - return Status::OK(); -} - -Status StreamWriter::Align() { - int64_t remainder = PaddedLength(position_) - position_; - if (remainder > 0) { return Write(kPaddingBytes, remainder); } - return Status::OK(); -} - -Status StreamWriter::WriteAligned(const uint8_t* data, int64_t nbytes) { - RETURN_NOT_OK(Write(data, nbytes)); - return Align(); -} - -Status StreamWriter::CheckStarted() { - if (!started_) { return Start(); } - return Status::OK(); -} - -Status StreamWriter::WriteRecordBatch(const RecordBatch& batch, FileBlock* block) { - RETURN_NOT_OK(CheckStarted()); - - block->offset = position_; - - // Frame of reference in file format is 0, see ARROW-384 - const int64_t buffer_start_offset = 0; - RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(batch, buffer_start_offset, sink_, - &block->metadata_length, &block->body_length, pool_)); - RETURN_NOT_OK(UpdatePosition()); - - DCHECK(position_ % 8 == 0) << "WriteRecordBatch did not perform aligned writes"; - - return Status::OK(); -} - -void StreamWriter::set_memory_pool(MemoryPool* pool) { - pool_ = pool; -} - -// ---------------------------------------------------------------------- -// StreamWriter implementation - -Status StreamWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, - std::shared_ptr<StreamWriter>* out) { - // ctor is private - *out = std::shared_ptr<StreamWriter>(new StreamWriter(sink, schema)); - RETURN_NOT_OK((*out)->UpdatePosition()); - return Status::OK(); -} - -Status StreamWriter::Start() { - std::shared_ptr<Buffer> schema_fb; - RETURN_NOT_OK(WriteSchemaMessage(*schema_, dictionary_memo_.get(), &schema_fb)); - - int32_t flatbuffer_size = schema_fb->size(); - RETURN_NOT_OK( - Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t))); - - // Write the flatbuffer - RETURN_NOT_OK(Write(schema_fb->data(), flatbuffer_size)); - - // If there are any dictionaries, write them as the next messages - 
RETURN_NOT_OK(WriteDictionaries()); - - started_ = true; - return Status::OK(); -} - -Status StreamWriter::WriteRecordBatch(const RecordBatch& batch) { - // Push an empty FileBlock. Can be written in the footer later - record_batches_.emplace_back(0, 0, 0); - return WriteRecordBatch(batch, &record_batches_[record_batches_.size() - 1]); -} - -Status StreamWriter::WriteDictionaries() { - const DictionaryMap& id_to_dictionary = dictionary_memo_->id_to_dictionary(); - - dictionaries_.resize(id_to_dictionary.size()); - - // TODO(wesm): does sorting by id yield any benefit? - int dict_index = 0; - for (const auto& entry : id_to_dictionary) { - FileBlock* block = &dictionaries_[dict_index++]; - - block->offset = position_; - - // Frame of reference in file format is 0, see ARROW-384 - const int64_t buffer_start_offset = 0; - RETURN_NOT_OK(WriteDictionary(entry.first, entry.second, buffer_start_offset, sink_, - &block->metadata_length, &block->body_length, pool_)); - RETURN_NOT_OK(UpdatePosition()); - DCHECK(position_ % 8 == 0) << "WriteDictionary did not perform aligned writes"; - } - - return Status::OK(); -} - -Status StreamWriter::Close() { - // Write the schema if not already written - // User is responsible for closing the OutputStream - return CheckStarted(); -} - -// ---------------------------------------------------------------------- -// StreamReader implementation - -static inline std::string message_type_name(Message::Type type) { - switch (type) { - case Message::SCHEMA: - return "schema"; - case Message::RECORD_BATCH: - return "record batch"; - case Message::DICTIONARY_BATCH: - return "dictionary"; - default: - break; - } - return "unknown"; -} - -class StreamReader::StreamReaderImpl { - public: - StreamReaderImpl() {} - ~StreamReaderImpl() {} - - Status Open(const std::shared_ptr<io::InputStream>& stream) { - stream_ = stream; - return ReadSchema(); - } - - Status ReadNextMessage(Message::Type expected_type, std::shared_ptr<Message>* message) { - 
std::shared_ptr<Buffer> buffer; - RETURN_NOT_OK(stream_->Read(sizeof(int32_t), &buffer)); - - if (buffer->size() != sizeof(int32_t)) { - *message = nullptr; - return Status::OK(); - } - - int32_t message_length = *reinterpret_cast<const int32_t*>(buffer->data()); - - RETURN_NOT_OK(stream_->Read(message_length, &buffer)); - if (buffer->size() != message_length) { - return Status::IOError("Unexpected end of stream trying to read message"); - } - - RETURN_NOT_OK(Message::Open(buffer, 0, message)); - - if ((*message)->type() != expected_type) { - std::stringstream ss; - ss << "Message not expected type: " << message_type_name(expected_type) - << ", was: " << (*message)->type(); - return Status::IOError(ss.str()); - } - return Status::OK(); - } - - Status ReadExact(int64_t size, std::shared_ptr<Buffer>* buffer) { - RETURN_NOT_OK(stream_->Read(size, buffer)); - - if ((*buffer)->size() < size) { - return Status::IOError("Unexpected EOS when reading buffer"); - } - return Status::OK(); - } - - Status ReadNextDictionary() { - std::shared_ptr<Message> message; - RETURN_NOT_OK(ReadNextMessage(Message::DICTIONARY_BATCH, &message)); - - DictionaryBatchMetadata metadata(message); - - std::shared_ptr<Buffer> batch_body; - RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body)) - io::BufferReader reader(batch_body); - - std::shared_ptr<Array> dictionary; - RETURN_NOT_OK(ReadDictionary(metadata, dictionary_types_, &reader, &dictionary)); - return dictionary_memo_.AddDictionary(metadata.id(), dictionary); - } - - Status ReadSchema() { - std::shared_ptr<Message> message; - RETURN_NOT_OK(ReadNextMessage(Message::SCHEMA, &message)); - - SchemaMetadata schema_meta(message); - RETURN_NOT_OK(schema_meta.GetDictionaryTypes(&dictionary_types_)); - - // TODO(wesm): In future, we may want to reconcile the ids in the stream with - // those found in the schema - int num_dictionaries = static_cast<int>(dictionary_types_.size()); - for (int i = 0; i < num_dictionaries; ++i) { - 
RETURN_NOT_OK(ReadNextDictionary()); - } - - return schema_meta.GetSchema(dictionary_memo_, &schema_); - } - - Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) { - std::shared_ptr<Message> message; - RETURN_NOT_OK(ReadNextMessage(Message::RECORD_BATCH, &message)); - - if (message == nullptr) { - // End of stream - *batch = nullptr; - return Status::OK(); - } - - RecordBatchMetadata batch_metadata(message); - - std::shared_ptr<Buffer> batch_body; - RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body)); - io::BufferReader reader(batch_body); - return ReadRecordBatch(batch_metadata, schema_, &reader, batch); - } - - std::shared_ptr<Schema> schema() const { return schema_; } - - private: - // dictionary_id -> type - DictionaryTypeMap dictionary_types_; - - DictionaryMemo dictionary_memo_; - - std::shared_ptr<io::InputStream> stream_; - std::shared_ptr<Schema> schema_; -}; - -StreamReader::StreamReader() { - impl_.reset(new StreamReaderImpl()); -} - -StreamReader::~StreamReader() {} - -Status StreamReader::Open(const std::shared_ptr<io::InputStream>& stream, - std::shared_ptr<StreamReader>* reader) { - // Private ctor - *reader = std::shared_ptr<StreamReader>(new StreamReader()); - return (*reader)->impl_->Open(stream); -} - -std::shared_ptr<Schema> StreamReader::schema() const { - return impl_->schema(); -} - -Status StreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) { - return impl_->GetNextRecordBatch(batch); -} - -} // namespace ipc -} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/stream.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/stream.h b/cpp/src/arrow/ipc/stream.h deleted file mode 100644 index 1c3f65e..0000000 --- a/cpp/src/arrow/ipc/stream.h +++ /dev/null @@ -1,137 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Implement Arrow streaming binary format - -#ifndef ARROW_IPC_STREAM_H -#define ARROW_IPC_STREAM_H - -#include <cstdint> -#include <memory> -#include <vector> - -#include "arrow/ipc/metadata.h" -#include "arrow/util/visibility.h" - -namespace arrow { - -class Array; -class Buffer; -struct Field; -class MemoryPool; -class RecordBatch; -class Schema; -class Status; - -namespace io { - -class InputStream; -class OutputStream; - -} // namespace io - -namespace ipc { - -struct ARROW_EXPORT FileBlock { - FileBlock() {} - FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length) - : offset(offset), metadata_length(metadata_length), body_length(body_length) {} - - int64_t offset; - int32_t metadata_length; - int64_t body_length; -}; - -class ARROW_EXPORT StreamWriter { - public: - virtual ~StreamWriter() = default; - - static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, - std::shared_ptr<StreamWriter>* out); - - virtual Status WriteRecordBatch(const RecordBatch& batch); - - /// Perform any logic necessary to finish the stream. User is responsible for - /// closing the actual OutputStream - virtual Status Close(); - - // In some cases, writing may require memory allocation. 
We use the default - // memory pool, but provide the option to override - void set_memory_pool(MemoryPool* pool); - - protected: - StreamWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema); - - virtual Status Start(); - - Status CheckStarted(); - Status UpdatePosition(); - - Status WriteDictionaries(); - - Status WriteRecordBatch(const RecordBatch& batch, FileBlock* block); - - // Adds padding bytes if necessary to ensure all memory blocks are written on - // 8-byte boundaries. - Status Align(); - - // Write data and update position - Status Write(const uint8_t* data, int64_t nbytes); - - // Write and align - Status WriteAligned(const uint8_t* data, int64_t nbytes); - - io::OutputStream* sink_; - std::shared_ptr<Schema> schema_; - - // When writing out the schema, we keep track of all the dictionaries we - // encounter, as they must be written out first in the stream - std::shared_ptr<DictionaryMemo> dictionary_memo_; - - MemoryPool* pool_; - - int64_t position_; - bool started_; - - std::vector<FileBlock> dictionaries_; - std::vector<FileBlock> record_batches_; -}; - -class ARROW_EXPORT StreamReader { - public: - ~StreamReader(); - - // Open an stream. 
- static Status Open(const std::shared_ptr<io::InputStream>& stream, - std::shared_ptr<StreamReader>* reader); - - std::shared_ptr<Schema> schema() const; - - // Returned batch is nullptr when end of stream reached - Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch); - - private: - StreamReader(); - - class ARROW_NO_EXPORT StreamReaderImpl; - std::unique_ptr<StreamReaderImpl> impl_; -}; - -} // namespace ipc -} // namespace arrow - -#endif // ARROW_IPC_STREAM_H http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/writer.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc new file mode 100644 index 0000000..975b0d1 --- /dev/null +++ b/cpp/src/arrow/ipc/writer.cc @@ -0,0 +1,287 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/ipc/writer.h" + +#include <cstdint> +#include <cstring> +#include <sstream> +#include <vector> + +#include "arrow/buffer.h" +#include "arrow/io/interfaces.h" +#include "arrow/io/memory.h" +#include "arrow/ipc/adapter.h" +#include "arrow/ipc/metadata-internal.h" +#include "arrow/ipc/metadata.h" +#include "arrow/ipc/util.h" +#include "arrow/memory_pool.h" +#include "arrow/schema.h" +#include "arrow/status.h" +#include "arrow/table.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace ipc { + +// ---------------------------------------------------------------------- +// Stream writer implementation + +class StreamWriter::StreamWriterImpl { + public: + StreamWriterImpl() + : dictionary_memo_(std::make_shared<DictionaryMemo>()), + pool_(default_memory_pool()), + position_(-1), + started_(false) {} + + virtual ~StreamWriterImpl() = default; + + Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema) { + sink_ = sink; + schema_ = schema; + return UpdatePosition(); + } + + virtual Status Start() { + std::shared_ptr<Buffer> schema_fb; + RETURN_NOT_OK(WriteSchemaMessage(*schema_, dictionary_memo_.get(), &schema_fb)); + + int32_t flatbuffer_size = schema_fb->size(); + RETURN_NOT_OK( + Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t))); + + // Write the flatbuffer + RETURN_NOT_OK(Write(schema_fb->data(), flatbuffer_size)); + + // If there are any dictionaries, write them as the next messages + RETURN_NOT_OK(WriteDictionaries()); + + started_ = true; + return Status::OK(); + } + + virtual Status Close() { + // Write the schema if not already written + // User is responsible for closing the OutputStream + return CheckStarted(); + } + + Status CheckStarted() { + if (!started_) { return Start(); } + return Status::OK(); + } + + Status UpdatePosition() { return sink_->Tell(&position_); } + + Status WriteDictionaries() { + const DictionaryMap& id_to_dictionary = dictionary_memo_->id_to_dictionary(); + + 
dictionaries_.resize(id_to_dictionary.size()); + + // TODO(wesm): does sorting by id yield any benefit? + int dict_index = 0; + for (const auto& entry : id_to_dictionary) { + FileBlock* block = &dictionaries_[dict_index++]; + + block->offset = position_; + + // Frame of reference in file format is 0, see ARROW-384 + const int64_t buffer_start_offset = 0; + RETURN_NOT_OK(WriteDictionary(entry.first, entry.second, buffer_start_offset, sink_, + &block->metadata_length, &block->body_length, pool_)); + RETURN_NOT_OK(UpdatePosition()); + DCHECK(position_ % 8 == 0) << "WriteDictionary did not perform aligned writes"; + } + + return Status::OK(); + } + + Status WriteRecordBatch(const RecordBatch& batch, FileBlock* block) { + RETURN_NOT_OK(CheckStarted()); + + block->offset = position_; + + // Frame of reference in file format is 0, see ARROW-384 + const int64_t buffer_start_offset = 0; + RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(batch, buffer_start_offset, sink_, + &block->metadata_length, &block->body_length, pool_)); + RETURN_NOT_OK(UpdatePosition()); + + DCHECK(position_ % 8 == 0) << "WriteRecordBatch did not perform aligned writes"; + + return Status::OK(); + } + + Status WriteRecordBatch(const RecordBatch& batch) { + // Push an empty FileBlock. Can be written in the footer later + record_batches_.emplace_back(0, 0, 0); + return WriteRecordBatch(batch, &record_batches_[record_batches_.size() - 1]); + } + + // Adds padding bytes if necessary to ensure all memory blocks are written on + // 8-byte boundaries. 
+ Status Align() { + int64_t remainder = PaddedLength(position_) - position_; + if (remainder > 0) { return Write(kPaddingBytes, remainder); } + return Status::OK(); + } + + // Write data and update position + Status Write(const uint8_t* data, int64_t nbytes) { + RETURN_NOT_OK(sink_->Write(data, nbytes)); + position_ += nbytes; + return Status::OK(); + } + + // Write and align + Status WriteAligned(const uint8_t* data, int64_t nbytes) { + RETURN_NOT_OK(Write(data, nbytes)); + return Align(); + } + + void set_memory_pool(MemoryPool* pool) { pool_ = pool; } + + protected: + io::OutputStream* sink_; + std::shared_ptr<Schema> schema_; + + // When writing out the schema, we keep track of all the dictionaries we + // encounter, as they must be written out first in the stream + std::shared_ptr<DictionaryMemo> dictionary_memo_; + + MemoryPool* pool_; + + int64_t position_; + bool started_; + + std::vector<FileBlock> dictionaries_; + std::vector<FileBlock> record_batches_; +}; + +StreamWriter::StreamWriter() { + impl_.reset(new StreamWriterImpl()); +} + +Status StreamWriter::WriteRecordBatch(const RecordBatch& batch) { + return impl_->WriteRecordBatch(batch); +} + +void StreamWriter::set_memory_pool(MemoryPool* pool) { + impl_->set_memory_pool(pool); +} + +Status StreamWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, + std::shared_ptr<StreamWriter>* out) { + // ctor is private + *out = std::shared_ptr<StreamWriter>(new StreamWriter()); + return (*out)->impl_->Open(sink, schema); +} + +Status StreamWriter::Close() { + return impl_->Close(); +} + +// ---------------------------------------------------------------------- +// File writer implementation + +static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>> +FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) { + std::vector<flatbuf::Block> fb_blocks; + + for (const FileBlock& block : blocks) { + fb_blocks.emplace_back(block.offset, block.metadata_length, 
block.body_length); + } + + return fbb.CreateVectorOfStructs(fb_blocks); +} + +Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries, + const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo, + io::OutputStream* out) { + FBB fbb; + + flatbuffers::Offset<flatbuf::Schema> fb_schema; + RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema)); + + auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries); + auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches); + + auto footer = flatbuf::CreateFooter( + fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches); + + fbb.Finish(footer); + + int32_t size = fbb.GetSize(); + + return out->Write(fbb.GetBufferPointer(), size); +} + +class FileWriter::FileWriterImpl : public StreamWriter::StreamWriterImpl { + public: + using BASE = StreamWriter::StreamWriterImpl; + + Status Start() override { + RETURN_NOT_OK(WriteAligned( + reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes))); + + // We write the schema at the start of the file (and the end). 
This also + // writes all the dictionaries at the beginning of the file + return BASE::Start(); + } + + Status Close() override { + // Write metadata + int64_t initial_position = position_; + RETURN_NOT_OK(WriteFileFooter( + *schema_, dictionaries_, record_batches_, dictionary_memo_.get(), sink_)); + RETURN_NOT_OK(UpdatePosition()); + + // Write footer length + int32_t footer_length = position_ - initial_position; + + if (footer_length <= 0) { return Status::Invalid("Invalid file footer"); } + + RETURN_NOT_OK( + Write(reinterpret_cast<const uint8_t*>(&footer_length), sizeof(int32_t))); + + // Write magic bytes to end file + return Write( + reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes)); + } +}; + +FileWriter::FileWriter() { + impl_.reset(new FileWriterImpl()); +} + +Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, + std::shared_ptr<FileWriter>* out) { + *out = std::shared_ptr<FileWriter>(new FileWriter()); // ctor is private + return (*out)->impl_->Open(sink, schema); +} + +Status FileWriter::WriteRecordBatch(const RecordBatch& batch) { + return impl_->WriteRecordBatch(batch); +} + +Status FileWriter::Close() { + return impl_->Close(); +} + +} // namespace ipc +} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/writer.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h new file mode 100644 index 0000000..7aff71e --- /dev/null +++ b/cpp/src/arrow/ipc/writer.h @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Implement Arrow streaming binary format + +#ifndef ARROW_IPC_STREAM_H +#define ARROW_IPC_STREAM_H + +#include <cstdint> +#include <memory> +#include <vector> + +#include "arrow/ipc/metadata.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class Array; +class Buffer; +struct Field; +class MemoryPool; +class RecordBatch; +class Schema; +class Status; + +namespace io { + +class OutputStream; + +} // namespace io + +namespace ipc { + +class ARROW_EXPORT StreamWriter { + public: + virtual ~StreamWriter() = default; + + static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, + std::shared_ptr<StreamWriter>* out); + + virtual Status WriteRecordBatch(const RecordBatch& batch); + + /// Perform any logic necessary to finish the stream. User is responsible for + /// closing the actual OutputStream + virtual Status Close(); + + // In some cases, writing may require memory allocation. 
We use the default + // memory pool, but provide the option to override + void set_memory_pool(MemoryPool* pool); + + protected: + StreamWriter(); + class ARROW_NO_EXPORT StreamWriterImpl; + std::unique_ptr<StreamWriterImpl> impl_; +}; + +Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries, + const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo, + io::OutputStream* out); + +class ARROW_EXPORT FileWriter : public StreamWriter { + public: + static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, + std::shared_ptr<FileWriter>* out); + + Status WriteRecordBatch(const RecordBatch& batch) override; + Status Close() override; + + private: + FileWriter(); + class ARROW_NO_EXPORT FileWriterImpl; + std::unique_ptr<FileWriterImpl> impl_; +}; + +} // namespace ipc +} // namespace arrow + +#endif // ARROW_IPC_STREAM_H http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index 19b1e19..8d9afcc 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -68,26 +68,4 @@ if (ARROW_BUILD_BENCHMARKS) endif() endif() -if (ARROW_IPC AND ARROW_BUILD_UTILITIES) - set(UTIL_LINK_LIBS - arrow_ipc_static - arrow_io_static - arrow_static - boost_filesystem_static - boost_system_static - dl) - - if (NOT APPLE) - set(UTIL_LINK_LIBS - ${UTIL_LINK_LIBS} - boost_filesystem_static - boost_system_static) - endif() - - add_executable(file-to-stream file-to-stream.cc) - target_link_libraries(file-to-stream ${UTIL_LINK_LIBS}) - add_executable(stream-to-file stream-to-file.cc) - target_link_libraries(stream-to-file ${UTIL_LINK_LIBS}) -endif() - ADD_ARROW_TEST(bit-util-test) http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/util/file-to-stream.cc 
---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/file-to-stream.cc b/cpp/src/arrow/util/file-to-stream.cc deleted file mode 100644 index 7daf263..0000000 --- a/cpp/src/arrow/util/file-to-stream.cc +++ /dev/null @@ -1,60 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "arrow/io/file.h" -#include "arrow/ipc/file.h" -#include "arrow/ipc/stream.h" -#include "arrow/status.h" -#include <iostream> - -#include "arrow/util/io-util.h" - -namespace arrow { - -// Reads a file on the file system and prints to stdout the stream version of it. 
-Status ConvertToStream(const char* path) { - std::shared_ptr<io::ReadableFile> in_file; - std::shared_ptr<ipc::FileReader> reader; - - RETURN_NOT_OK(io::ReadableFile::Open(path, &in_file)); - RETURN_NOT_OK(ipc::FileReader::Open(in_file, &reader)); - - io::StdoutStream sink; - std::shared_ptr<ipc::StreamWriter> writer; - RETURN_NOT_OK(ipc::StreamWriter::Open(&sink, reader->schema(), &writer)); - for (int i = 0; i < reader->num_record_batches(); ++i) { - std::shared_ptr<RecordBatch> chunk; - RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk)); - RETURN_NOT_OK(writer->WriteRecordBatch(*chunk)); - } - return writer->Close(); -} - -} // namespace arrow - -int main(int argc, char** argv) { - if (argc != 2) { - std::cerr << "Usage: file-to-stream <input arrow file>" << std::endl; - return 1; - } - arrow::Status status = arrow::ConvertToStream(argv[1]); - if (!status.ok()) { - std::cerr << "Could not convert to stream: " << status.ToString() << std::endl; - return 1; - } - return 0; -} http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/util/stream-to-file.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/stream-to-file.cc b/cpp/src/arrow/util/stream-to-file.cc deleted file mode 100644 index 393b07d..0000000 --- a/cpp/src/arrow/util/stream-to-file.cc +++ /dev/null @@ -1,58 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "arrow/ipc/stream.h" -#include "arrow/io/file.h" -#include "arrow/ipc/file.h" -#include "arrow/status.h" -#include <iostream> - -#include "arrow/util/io-util.h" - -namespace arrow { - -// Converts a stream from stdin to a file written to standard out. -// A typical usage would be: -// $ <program that produces streaming output> | stream-to-file > file.arrow -Status ConvertToFile() { - std::shared_ptr<io::InputStream> input(new io::StdinStream); - std::shared_ptr<ipc::StreamReader> reader; - RETURN_NOT_OK(ipc::StreamReader::Open(input, &reader)); - - io::StdoutStream sink; - std::shared_ptr<ipc::FileWriter> writer; - RETURN_NOT_OK(ipc::FileWriter::Open(&sink, reader->schema(), &writer)); - - std::shared_ptr<RecordBatch> batch; - while (true) { - RETURN_NOT_OK(reader->GetNextRecordBatch(&batch)); - if (batch == nullptr) break; - RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); - } - return writer->Close(); -} - -} // namespace arrow - -int main(int argc, char** argv) { - arrow::Status status = arrow::ConvertToFile(); - if (!status.ok()) { - std::cerr << "Could not convert to file: " << status.ToString() << std::endl; - return 1; - } - return 0; -} http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/python/pyarrow/includes/libarrow_ipc.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow_ipc.pxd b/python/pyarrow/includes/libarrow_ipc.pxd index afc7dbd..10c70a9 100644 --- a/python/pyarrow/includes/libarrow_ipc.pxd +++ b/python/pyarrow/includes/libarrow_ipc.pxd @@ 
-23,7 +23,7 @@ from pyarrow.includes.libarrow_io cimport (InputStream, OutputStream, ReadableFileInterface) -cdef extern from "arrow/ipc/stream.h" namespace "arrow::ipc" nogil: +cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: cdef cppclass CStreamWriter " arrow::ipc::StreamWriter": @staticmethod @@ -43,9 +43,6 @@ cdef extern from "arrow/ipc/stream.h" namespace "arrow::ipc" nogil: CStatus GetNextRecordBatch(shared_ptr[CRecordBatch]* batch) - -cdef extern from "arrow/ipc/file.h" namespace "arrow::ipc" nogil: - cdef cppclass CFileWriter " arrow::ipc::FileWriter"(CStreamWriter): @staticmethod CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
