Repository: arrow Updated Branches: refs/heads/master 71424c20d -> ced9d766d
ARROW-679: [Format] Change FieldNode, RecordBatch lengths to long, remove LargeRecordBatch. Refactoring. This enables me to delete a bunch of code without losing functionality. C++ users must explicitly opt-in to writing size over INT32_MAX. cc @julienledem. I have not added checks in Java about sizes over INT32_MAX; I wasn't sure where you might want to do that. Author: Wes McKinney <[email protected]> Closes #417 from wesm/ARROW-679 and squashes the following commits: ea237b1 [Wes McKinney] Document allow_64bit for WriteRecordBatch e237d4a [Wes McKinney] Change FieldNode, RecordBatch lengths to long, remove LargeRecordBatch. Refactoring Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/ced9d766 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/ced9d766 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/ced9d766 Branch: refs/heads/master Commit: ced9d766d70e84c4d0542c6f5d9bd57faf10781d Parents: 71424c2 Author: Wes McKinney <[email protected]> Authored: Wed Mar 22 14:05:33 2017 -0400 Committer: Wes McKinney <[email protected]> Committed: Wed Mar 22 14:05:33 2017 -0400 ---------------------------------------------------------------------- cpp/src/arrow/ipc/ipc-read-write-test.cc | 2 +- cpp/src/arrow/ipc/metadata.cc | 48 +---------------- cpp/src/arrow/ipc/metadata.h | 6 +-- cpp/src/arrow/ipc/reader.cc | 55 +++----------------- cpp/src/arrow/ipc/reader.h | 7 +-- cpp/src/arrow/ipc/writer.cc | 52 ++++++------------ cpp/src/arrow/ipc/writer.h | 51 +++++++++--------- format/Message.fbs | 24 ++------- .../arrow/vector/schema/ArrowFieldNode.java | 2 +- .../arrow/vector/stream/MessageSerializer.java | 4 +- 10 files changed, 61 insertions(+), 190 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/cpp/src/arrow/ipc/ipc-read-write-test.cc ---------------------------------------------------------------------- 
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc index 0011844..6919aeb 100644 --- a/cpp/src/arrow/ipc/ipc-read-write-test.cc +++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc @@ -163,7 +163,7 @@ class IpcTestFixture : public io::MemoryMapFixture { RETURN_NOT_OK(WriteLargeRecordBatch( batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_)); - return ReadLargeRecordBatch(batch.schema(), 0, mmap_.get(), result); + return ReadRecordBatch(batch.schema(), 0, mmap_.get(), result); } void CheckReadResult(const RecordBatch& result, const RecordBatch& expected) { http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/cpp/src/arrow/ipc/metadata.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc index c091bac..b10ccec 100644 --- a/cpp/src/arrow/ipc/metadata.cc +++ b/cpp/src/arrow/ipc/metadata.cc @@ -42,7 +42,6 @@ namespace ipc { using FBB = flatbuffers::FlatBufferBuilder; using DictionaryOffset = flatbuffers::Offset<flatbuf::DictionaryEncoding>; using FieldOffset = flatbuffers::Offset<flatbuf::Field>; -using LargeRecordBatchOffset = flatbuffers::Offset<flatbuf::LargeRecordBatch>; using RecordBatchOffset = flatbuffers::Offset<flatbuf::RecordBatch>; using VectorLayoutOffset = flatbuffers::Offset<arrow::flatbuf::VectorLayout>; using Offset = flatbuffers::Offset<void>; @@ -558,8 +557,6 @@ Status WriteSchemaMessage( using FieldNodeVector = flatbuffers::Offset<flatbuffers::Vector<const flatbuf::FieldNode*>>; -using LargeFieldNodeVector = - flatbuffers::Offset<flatbuffers::Vector<const flatbuf::LargeFieldNode*>>; using BufferVector = flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Buffer*>>; static Status WriteFieldNodes( @@ -572,23 +569,6 @@ static Status WriteFieldNodes( if (node.offset != 0) { return Status::Invalid("Field metadata for IPC must have offset 0"); } - fb_nodes.emplace_back( - 
static_cast<int32_t>(node.length), static_cast<int32_t>(node.null_count)); - } - *out = fbb.CreateVectorOfStructs(fb_nodes); - return Status::OK(); -} - -static Status WriteLargeFieldNodes( - FBB& fbb, const std::vector<FieldMetadata>& nodes, LargeFieldNodeVector* out) { - std::vector<flatbuf::LargeFieldNode> fb_nodes; - fb_nodes.reserve(nodes.size()); - - for (size_t i = 0; i < nodes.size(); ++i) { - const FieldMetadata& node = nodes[i]; - if (node.offset != 0) { - return Status::Invalid("Field metadata for IPC must have offset 0"); - } fb_nodes.emplace_back(node.length, node.null_count); } *out = fbb.CreateVectorOfStructs(fb_nodes); @@ -621,19 +601,6 @@ static Status MakeRecordBatch(FBB& fbb, int32_t length, int64_t body_length, return Status::OK(); } -static Status MakeLargeRecordBatch(FBB& fbb, int64_t length, int64_t body_length, - const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, - LargeRecordBatchOffset* offset) { - LargeFieldNodeVector fb_nodes; - BufferVector fb_buffers; - - RETURN_NOT_OK(WriteLargeFieldNodes(fbb, nodes, &fb_nodes)); - RETURN_NOT_OK(WriteBuffers(fbb, buffers, &fb_buffers)); - - *offset = flatbuf::CreateLargeRecordBatch(fbb, length, fb_nodes, fb_buffers); - return Status::OK(); -} - Status WriteRecordBatchMessage(int32_t length, int64_t body_length, const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, std::shared_ptr<Buffer>* out) { @@ -644,17 +611,6 @@ Status WriteRecordBatchMessage(int32_t length, int64_t body_length, fbb, flatbuf::MessageHeader_RecordBatch, record_batch.Union(), body_length, out); } -Status WriteLargeRecordBatchMessage(int64_t length, int64_t body_length, - const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, - std::shared_ptr<Buffer>* out) { - FBB fbb; - LargeRecordBatchOffset large_batch; - RETURN_NOT_OK( - MakeLargeRecordBatch(fbb, length, body_length, nodes, buffers, &large_batch)); - return WriteMessage(fbb, 
flatbuf::MessageHeader_LargeRecordBatch, large_batch.Union(), - body_length, out); -} - Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length, const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, std::shared_ptr<Buffer>* out) { @@ -917,7 +873,7 @@ class RecordBatchMetadata::RecordBatchMetadataImpl : public MessageHolder { const flatbuf::Buffer* buffer(int i) const { return buffers_->Get(i); } - int32_t length() const { return batch_->length(); } + int64_t length() const { return batch_->length(); } int num_buffers() const { return batch_->buffers()->size(); } @@ -969,7 +925,7 @@ BufferMetadata RecordBatchMetadata::buffer(int i) const { return result; } -int32_t RecordBatchMetadata::length() const { +int64_t RecordBatchMetadata::length() const { return impl_->length(); } http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/cpp/src/arrow/ipc/metadata.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h index 41e6c5e..dc07c7a 100644 --- a/cpp/src/arrow/ipc/metadata.h +++ b/cpp/src/arrow/ipc/metadata.h @@ -150,7 +150,7 @@ class ARROW_EXPORT RecordBatchMetadata { FieldMetadata field(int i) const; BufferMetadata buffer(int i) const; - int32_t length() const; + int64_t length() const; int num_buffers() const; int num_fields() const; @@ -229,10 +229,6 @@ Status WriteRecordBatchMessage(int32_t length, int64_t body_length, const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, std::shared_ptr<Buffer>* out); -Status WriteLargeRecordBatchMessage(int64_t length, int64_t body_length, - const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, - std::shared_ptr<Buffer>* out); - Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length, const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, std::shared_ptr<Buffer>* out); 
http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/cpp/src/arrow/ipc/reader.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index a2b20a9..71ba951 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -468,48 +468,7 @@ Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) { return impl_->GetRecordBatch(i, batch); } -// ---------------------------------------------------------------------- -// Read LargeRecordBatch - -class LargeRecordBatchSource : public ArrayComponentSource { - public: - LargeRecordBatchSource( - const flatbuf::LargeRecordBatch* metadata, io::RandomAccessFile* file) - : metadata_(metadata), file_(file) {} - - Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) override { - if (buffer_index >= static_cast<int>(metadata_->buffers()->size())) { - return Status::Invalid("Ran out of buffer metadata, likely malformed"); - } - const flatbuf::Buffer* buffer = metadata_->buffers()->Get(buffer_index); - - if (buffer->length() == 0) { - *out = nullptr; - return Status::OK(); - } else { - return file_->ReadAt(buffer->offset(), buffer->length(), out); - } - } - - Status GetFieldMetadata(int field_index, FieldMetadata* metadata) override { - // pop off a field - if (field_index >= static_cast<int>(metadata_->nodes()->size())) { - return Status::Invalid("Ran out of field metadata, likely malformed"); - } - const flatbuf::LargeFieldNode* node = metadata_->nodes()->Get(field_index); - - metadata->length = node->length(); - metadata->null_count = node->null_count(); - metadata->offset = 0; - return Status::OK(); - } - - private: - const flatbuf::LargeRecordBatch* metadata_; - io::RandomAccessFile* file_; -}; - -Status ReadLargeRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offset, +Status ReadRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offset, io::RandomAccessFile* file, 
std::shared_ptr<RecordBatch>* out) { std::shared_ptr<Buffer> buffer; RETURN_NOT_OK(file->Seek(offset)); @@ -517,19 +476,19 @@ Status ReadLargeRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offse RETURN_NOT_OK(file->Read(sizeof(int32_t), &buffer)); int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data()); + std::shared_ptr<Message> message; RETURN_NOT_OK(file->Read(flatbuffer_size, &buffer)); - auto message = flatbuf::GetMessage(buffer->data()); - auto batch = reinterpret_cast<const flatbuf::LargeRecordBatch*>(message->header()); + RETURN_NOT_OK(Message::Open(buffer, 0, &message)); + + RecordBatchMetadata metadata(message); // TODO(ARROW-388): The buffer offsets start at 0, so we must construct a // RandomAccessFile according to that frame of reference std::shared_ptr<Buffer> buffer_payload; - RETURN_NOT_OK(file->Read(message->bodyLength(), &buffer_payload)); + RETURN_NOT_OK(file->Read(message->body_length(), &buffer_payload)); io::BufferReader buffer_reader(buffer_payload); - LargeRecordBatchSource source(batch, &buffer_reader); - return LoadRecordBatchFromSource( - schema, batch->length(), kMaxNestingDepth, &source, out); + return ReadRecordBatch(metadata, schema, kMaxNestingDepth, &buffer_reader, out); } } // namespace ipc http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/cpp/src/arrow/ipc/reader.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 1c1314a..1e8636c 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -120,12 +120,9 @@ class ARROW_EXPORT FileReader { std::unique_ptr<FileReaderImpl> impl_; }; -// ---------------------------------------------------------------------- -// -/// EXPERIMENTAL: Read length-prefixed LargeRecordBatch metadata (64-bit array -/// lengths) at offset and reconstruct RecordBatch -Status ARROW_EXPORT ReadLargeRecordBatch(const std::shared_ptr<Schema>& schema, +/// Read 
encapsulated message and RecordBatch +Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offset, io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out); } // namespace ipc http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/cpp/src/arrow/ipc/writer.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index ef59471..0f55f8e 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -48,28 +48,25 @@ namespace ipc { class RecordBatchWriter : public ArrayVisitor { public: RecordBatchWriter( - MemoryPool* pool, int64_t buffer_start_offset, int max_recursion_depth) + MemoryPool* pool, int64_t buffer_start_offset, int max_recursion_depth, + bool allow_64bit) : pool_(pool), max_recursion_depth_(max_recursion_depth), - buffer_start_offset_(buffer_start_offset) { + buffer_start_offset_(buffer_start_offset), + allow_64bit_(allow_64bit) { DCHECK_GT(max_recursion_depth, 0); } virtual ~RecordBatchWriter() = default; - virtual Status CheckArrayMetadata(const Array& arr) { - if (arr.length() > std::numeric_limits<int32_t>::max()) { - return Status::Invalid("Cannot write arrays larger than 2^31 - 1 in length"); - } - return Status::OK(); - } - Status VisitArray(const Array& arr) { if (max_recursion_depth_ <= 0) { return Status::Invalid("Max recursion depth reached"); } - RETURN_NOT_OK(CheckArrayMetadata(arr)); + if (!allow_64bit_ && arr.length() > std::numeric_limits<int32_t>::max()) { + return Status::Invalid("Cannot write arrays larger than 2^31 - 1 in length"); + } // push back all common elements field_nodes_.emplace_back(arr.length(), arr.null_count(), 0); @@ -470,6 +467,7 @@ class RecordBatchWriter : public ArrayVisitor { int64_t max_recursion_depth_; int64_t buffer_start_offset_; + bool allow_64bit_; }; class DictionaryWriter : public RecordBatchWriter { @@ -502,20 +500,21 @@ class DictionaryWriter : public 
RecordBatchWriter { Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, - MemoryPool* pool, int max_recursion_depth) { - RecordBatchWriter writer(pool, buffer_start_offset, max_recursion_depth); + MemoryPool* pool, int max_recursion_depth, bool allow_64bit) { + RecordBatchWriter writer(pool, buffer_start_offset, max_recursion_depth, + allow_64bit); return writer.Write(batch, dst, metadata_length, body_length); } Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, MemoryPool* pool) { - DictionaryWriter writer(pool, buffer_start_offset, kMaxNestingDepth); + DictionaryWriter writer(pool, buffer_start_offset, kMaxNestingDepth, false); return writer.Write(dictionary_id, dictionary, dst, metadata_length, body_length); } Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) { - RecordBatchWriter writer(default_memory_pool(), 0, kMaxNestingDepth); + RecordBatchWriter writer(default_memory_pool(), 0, kMaxNestingDepth, true); RETURN_NOT_OK(writer.GetTotalSize(batch, size)); return Status::OK(); } @@ -733,30 +732,11 @@ Status FileWriter::Close() { return impl_->Close(); } -// ---------------------------------------------------------------------- -// Write record batches with 64-bit size metadata - -class LargeRecordBatchWriter : public RecordBatchWriter { - public: - using RecordBatchWriter::RecordBatchWriter; - - Status CheckArrayMetadata(const Array& arr) override { - // No < INT32_MAX length check - return Status::OK(); - } - - Status WriteMetadataMessage( - int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) override { - return WriteLargeRecordBatchMessage( - num_rows, body_length, field_nodes_, buffer_meta_, out); - } -}; - Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, 
io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, - MemoryPool* pool, int max_recursion_depth) { - LargeRecordBatchWriter writer(pool, buffer_start_offset, max_recursion_depth); - return writer.Write(batch, dst, metadata_length, body_length); + MemoryPool* pool) { + return WriteRecordBatch(batch, buffer_start_offset, dst, metadata_length, body_length, + pool, kMaxNestingDepth, true); } } // namespace ipc http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/cpp/src/arrow/ipc/writer.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index 1271652..3b7e710 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -45,29 +45,30 @@ class OutputStream; namespace ipc { -// Write the RecordBatch (collection of equal-length Arrow arrays) to the -// output stream in a contiguous block. The record batch metadata is written as -// a flatbuffer (see format/Message.fbs -- the RecordBatch message type) -// prefixed by its size, followed by each of the memory buffers in the batch -// written end to end (with appropriate alignment and padding): -// -// <int32: metadata size> <uint8*: metadata> <buffers> -// -// Finally, the absolute offsets (relative to the start of the output stream) -// to the end of the body and end of the metadata / data header (suffixed by -// the header size) is returned in out-variables -// -// @param(in) buffer_start_offset: the start offset to use in the buffer metadata, -// default should be 0 -// -// @param(out) metadata_length: the size of the length-prefixed flatbuffer -// including padding to a 64-byte boundary -// -// @param(out) body_length: the size of the contiguous buffer block plus -// padding bytes +/// Write the RecordBatch (collection of equal-length Arrow arrays) to the +/// output stream in a contiguous block. 
The record batch metadata is written as +/// a flatbuffer (see format/Message.fbs -- the RecordBatch message type) +/// prefixed by its size, followed by each of the memory buffers in the batch +/// written end to end (with appropriate alignment and padding): +/// +/// <int32: metadata size> <uint8*: metadata> <buffers> +/// +/// Finally, the absolute offsets (relative to the start of the output stream) +/// to the end of the body and end of the metadata / data header (suffixed by +/// the header size) is returned in out-variables +/// +/// @param(in) buffer_start_offset the start offset to use in the buffer metadata, +/// default should be 0 +/// @param(in) allow_64bit permit field lengths exceeding INT32_MAX. May not be +/// readable by other Arrow implementations +/// @param(out) metadata_length: the size of the length-prefixed flatbuffer +/// including padding to a 64-byte boundary +/// @param(out) body_length: the size of the contiguous buffer block plus +/// padding bytes Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, - MemoryPool* pool, int max_recursion_depth = kMaxNestingDepth); + MemoryPool* pool, int max_recursion_depth = kMaxNestingDepth, + bool allow_64bit = false); // Write Array as a DictionaryBatch message Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary, @@ -116,13 +117,11 @@ class ARROW_EXPORT FileWriter : public StreamWriter { std::unique_ptr<FileWriterImpl> impl_; }; -// ---------------------------------------------------------------------- - -/// EXPERIMENTAL: Write record batch using LargeRecordBatch IPC metadata. This -/// data may not be readable by all Arrow implementations +/// EXPERIMENTAL: Write RecordBatch allowing lengths over INT32_MAX. 
This data +/// may not be readable by all Arrow implementations Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, - MemoryPool* pool, int max_recursion_depth = kMaxNestingDepth); + MemoryPool* pool); } // namespace ipc } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/format/Message.fbs ---------------------------------------------------------------------- diff --git a/format/Message.fbs b/format/Message.fbs index e56366d..ff30ace 100644 --- a/format/Message.fbs +++ b/format/Message.fbs @@ -290,12 +290,12 @@ struct Buffer { struct FieldNode { /// The number of value slots in the Arrow array at this level of a nested /// tree - length: int; + length: long; /// The number of observed nulls. Fields with null_count == 0 may choose not /// to write their physical validity bitmap out as a materialized buffer, /// instead setting the length of the bitmap buffer to 0. - null_count: int; + null_count: long; } /// A data header describing the shared memory layout of a "record" or "row" @@ -304,7 +304,7 @@ struct FieldNode { table RecordBatch { /// number of records / rows. The arrays in the batch should all have this /// length - length: int; + length: long; /// Nodes correspond to the pre-ordered flattened logical schema nodes: [FieldNode]; @@ -319,22 +319,6 @@ table RecordBatch { } /// ---------------------------------------------------------------------- -/// EXPERIMENTAL: A RecordBatch type that supports data with more than 2^31 - 1 -/// elements. Arrow implementations do not need to implement this type to be -/// compliant - -struct LargeFieldNode { - length: long; - null_count: long; -} - -table LargeRecordBatch { - length: long; - nodes: [LargeFieldNode]; - buffers: [Buffer]; -} - -/// ---------------------------------------------------------------------- /// For sending dictionary encoding information. 
Any Field can be /// dictionary-encoded, but in this case none of its children may be /// dictionary-encoded. @@ -356,7 +340,7 @@ table DictionaryBatch { /// which may include experimental metadata types. For maximum compatibility, /// it is best to send data using RecordBatch union MessageHeader { - Schema, DictionaryBatch, RecordBatch, LargeRecordBatch + Schema, DictionaryBatch, RecordBatch } table Message { http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java index 71dd0ab..72ce982 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java @@ -34,7 +34,7 @@ public class ArrowFieldNode implements FBSerializable { @Override public int writeTo(FlatBufferBuilder builder) { - return FieldNode.createFieldNode(builder, length, nullCount); + return FieldNode.createFieldNode(builder, (long)length, (long)nullCount); } public int getNullCount() { http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java index 92a6c0c..f85fb51 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java @@ -207,7 +207,7 @@ public class MessageSerializer { List<ArrowFieldNode> nodes = new ArrayList<>(); for (int i = 0; i < 
nodesLength; ++i) { FieldNode node = recordBatchFB.nodes(i); - nodes.add(new ArrowFieldNode(node.length(), node.nullCount())); + nodes.add(new ArrowFieldNode((int)node.length(), (int)node.nullCount())); } List<ArrowBuf> buffers = new ArrayList<>(); for (int i = 0; i < recordBatchFB.buffersLength(); ++i) { @@ -216,7 +216,7 @@ public class MessageSerializer { buffers.add(vectorBuffer); } ArrowRecordBatch arrowRecordBatch = - new ArrowRecordBatch(recordBatchFB.length(), nodes, buffers); + new ArrowRecordBatch((int)recordBatchFB.length(), nodes, buffers); body.release(); return arrowRecordBatch; }
