Repository: arrow
Updated Branches:
  refs/heads/master f7b287a28 -> 642b753a4
ARROW-698: Add flag to FileWriter::WriteRecordBatch for writing record batches with lengths over INT32_MAX cc @pcmoritz Author: Wes McKinney <wes.mckin...@twosigma.com> Closes #455 from wesm/ARROW-698 and squashes the following commits: 42c100c [Wes McKinney] Add allow_64bit option to FileWriter::WriteRecordBatch Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/642b753a Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/642b753a Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/642b753a Branch: refs/heads/master Commit: 642b753a49a3fcb5d53946c773cd70ab2a3ece88 Parents: f7b287a Author: Wes McKinney <wes.mckin...@twosigma.com> Authored: Thu Mar 30 10:19:50 2017 -0400 Committer: Wes McKinney <wes.mckin...@twosigma.com> Committed: Thu Mar 30 10:19:50 2017 -0400 ---------------------------------------------------------------------- cpp/src/arrow/ipc/ipc-read-write-test.cc | 20 ++++++++++++-------- cpp/src/arrow/ipc/writer.cc | 18 ++++++++++-------- cpp/src/arrow/ipc/writer.h | 4 ++-- cpp/src/arrow/type-test.cc | 8 ++++---- cpp/src/arrow/util/visibility.h | 8 ++++---- 5 files changed, 32 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/642b753a/cpp/src/arrow/ipc/ipc-read-write-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc index cd3f190..48e546e 100644 --- a/cpp/src/arrow/ipc/ipc-read-write-test.cc +++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc @@ -138,17 +138,21 @@ class IpcTestFixture : public io::MemoryMapFixture { Status DoLargeRoundTrip( const RecordBatch& batch, bool zero_data, std::shared_ptr<RecordBatch>* result) { - int32_t metadata_length; - int64_t body_length; - - const int64_t buffer_offset = 0; - if (zero_data) { 
RETURN_NOT_OK(ZeroMemoryMap(mmap_.get())); } RETURN_NOT_OK(mmap_->Seek(0)); - RETURN_NOT_OK(WriteLargeRecordBatch( - batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_)); - return ReadRecordBatch(batch.schema(), 0, mmap_.get(), result); + std::shared_ptr<FileWriter> file_writer; + RETURN_NOT_OK(FileWriter::Open(mmap_.get(), batch.schema(), &file_writer)); + RETURN_NOT_OK(file_writer->WriteRecordBatch(batch, true)); + RETURN_NOT_OK(file_writer->Close()); + + int64_t offset; + RETURN_NOT_OK(mmap_->Tell(&offset)); + + std::shared_ptr<FileReader> file_reader; + RETURN_NOT_OK(FileReader::Open(mmap_, offset, &file_reader)); + + return file_reader->GetRecordBatch(0, result); } void CheckReadResult(const RecordBatch& result, const RecordBatch& expected) { http://git-wip-us.apache.org/repos/asf/arrow/blob/642b753a/cpp/src/arrow/ipc/writer.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index da360f3..92e6194 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -591,7 +591,7 @@ class StreamWriter::StreamWriterImpl { return Status::OK(); } - Status WriteRecordBatch(const RecordBatch& batch, FileBlock* block) { + Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit, FileBlock* block) { RETURN_NOT_OK(CheckStarted()); block->offset = position_; @@ -599,7 +599,8 @@ class StreamWriter::StreamWriterImpl { // Frame of reference in file format is 0, see ARROW-384 const int64_t buffer_start_offset = 0; RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(batch, buffer_start_offset, sink_, - &block->metadata_length, &block->body_length, pool_)); + &block->metadata_length, &block->body_length, pool_, kMaxNestingDepth, + allow_64bit)); RETURN_NOT_OK(UpdatePosition()); DCHECK(position_ % 8 == 0) << "WriteRecordBatch did not perform aligned writes"; @@ -607,10 +608,11 @@ class StreamWriter::StreamWriterImpl { return Status::OK(); } - Status 
WriteRecordBatch(const RecordBatch& batch) { + Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) { // Push an empty FileBlock. Can be written in the footer later record_batches_.emplace_back(0, 0, 0); - return WriteRecordBatch(batch, &record_batches_[record_batches_.size() - 1]); + return WriteRecordBatch( + batch, allow_64bit, &record_batches_[record_batches_.size() - 1]); } // Adds padding bytes if necessary to ensure all memory blocks are written on @@ -657,8 +659,8 @@ StreamWriter::StreamWriter() { impl_.reset(new StreamWriterImpl()); } -Status StreamWriter::WriteRecordBatch(const RecordBatch& batch) { - return impl_->WriteRecordBatch(batch); +Status StreamWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) { + return impl_->WriteRecordBatch(batch, allow_64bit); } void StreamWriter::set_memory_pool(MemoryPool* pool) { @@ -723,8 +725,8 @@ Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& s return (*out)->impl_->Open(sink, schema); } -Status FileWriter::WriteRecordBatch(const RecordBatch& batch) { - return impl_->WriteRecordBatch(batch); +Status FileWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) { + return impl_->WriteRecordBatch(batch, allow_64bit); } Status FileWriter::Close() { http://git-wip-us.apache.org/repos/asf/arrow/blob/642b753a/cpp/src/arrow/ipc/writer.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index 3b7e710..25b5ad6 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -87,7 +87,7 @@ class ARROW_EXPORT StreamWriter { static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, std::shared_ptr<StreamWriter>* out); - virtual Status WriteRecordBatch(const RecordBatch& batch); + virtual Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false); /// Perform any logic necessary to finish the stream. 
User is responsible for /// closing the actual OutputStream @@ -108,7 +108,7 @@ class ARROW_EXPORT FileWriter : public StreamWriter { static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, std::shared_ptr<FileWriter>* out); - Status WriteRecordBatch(const RecordBatch& batch) override; + Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override; Status Close() override; private: http://git-wip-us.apache.org/repos/asf/arrow/blob/642b753a/cpp/src/arrow/type-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/type-test.cc b/cpp/src/arrow/type-test.cc index 7f13f8b..ed86543 100644 --- a/cpp/src/arrow/type-test.cc +++ b/cpp/src/arrow/type-test.cc @@ -232,16 +232,16 @@ TEST(TestTimestampType, ToString) { } TEST(TestNestedType, Equals) { - auto create_struct = - [](std::string inner_name, std::string struct_name) -> shared_ptr<Field> { + auto create_struct = []( + std::string inner_name, std::string struct_name) -> shared_ptr<Field> { auto f_type = field(inner_name, int32()); vector<shared_ptr<Field>> fields = {f_type}; auto s_type = std::make_shared<StructType>(fields); return field(struct_name, s_type); }; - auto create_union = - [](std::string inner_name, std::string union_name) -> shared_ptr<Field> { + auto create_union = []( + std::string inner_name, std::string union_name) -> shared_ptr<Field> { auto f_type = field(inner_name, int32()); vector<shared_ptr<Field>> fields = {f_type}; vector<uint8_t> codes = {Type::INT32}; http://git-wip-us.apache.org/repos/asf/arrow/blob/642b753a/cpp/src/arrow/util/visibility.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/visibility.h b/cpp/src/arrow/util/visibility.h index 6382f7f..e84cc45 100644 --- a/cpp/src/arrow/util/visibility.h +++ b/cpp/src/arrow/util/visibility.h @@ -39,17 +39,17 @@ // explicit specializations https://llvm.org/bugs/show_bug.cgi?id=24815 #if 
defined(__clang__) - #define ARROW_EXTERN_TEMPLATE extern template class ARROW_EXPORT +#define ARROW_EXTERN_TEMPLATE extern template class ARROW_EXPORT #else - #define ARROW_EXTERN_TEMPLATE extern template class +#define ARROW_EXTERN_TEMPLATE extern template class #endif // This is a complicated topic, some reading on it: // http://www.codesynthesis.com/~boris/blog/2010/01/18/dll-export-cxx-templates/ #if defined(_MSC_VER) - #define ARROW_TEMPLATE_EXPORT ARROW_EXPORT +#define ARROW_TEMPLATE_EXPORT ARROW_EXPORT #else - #define ARROW_TEMPLATE_EXPORT +#define ARROW_TEMPLATE_EXPORT #endif #endif // ARROW_UTIL_VISIBILITY_H