Repository: arrow
Updated Branches:
  refs/heads/master 99ff24089 -> 5739e04b3
ARROW-1008: [C++] Add abstract stream writer and reader C++ APIs. Give clearer names to IPC reader/writer classes

The main motivation for this patch was to make `StreamReader` and `StreamWriter` abstract, so that other implementations can be created. I would also like to add the option for asynchronous reading and writing. I also added a CMake option `ARROW_NO_DEPRECATED_API` for more graceful name deprecations.

@kou do you think these names for the IPC classes are more clear?

Author: Wes McKinney <wes.mckin...@twosigma.com>

Closes #679 from wesm/ARROW-1008 and squashes the following commits:

d7b7c9ce [Wes McKinney] Add missing dtors for pimpl pattern
a797ee3e [Wes McKinney] Fix glib
04fa2854 [Wes McKinney] Feedback on ipc reader/writer names. Add open_stream/open_file Python APIs
22346d47 [Wes McKinney] Fix unit tests
10837a65 [Wes McKinney] Add abstract stream writer and reader C++ APIs. Rename record batch stream reader and writer classes for better clarity

Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/5739e04b
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/5739e04b
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/5739e04b

Branch: refs/heads/master
Commit: 5739e04b35aeb5be9df7e9aace866ba48ecbac8a
Parents: 99ff240
Author: Wes McKinney <wes.mckin...@twosigma.com>
Authored: Sun May 14 08:55:26 2017 -0400
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Sun May 14 08:55:26 2017 -0400

----------------------------------------------------------------------
 c_glib/arrow-glib/stream-reader.cpp | 20 ++--
 c_glib/arrow-glib/stream-reader.h | 2 +-
 c_glib/arrow-glib/stream-reader.hpp | 4 +-
 c_glib/arrow-glib/writer.cpp | 16 +--
 c_glib/arrow-glib/writer.h | 2 +-
 c_glib/arrow-glib/writer.hpp | 8 +-
 ci/travis_before_script_cpp.sh | 2 +
 cpp/CMakeLists.txt | 8 ++
 cpp/src/arrow/ipc/file-to-stream.cc | 12 ++-
 cpp/src/arrow/ipc/ipc-read-write-test.cc | 25 ++---
 cpp/src/arrow/ipc/json-integration-test.cc | 13 +--
 cpp/src/arrow/ipc/reader.cc | 54 +++++-----
 cpp/src/arrow/ipc/reader.h | 107 ++++++++++++++-----
 cpp/src/arrow/ipc/stream-to-file.cc | 12 ++-
 cpp/src/arrow/ipc/writer.cc | 70 +++++++------
 cpp/src/arrow/ipc/writer.h | 132 ++++++++++++++++--------
 python/doc/source/api.rst | 10 +-
 python/doc/source/ipc.rst | 23 +++--
 python/pyarrow/__init__.py | 5 +-
 python/pyarrow/includes/libarrow.pxd | 38 ++++---
 python/pyarrow/io.pxi | 48 +++++----
 python/pyarrow/ipc.py | 46 ++++++++-
 python/pyarrow/tests/test_ipc.py | 20 ++--
 23 files changed, 433 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/c_glib/arrow-glib/stream-reader.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/stream-reader.cpp b/c_glib/arrow-glib/stream-reader.cpp
index cc18cd8..19c36c2 100644
--- a/c_glib/arrow-glib/stream-reader.cpp
+++ b/c_glib/arrow-glib/stream-reader.cpp
@@ -43,7 +43,7 @@ G_BEGIN_DECLS */ typedef struct GArrowStreamReaderPrivate_ { - std::shared_ptr<arrow::ipc::StreamReader> stream_reader; + std::shared_ptr<arrow::ipc::RecordBatchStreamReader> stream_reader; } GArrowStreamReaderPrivate; enum { @@ -85,7 +85,7 @@ garrow_stream_reader_set_property(GObject *object, switch (prop_id) { case PROP_STREAM_READER: priv->stream_reader = - *static_cast<std::shared_ptr<arrow::ipc::StreamReader> *>(g_value_get_pointer(value)); +
*static_cast<std::shared_ptr<arrow::ipc::RecordBatchStreamReader> *>(g_value_get_pointer(value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); @@ -124,8 +124,8 @@ garrow_stream_reader_class_init(GArrowStreamReaderClass *klass) gobject_class->get_property = garrow_stream_reader_get_property; spec = g_param_spec_pointer("stream-reader", - "ipc::StreamReader", - "The raw std::shared<arrow::ipc::StreamReader> *", + "ipc::RecordBatchStreamReader", + "The raw std::shared<arrow::ipc::RecordBatchStreamReader> *", static_cast<GParamFlags>(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(gobject_class, PROP_STREAM_READER, spec); @@ -143,10 +143,10 @@ GArrowStreamReader * garrow_stream_reader_new(GArrowInputStream *stream, GError **error) { - std::shared_ptr<arrow::ipc::StreamReader> arrow_stream_reader; + std::shared_ptr<arrow::ipc::RecordBatchStreamReader> arrow_stream_reader; auto status = - arrow::ipc::StreamReader::Open(garrow_input_stream_get_raw(stream), - &arrow_stream_reader); + arrow::ipc::RecordBatchStreamReader::Open(garrow_input_stream_get_raw(stream), + &arrow_stream_reader); if (garrow_error_check(error, status, "[ipc][stream-reader][open]")) { return garrow_stream_reader_new_raw(&arrow_stream_reader); } else { @@ -179,7 +179,7 @@ garrow_stream_reader_get_schema(GArrowStreamReader *stream_reader) */ GArrowRecordBatch * garrow_stream_reader_get_next_record_batch(GArrowStreamReader *stream_reader, - GError **error) + GError **error) { auto arrow_stream_reader = garrow_stream_reader_get_raw(stream_reader); @@ -202,7 +202,7 @@ garrow_stream_reader_get_next_record_batch(GArrowStreamReader *stream_reader, G_END_DECLS GArrowStreamReader * -garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::StreamReader> *arrow_stream_reader) +garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::RecordBatchStreamReader> *arrow_stream_reader) { auto stream_reader = GARROW_STREAM_READER(g_object_new(GARROW_TYPE_STREAM_READER, @@ -211,7 +211,7 @@ garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::StreamReader> *arrow_st return stream_reader; } -std::shared_ptr<arrow::ipc::StreamReader> +std::shared_ptr<arrow::ipc::RecordBatchStreamReader> garrow_stream_reader_get_raw(GArrowStreamReader *stream_reader) { GArrowStreamReaderPrivate *priv; http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/c_glib/arrow-glib/stream-reader.h ---------------------------------------------------------------------- diff --git a/c_glib/arrow-glib/stream-reader.h b/c_glib/arrow-glib/stream-reader.h index 2ea2c26..f6cdaea 100644 --- a/c_glib/arrow-glib/stream-reader.h +++ b/c_glib/arrow-glib/stream-reader.h @@ -55,7 +55,7 @@ typedef struct _GArrowStreamReaderClass GArrowStreamReaderClass; /** * GArrowStreamReader: * - * It wraps `arrow::ipc::StreamReader`. + * It wraps `arrow::ipc::InputStreamReader`. 
*/ struct _GArrowStreamReader { http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/c_glib/arrow-glib/stream-reader.hpp ---------------------------------------------------------------------- diff --git a/c_glib/arrow-glib/stream-reader.hpp b/c_glib/arrow-glib/stream-reader.hpp index ca8e689..5191b4e 100644 --- a/c_glib/arrow-glib/stream-reader.hpp +++ b/c_glib/arrow-glib/stream-reader.hpp @@ -24,5 +24,5 @@ #include <arrow-glib/stream-reader.h> -GArrowStreamReader *garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::StreamReader> *arrow_stream_reader); -std::shared_ptr<arrow::ipc::StreamReader> garrow_stream_reader_get_raw(GArrowStreamReader *stream_reader); +GArrowStreamReader *garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::RecordBatchStreamReader> *arrow_stream_reader); +std::shared_ptr<arrow::ipc::RecordBatchStreamReader> garrow_stream_reader_get_raw(GArrowStreamReader *stream_reader); http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/c_glib/arrow-glib/writer.cpp ---------------------------------------------------------------------- diff --git a/c_glib/arrow-glib/writer.cpp b/c_glib/arrow-glib/writer.cpp index 625a19e..092993b 100644 --- a/c_glib/arrow-glib/writer.cpp +++ b/c_glib/arrow-glib/writer.cpp @@ -47,7 +47,7 @@ G_BEGIN_DECLS */ typedef struct GArrowStreamWriterPrivate_ { - std::shared_ptr<arrow::ipc::StreamWriter> stream_writer; + std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> stream_writer; } GArrowStreamWriterPrivate; enum { @@ -89,7 +89,7 @@ garrow_stream_writer_set_property(GObject *object, switch (prop_id) { case PROP_STREAM_WRITER: priv->stream_writer = - *static_cast<std::shared_ptr<arrow::ipc::StreamWriter> *>(g_value_get_pointer(value)); + *static_cast<std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> *>(g_value_get_pointer(value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); @@ -128,8 +128,8 @@ garrow_stream_writer_class_init(GArrowStreamWriterClass *klass) gobject_class->get_property = garrow_stream_writer_get_property; spec = g_param_spec_pointer("stream-writer", - "ipc::StreamWriter", - "The raw std::shared<arrow::ipc::StreamWriter> *", + "ipc::RecordBatchStreamWriter", + "The raw std::shared<arrow::ipc::RecordBatchStreamWriter> *", static_cast<GParamFlags>(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(gobject_class, PROP_STREAM_WRITER, spec); @@ -149,11 +149,11 @@ garrow_stream_writer_new(GArrowOutputStream *sink, GArrowSchema *schema, GError **error) { - std::shared_ptr<arrow::ipc::StreamWriter> arrow_stream_writer; + std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> arrow_stream_writer; auto status = - arrow::ipc::StreamWriter::Open(garrow_output_stream_get_raw(sink).get(), - garrow_schema_get_raw(schema), - &arrow_stream_writer); + arrow::ipc::RecordBatchStreamWriter::Open(garrow_output_stream_get_raw(sink).get(), + garrow_schema_get_raw(schema), + &arrow_stream_writer); if (garrow_error_check(error, status, "[ipc][stream-writer][open]")) { return garrow_stream_writer_new_raw(&arrow_stream_writer); } else { http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/c_glib/arrow-glib/writer.h ---------------------------------------------------------------------- diff --git a/c_glib/arrow-glib/writer.h b/c_glib/arrow-glib/writer.h index 2aaa776..2f8e90c 100644 --- a/c_glib/arrow-glib/writer.h +++ b/c_glib/arrow-glib/writer.h @@ -56,7 +56,7 @@ typedef struct _GArrowStreamWriterClass GArrowStreamWriterClass; /** * GArrowStreamWriter: * - * It wraps 
`arrow::ipc::StreamWriter`. + * It wraps `arrow::ipc::RecordBatchStreamWriter`. */ struct _GArrowStreamWriter { http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/c_glib/arrow-glib/writer.hpp ---------------------------------------------------------------------- diff --git a/c_glib/arrow-glib/writer.hpp b/c_glib/arrow-glib/writer.hpp index 199f205..47f5e68 100644 --- a/c_glib/arrow-glib/writer.hpp +++ b/c_glib/arrow-glib/writer.hpp @@ -24,8 +24,8 @@ #include <arrow-glib/writer.h> -GArrowStreamWriter *garrow_stream_writer_new_raw(std::shared_ptr<arrow::ipc::StreamWriter> *arrow_stream_writer); -std::shared_ptr<arrow::ipc::StreamWriter> garrow_stream_writer_get_raw(GArrowStreamWriter *stream_writer); +GArrowStreamWriter *garrow_stream_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> *arrow_stream_writer); +std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> garrow_stream_writer_get_raw(GArrowStreamWriter *stream_writer); -GArrowFileWriter *garrow_file_writer_new_raw(std::shared_ptr<arrow::ipc::FileWriter> *arrow_file_writer); -arrow::ipc::FileWriter *garrow_file_writer_get_raw(GArrowFileWriter *file_writer); +GArrowFileWriter *garrow_file_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchFileWriter> *arrow_file_writer); +arrow::ipc::RecordBatchFileWriter *garrow_file_writer_get_raw(GArrowFileWriter *file_writer); http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/ci/travis_before_script_cpp.sh ---------------------------------------------------------------------- diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh index 3f9f67c..7d4ecb7 100755 --- a/ci/travis_before_script_cpp.sh +++ b/ci/travis_before_script_cpp.sh @@ -38,10 +38,12 @@ if [ $TRAVIS_OS_NAME == "linux" ]; then cmake -DARROW_TEST_MEMCHECK=on \ $CMAKE_COMMON_FLAGS \ -DARROW_CXXFLAGS="-Wconversion -Werror" \ + -DARROW_NO_DEPRECATED_API=on \ $ARROW_CPP_DIR else cmake $CMAKE_COMMON_FLAGS \ -DARROW_CXXFLAGS=-Werror \ + -DARROW_NO_DEPRECATED_API=on \ $ARROW_CPP_DIR fi http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 6b2ceec..0ad7ef5 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -89,6 +89,10 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") "Build the Arrow micro benchmarks" OFF) + option(ARROW_NO_DEPRECATED_API + "Exclude deprecated APIs from build" + OFF) + option(ARROW_IPC "Build the Arrow IPC extensions" ON) @@ -154,6 +158,10 @@ include(BuildUtils) include(SetupCxxFlags) +if (ARROW_NO_DEPRECATED_API) + add_definitions(-DARROW_NO_DEPRECATED_API) +endif() + # Add common flags set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_COMMON_FLAGS}") set(EP_CXX_FLAGS "${CMAKE_CXX_FLAGS}") http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/file-to-stream.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/file-to-stream.cc b/cpp/src/arrow/ipc/file-to-stream.cc index 8161b19..39c720c 100644 --- a/cpp/src/arrow/ipc/file-to-stream.cc +++ b/cpp/src/arrow/ipc/file-to-stream.cc @@ -24,18 +24,19 @@ #include "arrow/util/io-util.h" namespace arrow { +namespace ipc { // Reads a file on the file system and prints to stdout the stream version of it. 
Status ConvertToStream(const char* path) { std::shared_ptr<io::ReadableFile> in_file; - std::shared_ptr<ipc::FileReader> reader; + std::shared_ptr<RecordBatchFileReader> reader; RETURN_NOT_OK(io::ReadableFile::Open(path, &in_file)); - RETURN_NOT_OK(ipc::FileReader::Open(in_file, &reader)); + RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(in_file, &reader)); io::StdoutStream sink; - std::shared_ptr<ipc::StreamWriter> writer; - RETURN_NOT_OK(ipc::StreamWriter::Open(&sink, reader->schema(), &writer)); + std::shared_ptr<RecordBatchStreamWriter> writer; + RETURN_NOT_OK(RecordBatchStreamWriter::Open(&sink, reader->schema(), &writer)); for (int i = 0; i < reader->num_record_batches(); ++i) { std::shared_ptr<RecordBatch> chunk; RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk)); @@ -44,6 +45,7 @@ Status ConvertToStream(const char* path) { return writer->Close(); } +} // namespace ipc } // namespace arrow int main(int argc, char** argv) { @@ -51,7 +53,7 @@ int main(int argc, char** argv) { std::cerr << "Usage: file-to-stream <input arrow file>" << std::endl; return 1; } - arrow::Status status = arrow::ConvertToStream(argv[1]); + arrow::Status status = arrow::ipc::ConvertToStream(argv[1]); if (!status.ok()) { std::cerr << "Could not convert to stream: " << status.ToString() << std::endl; return 1; http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/ipc-read-write-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc index b4a88b5..c99816c 100644 --- a/cpp/src/arrow/ipc/ipc-read-write-test.cc +++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc @@ -140,16 +140,16 @@ class IpcTestFixture : public io::MemoryMapFixture { if (zero_data) { RETURN_NOT_OK(ZeroMemoryMap(mmap_.get())); } RETURN_NOT_OK(mmap_->Seek(0)); - std::shared_ptr<FileWriter> file_writer; - RETURN_NOT_OK(FileWriter::Open(mmap_.get(), batch.schema(), &file_writer)); + std::shared_ptr<RecordBatchFileWriter> file_writer; + RETURN_NOT_OK(RecordBatchFileWriter::Open(mmap_.get(), batch.schema(), &file_writer)); RETURN_NOT_OK(file_writer->WriteRecordBatch(batch, true)); RETURN_NOT_OK(file_writer->Close()); int64_t offset; RETURN_NOT_OK(mmap_->Tell(&offset)); - std::shared_ptr<FileReader> file_reader; - RETURN_NOT_OK(FileReader::Open(mmap_, offset, &file_reader)); + std::shared_ptr<RecordBatchFileReader> file_reader; + RETURN_NOT_OK(RecordBatchFileReader::Open(mmap_, offset, &file_reader)); return file_reader->GetRecordBatch(0, result); } @@ -487,8 +487,9 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> { Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) { // Write the file - std::shared_ptr<FileWriter> writer; - RETURN_NOT_OK(FileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer)); + std::shared_ptr<RecordBatchFileWriter> writer; + RETURN_NOT_OK( + RecordBatchFileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer)); const int num_batches = static_cast<int>(in_batches.size()); @@ -504,8 +505,8 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> { // Open the file auto buf_reader = std::make_shared<io::BufferReader>(buffer_); - std::shared_ptr<FileReader> reader; - RETURN_NOT_OK(FileReader::Open(buf_reader, footer_offset, &reader)); + std::shared_ptr<RecordBatchFileReader> reader; + RETURN_NOT_OK(RecordBatchFileReader::Open(buf_reader, footer_offset, &reader)); EXPECT_EQ(num_batches, reader->num_record_batches()); 
for (int i = 0; i < num_batches; ++i) { @@ -553,8 +554,8 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> { Status RoundTripHelper( const RecordBatch& batch, std::vector<std::shared_ptr<RecordBatch>>* out_batches) { // Write the file - std::shared_ptr<StreamWriter> writer; - RETURN_NOT_OK(StreamWriter::Open(sink_.get(), batch.schema(), &writer)); + std::shared_ptr<RecordBatchStreamWriter> writer; + RETURN_NOT_OK(RecordBatchStreamWriter::Open(sink_.get(), batch.schema(), &writer)); int num_batches = 5; for (int i = 0; i < num_batches; ++i) { RETURN_NOT_OK(writer->WriteRecordBatch(batch)); @@ -565,8 +566,8 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> { // Open the file auto buf_reader = std::make_shared<io::BufferReader>(buffer_); - std::shared_ptr<StreamReader> reader; - RETURN_NOT_OK(StreamReader::Open(buf_reader, &reader)); + std::shared_ptr<RecordBatchStreamReader> reader; + RETURN_NOT_OK(RecordBatchStreamReader::Open(buf_reader, &reader)); std::shared_ptr<RecordBatch> chunk; while (true) { http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/json-integration-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc index aa95500..424755a 100644 --- a/cpp/src/arrow/ipc/json-integration-test.cc +++ b/cpp/src/arrow/ipc/json-integration-test.cc @@ -76,8 +76,9 @@ static Status ConvertJsonToArrow( std::cout << "Found schema: " << reader->schema()->ToString() << std::endl; } - std::shared_ptr<ipc::FileWriter> writer; - RETURN_NOT_OK(ipc::FileWriter::Open(out_file.get(), reader->schema(), &writer)); + std::shared_ptr<ipc::RecordBatchFileWriter> writer; + RETURN_NOT_OK( + ipc::RecordBatchFileWriter::Open(out_file.get(), reader->schema(), &writer)); for (int i = 0; i < reader->num_record_batches(); ++i) { std::shared_ptr<RecordBatch> batch; @@ -96,8 +97,8 @@ static Status ConvertArrowToJson( RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &in_file)); RETURN_NOT_OK(io::FileOutputStream::Open(json_path, &out_file)); - std::shared_ptr<ipc::FileReader> reader; - RETURN_NOT_OK(ipc::FileReader::Open(in_file, &reader)); + std::shared_ptr<ipc::RecordBatchFileReader> reader; + RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(in_file, &reader)); if (FLAGS_verbose) { std::cout << "Found schema: " << reader->schema()->ToString() << std::endl; @@ -137,8 +138,8 @@ static Status ValidateArrowVsJson( std::shared_ptr<io::ReadableFile> arrow_file; RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &arrow_file)); - std::shared_ptr<ipc::FileReader> arrow_reader; - RETURN_NOT_OK(ipc::FileReader::Open(arrow_file, &arrow_reader)); + std::shared_ptr<ipc::RecordBatchFileReader> arrow_reader; + RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(arrow_file, &arrow_reader)); auto json_schema = json_reader->schema(); auto arrow_schema = arrow_reader->schema(); http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/reader.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index aea4c9c..2b7b90f 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -156,7 +156,7 @@ Status ReadDictionary(const Message& metadata, const DictionaryTypeMap& dictiona } // ---------------------------------------------------------------------- -// StreamReader implementation +// RecordBatchStreamReader implementation 
static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength()); @@ -176,10 +176,12 @@ static inline std::string message_type_name(Message::Type type) { return "unknown"; } -class StreamReader::StreamReaderImpl { +RecordBatchReader::~RecordBatchReader() {} + +class RecordBatchStreamReader::RecordBatchStreamReaderImpl { public: - StreamReaderImpl() {} - ~StreamReaderImpl() {} + RecordBatchStreamReaderImpl() {} + ~RecordBatchStreamReaderImpl() {} Status Open(const std::shared_ptr<io::InputStream>& stream) { stream_ = stream; @@ -267,33 +269,33 @@ class StreamReader::StreamReaderImpl { std::shared_ptr<Schema> schema_; }; -StreamReader::StreamReader() { - impl_.reset(new StreamReaderImpl()); +RecordBatchStreamReader::RecordBatchStreamReader() { + impl_.reset(new RecordBatchStreamReaderImpl()); } -StreamReader::~StreamReader() {} +RecordBatchStreamReader::~RecordBatchStreamReader() {} -Status StreamReader::Open(const std::shared_ptr<io::InputStream>& stream, - std::shared_ptr<StreamReader>* reader) { +Status RecordBatchStreamReader::Open(const std::shared_ptr<io::InputStream>& stream, + std::shared_ptr<RecordBatchStreamReader>* reader) { // Private ctor - *reader = std::shared_ptr<StreamReader>(new StreamReader()); + *reader = std::shared_ptr<RecordBatchStreamReader>(new RecordBatchStreamReader()); return (*reader)->impl_->Open(stream); } -std::shared_ptr<Schema> StreamReader::schema() const { +std::shared_ptr<Schema> RecordBatchStreamReader::schema() const { return impl_->schema(); } -Status StreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) { +Status RecordBatchStreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) { return impl_->GetNextRecordBatch(batch); } // ---------------------------------------------------------------------- // Reader implementation -class FileReader::FileReaderImpl { +class RecordBatchFileReader::RecordBatchFileReaderImpl { public: - FileReaderImpl() { dictionary_memo_ = std::make_shared<DictionaryMemo>(); } + RecordBatchFileReaderImpl() { dictionary_memo_ = std::make_shared<DictionaryMemo>(); } Status ReadFooter() { int magic_size = static_cast<int>(strlen(kArrowMagicBytes)); @@ -432,38 +434,38 @@ class FileReader::FileReaderImpl { std::shared_ptr<Schema> schema_; }; -FileReader::FileReader() { - impl_.reset(new FileReaderImpl()); +RecordBatchFileReader::RecordBatchFileReader() { + impl_.reset(new RecordBatchFileReaderImpl()); } -FileReader::~FileReader() {} +RecordBatchFileReader::~RecordBatchFileReader() {} -Status FileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file, - std::shared_ptr<FileReader>* reader) { +Status RecordBatchFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file, + std::shared_ptr<RecordBatchFileReader>* reader) { int64_t footer_offset; RETURN_NOT_OK(file->GetSize(&footer_offset)); return Open(file, footer_offset, reader); } -Status FileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file, - int64_t footer_offset, std::shared_ptr<FileReader>* reader) { - *reader = std::shared_ptr<FileReader>(new FileReader()); +Status RecordBatchFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file, + int64_t footer_offset, std::shared_ptr<RecordBatchFileReader>* reader) { + *reader = std::shared_ptr<RecordBatchFileReader>(new RecordBatchFileReader()); return (*reader)->impl_->Open(file, footer_offset); } -std::shared_ptr<Schema> FileReader::schema() const { +std::shared_ptr<Schema> 
RecordBatchFileReader::schema() const { return impl_->schema(); } -int FileReader::num_record_batches() const { +int RecordBatchFileReader::num_record_batches() const { return impl_->num_record_batches(); } -MetadataVersion FileReader::version() const { +MetadataVersion RecordBatchFileReader::version() const { return impl_->version(); } -Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) { +Status RecordBatchFileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) { return impl_->GetRecordBatch(i, batch); } http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/reader.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 1972446..dd29a36 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -44,29 +44,50 @@ class RandomAccessFile; namespace ipc { -class ARROW_EXPORT StreamReader { +/// \brief Abstract interface for reading stream of record batches +class ARROW_EXPORT RecordBatchReader { public: - ~StreamReader(); + virtual ~RecordBatchReader(); - // Open an stream. - static Status Open(const std::shared_ptr<io::InputStream>& stream, - std::shared_ptr<StreamReader>* reader); + /// \return the shared schema of the record batches in the stream + virtual std::shared_ptr<Schema> schema() const = 0; - std::shared_ptr<Schema> schema() const; + /// Read the next record batch in the stream. Return nullptr for batch when + /// reaching end of stream + /// + /// \param(out) batch the next loaded batch, nullptr at end of stream + /// \return Status + virtual Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) = 0; +}; + +/// \class RecordBatchStreamReader +/// \brief Synchronous batch stream reader that reads from io::InputStream +class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader { + public: + virtual ~RecordBatchStreamReader(); - // Returned batch is nullptr when end of stream reached - Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch); + /// Create batch reader from InputStream + /// + /// \param(in) stream an input stream instance + /// \param(out) reader the created reader object + /// \return Status + static Status Open(const std::shared_ptr<io::InputStream>& stream, + std::shared_ptr<RecordBatchStreamReader>* reader); + + std::shared_ptr<Schema> schema() const override; + Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) override; private: - StreamReader(); + RecordBatchStreamReader(); - class ARROW_NO_EXPORT StreamReaderImpl; - std::unique_ptr<StreamReaderImpl> impl_; + class ARROW_NO_EXPORT RecordBatchStreamReaderImpl; + std::unique_ptr<RecordBatchStreamReaderImpl> impl_; }; -class ARROW_EXPORT FileReader { +/// \brief Reads the record batch file format +class ARROW_EXPORT RecordBatchFileReader { public: - ~FileReader(); + ~RecordBatchFileReader(); // Open a file-like object that is assumed to be self-contained; i.e., the // end of the file interface is the end of the Arrow file. Note that there @@ -74,7 +95,7 @@ class ARROW_EXPORT FileReader { // need only locate the end of the Arrow file stream to discover the metadata // and then proceed to read the data into memory. 
static Status Open(const std::shared_ptr<io::RandomAccessFile>& file, - std::shared_ptr<FileReader>* reader); + std::shared_ptr<RecordBatchFileReader>* reader); // If the file is embedded within some larger file or memory region, you can // pass the absolute memory offset to the end of the file (which contains the @@ -84,46 +105,80 @@ class ARROW_EXPORT FileReader { // @param file: the data source // @param footer_offset: the position of the end of the Arrow "file" static Status Open(const std::shared_ptr<io::RandomAccessFile>& file, - int64_t footer_offset, std::shared_ptr<FileReader>* reader); + int64_t footer_offset, std::shared_ptr<RecordBatchFileReader>* reader); /// The schema includes any dictionaries std::shared_ptr<Schema> schema() const; + /// Returns number of record batches in the file int num_record_batches() const; + /// Returns MetadataVersion in the file metadata MetadataVersion version() const; - // Read a record batch from the file. Does not copy memory if the input - // source supports zero-copy. - // - // TODO(wesm): Make the copy/zero-copy behavior configurable (e.g. provide an - // "always copy" option) + /// Read a record batch from the file. Does not copy memory if the input + /// source supports zero-copy. + /// + /// \param(in) i the index of the record batch to return + /// \param(out) batch the read batch + /// \return Status Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch); private: - FileReader(); + RecordBatchFileReader(); - class ARROW_NO_EXPORT FileReaderImpl; - std::unique_ptr<FileReaderImpl> impl_; + class ARROW_NO_EXPORT RecordBatchFileReaderImpl; + std::unique_ptr<RecordBatchFileReaderImpl> impl_; }; -// Generic read functionsh; does not copy data if the input supports zero copy reads +// Generic read functions; does not copy data if the input supports zero copy reads + +/// Read record batch from file given metadata and schema +/// +/// \param(in) metadata a Message containing the record batch metadata +/// \param(in) schema the record batch schema +/// \param(in) file a random access file +/// \param(out) out the read record batch Status ARROW_EXPORT ReadRecordBatch(const Message& metadata, const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out); +/// Read record batch from file given metadata and schema +/// +/// \param(in) metadata a Message containing the record batch metadata +/// \param(in) schema the record batch schema +/// \param(in) file a random access file +/// \param(in) max_recursion_depth the maximum permitted nesting depth +/// \param(out) out the read record batch Status ARROW_EXPORT ReadRecordBatch(const Message& metadata, const std::shared_ptr<Schema>& schema, int max_recursion_depth, io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out); -/// Read encapsulated message and RecordBatch +/// Read record batch as encapsulated IPC message with metadata size prefix and +/// header +/// +/// \param(in) schema the record batch schema +/// \param(in) offset the file location of the start of the message +/// \param(in) file the file where the batch is located +/// \param(out) out the read record batch Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offset, io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out); -/// EXPERIMENTAL: Read arrow::Tensor from a contiguous message +/// EXPERIMENTAL: Read arrow::Tensor as encapsulated IPC message in file +/// +/// \param(in) offset the file location of the start of the message +/// 
\param(in) file the file where the batch is located +/// \param(out) out the read tensor Status ARROW_EXPORT ReadTensor( int64_t offset, io::RandomAccessFile* file, std::shared_ptr<Tensor>* out); +/// Backwards-compatibility for Arrow < 0.4.0 +/// +#ifndef ARROW_NO_DEPRECATED_API +using StreamReader = RecordBatchReader; +using FileReader = RecordBatchFileReader; +#endif + } // namespace ipc } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/stream-to-file.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/stream-to-file.cc b/cpp/src/arrow/ipc/stream-to-file.cc index ec0ac43..b942054 100644 --- a/cpp/src/arrow/ipc/stream-to-file.cc +++ b/cpp/src/arrow/ipc/stream-to-file.cc @@ -24,18 +24,19 @@ #include "arrow/util/io-util.h" namespace arrow { +namespace ipc { // Converts a stream from stdin to a file written to standard out. // A typical usage would be: // $ <program that produces streaming output> | stream-to-file > file.arrow Status ConvertToFile() { std::shared_ptr<io::InputStream> input(new io::StdinStream); - std::shared_ptr<ipc::StreamReader> reader; - RETURN_NOT_OK(ipc::StreamReader::Open(input, &reader)); + std::shared_ptr<RecordBatchStreamReader> reader; + RETURN_NOT_OK(RecordBatchStreamReader::Open(input, &reader)); io::StdoutStream sink; - std::shared_ptr<ipc::FileWriter> writer; - RETURN_NOT_OK(ipc::FileWriter::Open(&sink, reader->schema(), &writer)); + std::shared_ptr<RecordBatchFileWriter> writer; + RETURN_NOT_OK(RecordBatchFileWriter::Open(&sink, reader->schema(), &writer)); std::shared_ptr<RecordBatch> batch; while (true) { @@ -46,10 +47,11 @@ Status ConvertToFile() { return writer->Close(); } +} // namespace ipc } // namespace arrow int main(int argc, char** argv) { - arrow::Status status = arrow::ConvertToFile(); + arrow::Status status = arrow::ipc::ConvertToFile(); if (!status.ok()) { std::cerr << "Could not convert to file: " << status.ToString() << std::endl; return 1; http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/writer.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 78d6b9e..ced0710 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -88,9 +88,9 @@ static inline bool NeedTruncate( return offset != 0 || min_length < buffer->size(); } -class RecordBatchWriter : public ArrayVisitor { +class RecordBatchSerializer : public ArrayVisitor { public: - RecordBatchWriter(MemoryPool* pool, int64_t buffer_start_offset, + RecordBatchSerializer(MemoryPool* pool, int64_t buffer_start_offset, int max_recursion_depth, bool allow_64bit) : pool_(pool), max_recursion_depth_(max_recursion_depth), @@ -99,7 +99,7 @@ class RecordBatchWriter : public ArrayVisitor { DCHECK_GT(max_recursion_depth, 0); } - virtual ~RecordBatchWriter() = default; + virtual ~RecordBatchSerializer() = default; Status VisitArray(const Array& arr) { if (max_recursion_depth_ <= 0) { @@ -480,9 +480,9 @@ class RecordBatchWriter : public ArrayVisitor { bool allow_64bit_; }; -class DictionaryWriter : public RecordBatchWriter { +class DictionaryWriter : public RecordBatchSerializer { public: - using RecordBatchWriter::RecordBatchWriter; + using RecordBatchSerializer::RecordBatchSerializer; Status WriteMetadataMessage( int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) override { @@ -500,7 +500,7 @@ class DictionaryWriter : public RecordBatchWriter { 
auto schema = std::make_shared<Schema>(fields); RecordBatch batch(schema, dictionary->length(), {dictionary}); - return RecordBatchWriter::Write(batch, dst, metadata_length, body_length); + return RecordBatchSerializer::Write(batch, dst, metadata_length, body_length); } private: @@ -521,7 +521,8 @@ Status AlignStreamPosition(io::OutputStream* stream) { Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, MemoryPool* pool, int max_recursion_depth, bool allow_64bit) { - RecordBatchWriter writer(pool, buffer_start_offset, max_recursion_depth, allow_64bit); + RecordBatchSerializer writer( + pool, buffer_start_offset, max_recursion_depth, allow_64bit); return writer.Write(batch, dst, metadata_length, body_length); } @@ -581,17 +582,21 @@ Status GetTensorSize(const Tensor& tensor, int64_t* size) { } // ---------------------------------------------------------------------- + +RecordBatchWriter::~RecordBatchWriter() {} + +// ---------------------------------------------------------------------- // Stream writer implementation -class StreamWriter::StreamWriterImpl { +class RecordBatchStreamWriter::RecordBatchStreamWriterImpl { public: - StreamWriterImpl() + RecordBatchStreamWriterImpl() : dictionary_memo_(std::make_shared<DictionaryMemo>()), pool_(default_memory_pool()), position_(-1), started_(false) {} - virtual ~StreamWriterImpl() = default; + virtual ~RecordBatchStreamWriterImpl() = default; Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema) { sink_ = sink; @@ -721,37 +726,40 @@ class StreamWriter::StreamWriterImpl { std::vector<FileBlock> record_batches_; }; -StreamWriter::StreamWriter() { - impl_.reset(new StreamWriterImpl()); +RecordBatchStreamWriter::RecordBatchStreamWriter() { + impl_.reset(new RecordBatchStreamWriterImpl()); } -Status StreamWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) { +RecordBatchStreamWriter::~RecordBatchStreamWriter() {} + +Status RecordBatchStreamWriter::WriteRecordBatch( + const RecordBatch& batch, bool allow_64bit) { return impl_->WriteRecordBatch(batch, allow_64bit); } -StreamWriter::~StreamWriter() {} - -void StreamWriter::set_memory_pool(MemoryPool* pool) { +void RecordBatchStreamWriter::set_memory_pool(MemoryPool* pool) { impl_->set_memory_pool(pool); } -Status StreamWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, - std::shared_ptr<StreamWriter>* out) { +Status RecordBatchStreamWriter::Open(io::OutputStream* sink, + const std::shared_ptr<Schema>& schema, + std::shared_ptr<RecordBatchStreamWriter>* out) { // ctor is private - *out = std::shared_ptr<StreamWriter>(new StreamWriter()); + *out = std::shared_ptr<RecordBatchStreamWriter>(new RecordBatchStreamWriter()); return (*out)->impl_->Open(sink, schema); } -Status StreamWriter::Close() { +Status RecordBatchStreamWriter::Close() { return impl_->Close(); } // ---------------------------------------------------------------------- // File writer implementation -class FileWriter::FileWriterImpl : public StreamWriter::StreamWriterImpl { +class RecordBatchFileWriter::RecordBatchFileWriterImpl + : public RecordBatchStreamWriter::RecordBatchStreamWriterImpl { public: - using BASE = StreamWriter::StreamWriterImpl; + using BASE = RecordBatchStreamWriter::RecordBatchStreamWriterImpl; Status Start() override { RETURN_NOT_OK(WriteAligned( @@ -783,23 +791,25 @@ class FileWriter::FileWriterImpl : public StreamWriter::StreamWriterImpl { } }; 
-FileWriter::FileWriter() { - impl_.reset(new FileWriterImpl()); +RecordBatchFileWriter::RecordBatchFileWriter() { + impl_.reset(new RecordBatchFileWriterImpl()); } -FileWriter::~FileWriter() {} +RecordBatchFileWriter::~RecordBatchFileWriter() {} -Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, - std::shared_ptr<FileWriter>* out) { - *out = std::shared_ptr<FileWriter>(new FileWriter()); // ctor is private +Status RecordBatchFileWriter::Open(io::OutputStream* sink, + const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatchFileWriter>* out) { + *out = std::shared_ptr<RecordBatchFileWriter>( + new RecordBatchFileWriter()); // ctor is private return (*out)->impl_->Open(sink, schema); } -Status FileWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) { +Status RecordBatchFileWriter::WriteRecordBatch( + const RecordBatch& batch, bool allow_64bit) { return impl_->WriteRecordBatch(batch, allow_64bit); } -Status FileWriter::Close() { +Status RecordBatchFileWriter::Close() { return impl_->Close(); } http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/writer.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index b71becb..899a1b2 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -46,6 +46,84 @@ class OutputStream; namespace ipc { +/// \class RecordBatchWriter +/// \brief Abstract interface for writing a stream of record batches +class ARROW_EXPORT RecordBatchWriter { + public: + virtual ~RecordBatchWriter(); + + /// Write a record batch to the stream + /// + /// \param allow_64bit boolean permitting field lengths exceeding INT32_MAX + /// \return Status indicate success or failure + virtual Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) = 0; + + /// Perform any logic necessary to finish the stream + /// + /// \return Status indicate success or failure + virtual Status Close() = 0; + + /// In some cases, writing may require memory allocation. We use the default + /// memory pool, but provide the option to override + /// + /// \param pool the memory pool to use for required allocations + virtual void set_memory_pool(MemoryPool* pool) = 0; +}; + +/// \class RecordBatchStreamWriter +/// \brief Synchronous batch stream writer that writes the Arrow streaming +/// format +class ARROW_EXPORT RecordBatchStreamWriter : public RecordBatchWriter { + public: + virtual ~RecordBatchStreamWriter(); + + /// Create a new writer from stream sink and schema. User is responsible for + /// closing the actual OutputStream. 
+ /// + /// \param(in) sink output stream to write to + /// \param(in) schema the schema of the record batches to be written + /// \param(out) out the created stream writer + /// \return Status indicating success or failure + static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, + std::shared_ptr<RecordBatchStreamWriter>* out); + + Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override; + Status Close() override; + void set_memory_pool(MemoryPool* pool) override; + + protected: + RecordBatchStreamWriter(); + class ARROW_NO_EXPORT RecordBatchStreamWriterImpl; + std::unique_ptr<RecordBatchStreamWriterImpl> impl_; +}; + +/// \brief Creates the Arrow record batch file format +/// +/// Implements the random access file format, which structurally is a record +/// batch stream followed by a metadata footer at the end of the file. Magic +/// numbers are written at the start and end of the file +class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter { + public: + virtual ~RecordBatchFileWriter(); + + /// Create a new writer from stream sink and schema + /// + /// \param(in) sink output stream to write to + /// \param(in) schema the schema of the record batches to be written + /// \param(out) out the created stream writer + /// \return Status indicating success or failure + static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, + std::shared_ptr<RecordBatchFileWriter>* out); + + Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override; + Status Close() override; + + private: + RecordBatchFileWriter(); + class ARROW_NO_EXPORT RecordBatchFileWriterImpl; + std::unique_ptr<RecordBatchFileWriterImpl> impl_; +}; + /// Write the RecordBatch (collection of equal-length Arrow arrays) to the /// output stream in a contiguous block. The record batch metadata is written as /// a flatbuffer (see format/Message.fbs -- the RecordBatch message type) @@ -58,13 +136,13 @@ namespace ipc { /// to the end of the body and end of the metadata / data header (suffixed by /// the header size) is returned in out-variables /// -/// @param(in) buffer_start_offset the start offset to use in the buffer metadata, +/// \param(in) buffer_start_offset the start offset to use in the buffer metadata, /// default should be 0 -/// @param(in) allow_64bit permit field lengths exceeding INT32_MAX. May not be +/// \param(in) allow_64bit permit field lengths exceeding INT32_MAX. 
May not be /// readable by other Arrow implementations -/// @param(out) metadata_length: the size of the length-prefixed flatbuffer +/// \param(out) metadata_length: the size of the length-prefixed flatbuffer /// including padding to a 64-byte boundary -/// @param(out) body_length: the size of the contiguous buffer block plus +/// \param(out) body_length: the size of the contiguous buffer block plus /// padding bytes Status ARROW_EXPORT WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, @@ -85,45 +163,6 @@ Status ARROW_EXPORT GetRecordBatchSize(const RecordBatch& batch, int64_t* size); // write the tensor including metadata, padding, and data Status ARROW_EXPORT GetTensorSize(const Tensor& tensor, int64_t* size); -class ARROW_EXPORT StreamWriter { - public: - virtual ~StreamWriter(); - - static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, - std::shared_ptr<StreamWriter>* out); - - virtual Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false); - - /// Perform any logic necessary to finish the stream. User is responsible for - /// closing the actual OutputStream - virtual Status Close(); - - // In some cases, writing may require memory allocation. We use the default - // memory pool, but provide the option to override - void set_memory_pool(MemoryPool* pool); - - protected: - StreamWriter(); - class ARROW_NO_EXPORT StreamWriterImpl; - std::unique_ptr<StreamWriterImpl> impl_; -}; - -class ARROW_EXPORT FileWriter : public StreamWriter { - public: - virtual ~FileWriter(); - - static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, - std::shared_ptr<FileWriter>* out); - - Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override; - Status Close() override; - - private: - FileWriter(); - class ARROW_NO_EXPORT FileWriterImpl; - std::unique_ptr<FileWriterImpl> impl_; -}; - /// EXPERIMENTAL: Write RecordBatch allowing lengths over INT32_MAX. This data /// may not be readable by all Arrow implementations Status ARROW_EXPORT WriteLargeRecordBatch(const RecordBatch& batch, @@ -135,6 +174,13 @@ Status ARROW_EXPORT WriteLargeRecordBatch(const RecordBatch& batch, Status ARROW_EXPORT WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length); +/// Backwards-compatibility for Arrow < 0.4.0 +/// +#ifndef ARROW_NO_DEPRECATED_API +using FileWriter = RecordBatchFileWriter; +using StreamWriter = RecordBatchStreamWriter; +#endif + } // namespace ipc } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/doc/source/api.rst ---------------------------------------------------------------------- diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst index a8dd8c5..e7bea70 100644 --- a/python/doc/source/api.rst +++ b/python/doc/source/api.rst @@ -177,10 +177,12 @@ Interprocess Communication and Messaging .. autosummary:: :toctree: generated/ - FileReader - FileWriter - StreamReader - StreamWriter + RecordBatchFileReader + RecordBatchFileWriter + RecordBatchStreamReader + RecordBatchStreamWriter + open_file + open_stream .. 
_api.memory_pool: http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/doc/source/ipc.rst ---------------------------------------------------------------------- diff --git a/python/doc/source/ipc.rst b/python/doc/source/ipc.rst index e63e745..cce2ae8 100644 --- a/python/doc/source/ipc.rst +++ b/python/doc/source/ipc.rst @@ -55,13 +55,13 @@ First, let's create a small record batch: batch.num_columns Now, we can begin writing a stream containing some number of these batches. For -this we use :class:`~pyarrow.StreamWriter`, which can write to a writeable +this we use :class:`~pyarrow.BatchStreamWriter`, which can write to a writeable ``NativeFile`` object or a writeable Python object: .. ipython:: python sink = pa.InMemoryOutputStream() - writer = pa.StreamWriter(sink, batch.schema) + writer = pa.BatchStreamWriter(sink, batch.schema) Here we used an in-memory Arrow buffer stream, but this could have been a socket or some other IO sink. @@ -80,11 +80,11 @@ particular stream. Now we can do: buf.size Now ``buf`` contains the complete stream as an in-memory byte buffer. We can -read such a stream with :class:`~pyarrow.StreamReader`: +read such a stream with :class:`~pyarrow.BatchStreamReader`: .. ipython:: python - reader = pa.StreamReader(buf) + reader = pa.BatchStreamReader(buf) reader.schema batches = [b for b in reader] @@ -103,13 +103,13 @@ batches are also zero-copy and do not allocate any new memory on read. Writing and Reading Random Access Files --------------------------------------- -The :class:`~pyarrow.FileWriter` has the same API as -:class:`~pyarrow.StreamWriter`: +The :class:`~pyarrow.BatchFileWriter` has the same API as +:class:`~pyarrow.BatchStreamWriter`: .. ipython:: python sink = pa.InMemoryOutputStream() - writer = pa.FileWriter(sink, batch.schema) + writer = pa.BatchFileWriter(sink, batch.schema) for i in range(10): writer.write_batch(batch) @@ -118,13 +118,14 @@ The :class:`~pyarrow.FileWriter` has the same API as buf = sink.get_result() buf.size -The difference between :class:`~pyarrow.FileReader` and -:class:`~pyarrow.StreamReader` is that the input source must have a ``seek`` -method for random access. The stream reader only requires read operations: +The difference between :class:`~pyarrow.BatchFileReader` and +:class:`~pyarrow.BatchStreamReader` is that the input source must have a +``seek`` method for random access. The stream reader only requires read +operations: .. 
ipython:: python - reader = pa.FileReader(buf) + reader = pa.BatchFileReader(buf) Because we have access to the entire payload, we know the number of record batches in the file, and can read any at random: http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/pyarrow/__init__.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 7d79811..d6d2aa4 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -101,7 +101,10 @@ def jemalloc_memory_pool(): from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem -from pyarrow.ipc import FileReader, FileWriter, StreamReader, StreamWriter +from pyarrow.ipc import (RecordBatchFileReader, RecordBatchFileWriter, + RecordBatchStreamReader, RecordBatchStreamWriter, + open_stream, + open_file) localfs = LocalFilesystem.get_instance() http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/pyarrow/includes/libarrow.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 3d56c14..b03dd59 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -547,38 +547,44 @@ cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil: cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: - cdef cppclass CStreamWriter " arrow::ipc::StreamWriter": - @staticmethod - CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema, - shared_ptr[CStreamWriter]* out) - + cdef cppclass CRecordBatchWriter \ + " arrow::ipc::RecordBatchWriter": CStatus Close() CStatus WriteRecordBatch(const CRecordBatch& batch) - cdef cppclass CStreamReader " arrow::ipc::StreamReader": + cdef cppclass CRecordBatchReader \ + " arrow::ipc::RecordBatchReader": + shared_ptr[CSchema] schema() + CStatus GetNextRecordBatch(shared_ptr[CRecordBatch]* batch) + cdef cppclass CRecordBatchStreamReader \ + " arrow::ipc::RecordBatchStreamReader"(CRecordBatchReader): @staticmethod CStatus Open(const shared_ptr[InputStream]& stream, - shared_ptr[CStreamReader]* out) + shared_ptr[CRecordBatchStreamReader]* out) - shared_ptr[CSchema] schema() - - CStatus GetNextRecordBatch(shared_ptr[CRecordBatch]* batch) - - cdef cppclass CFileWriter " arrow::ipc::FileWriter"(CStreamWriter): + cdef cppclass CRecordBatchStreamWriter \ + " arrow::ipc::RecordBatchStreamWriter"(CRecordBatchWriter): @staticmethod CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema, - shared_ptr[CFileWriter]* out) + shared_ptr[CRecordBatchStreamWriter]* out) - cdef cppclass CFileReader " arrow::ipc::FileReader": + cdef cppclass CRecordBatchFileWriter \ + " arrow::ipc::RecordBatchFileWriter"(CRecordBatchWriter): + @staticmethod + CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema, + shared_ptr[CRecordBatchFileWriter]* out) + cdef cppclass CRecordBatchFileReader \ + " arrow::ipc::RecordBatchFileReader": @staticmethod CStatus Open(const shared_ptr[RandomAccessFile]& file, - shared_ptr[CFileReader]* out) + shared_ptr[CRecordBatchFileReader]* out) @staticmethod CStatus Open2" Open"(const shared_ptr[RandomAccessFile]& file, - int64_t footer_offset, shared_ptr[CFileReader]* out) + int64_t footer_offset, + shared_ptr[CRecordBatchFileReader]* out) shared_ptr[CSchema] schema() http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/pyarrow/io.pxi 
---------------------------------------------------------------------- diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index a0a96e7..4cbf603 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -916,9 +916,9 @@ cdef class HdfsFile(NativeFile): # ---------------------------------------------------------------------- # File and stream readers and writers -cdef class _StreamWriter: +cdef class _RecordBatchWriter: cdef: - shared_ptr[CStreamWriter] writer + shared_ptr[CRecordBatchWriter] writer shared_ptr[OutputStream] sink bint closed @@ -930,12 +930,18 @@ cdef class _StreamWriter: self.close() def _open(self, sink, Schema schema): + cdef: + shared_ptr[CRecordBatchStreamWriter] writer + get_writer(sink, &self.sink) with nogil: - check_status(CStreamWriter.Open(self.sink.get(), schema.sp_schema, - &self.writer)) + check_status( + CRecordBatchStreamWriter.Open(self.sink.get(), + schema.sp_schema, + &writer)) + self.writer = <shared_ptr[CRecordBatchWriter]> writer self.closed = False def write_batch(self, RecordBatch batch): @@ -949,9 +955,9 @@ cdef class _StreamWriter: self.closed = True -cdef class _StreamReader: +cdef class _RecordBatchReader: cdef: - shared_ptr[CStreamReader] reader + shared_ptr[CRecordBatchReader] reader cdef readonly: Schema schema @@ -961,15 +967,17 @@ cdef class _StreamReader: def _open(self, source): cdef: - shared_ptr[RandomAccessFile] reader + shared_ptr[RandomAccessFile] file_handle shared_ptr[InputStream] in_stream + shared_ptr[CRecordBatchStreamReader] reader - get_reader(source, &reader) - in_stream = <shared_ptr[InputStream]> reader + get_reader(source, &file_handle) + in_stream = <shared_ptr[InputStream]> file_handle with nogil: - check_status(CStreamReader.Open(in_stream, &self.reader)) + check_status(CRecordBatchStreamReader.Open(in_stream, &reader)) + self.reader = <shared_ptr[CRecordBatchReader]> reader self.schema = Schema() self.schema.init_schema(self.reader.get().schema()) @@ -1009,24 +1017,25 @@ cdef class _StreamReader: return pyarrow_wrap_table(table) -cdef class _FileWriter(_StreamWriter): +cdef class _RecordBatchFileWriter(_RecordBatchWriter): def _open(self, sink, Schema schema): - cdef shared_ptr[CFileWriter] writer + cdef shared_ptr[CRecordBatchFileWriter] writer get_writer(sink, &self.sink) with nogil: - check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema, - &writer)) + check_status( + CRecordBatchFileWriter.Open(self.sink.get(), schema.sp_schema, + &writer)) # Cast to base class, because has same interface - self.writer = <shared_ptr[CStreamWriter]> writer + self.writer = <shared_ptr[CRecordBatchWriter]> writer self.closed = False -cdef class _FileReader: +cdef class _RecordBatchFileReader: cdef: - shared_ptr[CFileReader] reader + shared_ptr[CRecordBatchFileReader] reader def __cinit__(self): pass @@ -1041,9 +1050,10 @@ cdef class _FileReader: with nogil: if offset != 0: - check_status(CFileReader.Open2(reader, offset, &self.reader)) + check_status(CRecordBatchFileReader.Open2( + reader, offset, &self.reader)) else: - check_status(CFileReader.Open(reader, &self.reader)) + check_status(CRecordBatchFileReader.Open(reader, &self.reader)) property num_record_batches: http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/pyarrow/ipc.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index c37a1ce..8338de3 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -20,7 +20,7 @@ import pyarrow.lib as lib -class 
StreamReader(lib._StreamReader): +class RecordBatchStreamReader(lib._RecordBatchReader): """ Reader for the Arrow streaming binary format @@ -37,7 +37,7 @@ class StreamReader(lib._StreamReader): yield self.get_next_batch() -class StreamWriter(lib._StreamWriter): +class RecordBatchStreamWriter(lib._RecordBatchWriter): """ Writer for the Arrow streaming binary format @@ -52,7 +52,7 @@ class StreamWriter(lib._StreamWriter): self._open(sink, schema) -class FileReader(lib._FileReader): +class RecordBatchFileReader(lib._RecordBatchFileReader): """ Class for reading Arrow record batch data from the Arrow binary file format @@ -68,7 +68,7 @@ class FileReader(lib._FileReader): self._open(source, footer_offset=footer_offset) -class FileWriter(lib._FileWriter): +class RecordBatchFileWriter(lib._RecordBatchFileWriter): """ Writer to create the Arrow binary file format @@ -81,3 +81,41 @@ class FileWriter(lib._FileWriter): """ def __init__(self, sink, schema): self._open(sink, schema) + + +def open_stream(source): + """ + Create reader for Arrow streaming format + + Parameters + ---------- + source : str, pyarrow.NativeFile, or file-like Python object + Either a file path, or a readable file object + footer_offset : int, default None + If the file is embedded in some larger file, this is the byte offset to + the very end of the file data + + Returns + ------- + reader : RecordBatchStreamReader + """ + return RecordBatchStreamReader(source) + + +def open_file(source, footer_offset=None): + """ + Create reader for Arrow file format + + Parameters + ---------- + source : str, pyarrow.NativeFile, or file-like Python object + Either a file path, or a readable file object + footer_offset : int, default None + If the file is embedded in some larger file, this is the byte offset to + the very end of the file data + + Returns + ------- + reader : RecordBatchFileReader + """ + return RecordBatchFileReader(source, footer_offset=footer_offset) http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/pyarrow/tests/test_ipc.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index 0204067..4d19804 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -70,13 +70,13 @@ class TestFile(MessagingTest, unittest.TestCase): # Also tests writing zero-copy NumPy array with additional padding def _get_writer(self, sink, schema): - return pa.FileWriter(sink, schema) + return pa.RecordBatchFileWriter(sink, schema) def test_simple_roundtrip(self): batches = self.write_batches() file_contents = self._get_source() - reader = pa.FileReader(file_contents) + reader = pa.open_file(file_contents) assert reader.num_record_batches == len(batches) @@ -89,7 +89,7 @@ class TestFile(MessagingTest, unittest.TestCase): batches = self.write_batches() file_contents = self._get_source() - reader = pa.FileReader(file_contents) + reader = pa.open_file(file_contents) result = reader.read_all() expected = pa.Table.from_batches(batches) @@ -99,12 +99,12 @@ class TestFile(MessagingTest, unittest.TestCase): class TestStream(MessagingTest, unittest.TestCase): def _get_writer(self, sink, schema): - return pa.StreamWriter(sink, schema) + return pa.RecordBatchStreamWriter(sink, schema) def test_simple_roundtrip(self): batches = self.write_batches() file_contents = self._get_source() - reader = pa.StreamReader(file_contents) + reader = pa.open_stream(file_contents) assert reader.schema.equals(batches[0].schema) @@ 
-121,7 +121,7 @@ class TestStream(MessagingTest, unittest.TestCase): def test_read_all(self): batches = self.write_batches() file_contents = self._get_source() - reader = pa.StreamReader(file_contents) + reader = pa.open_stream(file_contents) result = reader.read_all() expected = pa.Table.from_batches(batches) @@ -147,7 +147,7 @@ class TestSocket(MessagingTest, unittest.TestCase): connection, client_address = self._sock.accept() try: source = connection.makefile(mode='rb') - reader = pa.StreamReader(source) + reader = pa.open_stream(source) self._schema = reader.schema if self._do_read_all: self._table = reader.read_all() @@ -185,7 +185,7 @@ class TestSocket(MessagingTest, unittest.TestCase): return self._sock.makefile(mode='wb') def _get_writer(self, sink, schema): - return pa.StreamWriter(sink, schema) + return pa.RecordBatchStreamWriter(sink, schema) def test_simple_roundtrip(self): self.start_server(do_read_all=False) @@ -241,12 +241,12 @@ def test_get_record_batch_size(): def write_file(batch, sink): - writer = pa.FileWriter(sink, batch.schema) + writer = pa.RecordBatchFileWriter(sink, batch.schema) writer.write_batch(batch) writer.close() def read_file(source): - reader = pa.FileReader(source) + reader = pa.open_file(source) return [reader.get_batch(i) for i in range(reader.num_record_batches)]
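For readers skimming the patch, here is a minimal C++ sketch of how the renamed classes fit together. It uses only calls that appear in the diff above (RecordBatchStreamWriter::Open, WriteRecordBatch, Close, RecordBatchStreamReader::Open, GetNextRecordBatch, RETURN_NOT_OK). The RoundTripExample name, the arrow/api.h and arrow/io/interfaces.h includes, and the idea of passing in already-constructed sink/source streams are illustrative assumptions, not part of the commit.

    #include <memory>

    #include "arrow/api.h"            // assumed umbrella header for Status, Schema, RecordBatch
    #include "arrow/io/interfaces.h"  // assumed location of io::InputStream / io::OutputStream
    #include "arrow/ipc/reader.h"
    #include "arrow/ipc/writer.h"

    namespace arrow {

    // Hypothetical helper: write `batch` several times in the streaming format
    // to `sink`, then read every batch back from `source`. The caller is
    // assumed to wire `source` to the bytes written to `sink` (e.g. through an
    // in-memory buffer).
    Status RoundTripExample(const RecordBatch& batch, io::OutputStream* sink,
                            const std::shared_ptr<io::InputStream>& source) {
      // The concrete stream writer implements the new abstract
      // ipc::RecordBatchWriter interface and is opened via the static Open()
      // declared in writer.h
      std::shared_ptr<ipc::RecordBatchStreamWriter> writer;
      RETURN_NOT_OK(ipc::RecordBatchStreamWriter::Open(sink, batch.schema(), &writer));
      for (int i = 0; i < 5; ++i) {
        RETURN_NOT_OK(writer->WriteRecordBatch(batch));
      }
      RETURN_NOT_OK(writer->Close());

      // The reader implements the new ipc::RecordBatchReader interface; a
      // nullptr batch signals end of stream, per the reader.h docstrings
      std::shared_ptr<ipc::RecordBatchStreamReader> reader;
      RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(source, &reader));
      std::shared_ptr<RecordBatch> chunk;
      while (true) {
        RETURN_NOT_OK(reader->GetNextRecordBatch(&chunk));
        if (chunk == nullptr) break;  // end of stream reached
      }
      return Status::OK();
    }

    }  // namespace arrow

Code written against the old names should keep compiling through the using-aliases at the bottom of reader.h and writer.h unless ARROW_NO_DEPRECATED_API is defined; on the Python side, the equivalent convenience entry points are pa.open_stream and pa.open_file added in pyarrow/ipc.py.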