Repository: arrow
Updated Branches:
  refs/heads/master 71424c20d -> ced9d766d


ARROW-679: [Format] Change FieldNode, RecordBatch lengths to long, remove LargeRecordBatch. Refactoring

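This lets me delete a substantial amount of code without losing functionality. C++ 
users must explicitly opt in to writing lengths over INT32_MAX (via the new 
`allow_64bit` argument to `WriteRecordBatch`, or via `WriteLargeRecordBatch`).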

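cc @julienledem. I have not added checks in Java for sizes over INT32_MAX; I 
wasn't sure where you would want them. Note that the Java reader currently 
narrows the new 64-bit lengths with `(int)` casts, so values over INT32_MAX 
would be silently truncated.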

Author: Wes McKinney <[email protected]>

Closes #417 from wesm/ARROW-679 and squashes the following commits:

ea237b1 [Wes McKinney] Document allow_64bit for WriteRecordBatch
e237d4a [Wes McKinney] Change FieldNode, RecordBatch lengths to long, remove LargeRecordBatch. Refactoring


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/ced9d766
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/ced9d766
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/ced9d766

Branch: refs/heads/master
Commit: ced9d766d70e84c4d0542c6f5d9bd57faf10781d
Parents: 71424c2
Author: Wes McKinney <[email protected]>
Authored: Wed Mar 22 14:05:33 2017 -0400
Committer: Wes McKinney <[email protected]>
Committed: Wed Mar 22 14:05:33 2017 -0400

----------------------------------------------------------------------
 cpp/src/arrow/ipc/ipc-read-write-test.cc        |  2 +-
 cpp/src/arrow/ipc/metadata.cc                   | 48 +----------------
 cpp/src/arrow/ipc/metadata.h                    |  6 +--
 cpp/src/arrow/ipc/reader.cc                     | 55 +++-----------------
 cpp/src/arrow/ipc/reader.h                      |  7 +--
 cpp/src/arrow/ipc/writer.cc                     | 52 ++++++------------
 cpp/src/arrow/ipc/writer.h                      | 51 +++++++++---------
 format/Message.fbs                              | 24 ++-------
 .../arrow/vector/schema/ArrowFieldNode.java     |  2 +-
 .../arrow/vector/stream/MessageSerializer.java  |  4 +-
 10 files changed, 61 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/cpp/src/arrow/ipc/ipc-read-write-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc 
b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index 0011844..6919aeb 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -163,7 +163,7 @@ class IpcTestFixture : public io::MemoryMapFixture {
 
     RETURN_NOT_OK(WriteLargeRecordBatch(
         batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, 
pool_));
-    return ReadLargeRecordBatch(batch.schema(), 0, mmap_.get(), result);
+    return ReadRecordBatch(batch.schema(), 0, mmap_.get(), result);
   }
 
   void CheckReadResult(const RecordBatch& result, const RecordBatch& expected) 
{

http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index c091bac..b10ccec 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -42,7 +42,6 @@ namespace ipc {
 using FBB = flatbuffers::FlatBufferBuilder;
 using DictionaryOffset = flatbuffers::Offset<flatbuf::DictionaryEncoding>;
 using FieldOffset = flatbuffers::Offset<flatbuf::Field>;
-using LargeRecordBatchOffset = flatbuffers::Offset<flatbuf::LargeRecordBatch>;
 using RecordBatchOffset = flatbuffers::Offset<flatbuf::RecordBatch>;
 using VectorLayoutOffset = flatbuffers::Offset<arrow::flatbuf::VectorLayout>;
 using Offset = flatbuffers::Offset<void>;
@@ -558,8 +557,6 @@ Status WriteSchemaMessage(
 
 using FieldNodeVector =
     flatbuffers::Offset<flatbuffers::Vector<const flatbuf::FieldNode*>>;
-using LargeFieldNodeVector =
-    flatbuffers::Offset<flatbuffers::Vector<const flatbuf::LargeFieldNode*>>;
 using BufferVector = flatbuffers::Offset<flatbuffers::Vector<const 
flatbuf::Buffer*>>;
 
 static Status WriteFieldNodes(
@@ -572,23 +569,6 @@ static Status WriteFieldNodes(
     if (node.offset != 0) {
       return Status::Invalid("Field metadata for IPC must have offset 0");
     }
-    fb_nodes.emplace_back(
-        static_cast<int32_t>(node.length), 
static_cast<int32_t>(node.null_count));
-  }
-  *out = fbb.CreateVectorOfStructs(fb_nodes);
-  return Status::OK();
-}
-
-static Status WriteLargeFieldNodes(
-    FBB& fbb, const std::vector<FieldMetadata>& nodes, LargeFieldNodeVector* 
out) {
-  std::vector<flatbuf::LargeFieldNode> fb_nodes;
-  fb_nodes.reserve(nodes.size());
-
-  for (size_t i = 0; i < nodes.size(); ++i) {
-    const FieldMetadata& node = nodes[i];
-    if (node.offset != 0) {
-      return Status::Invalid("Field metadata for IPC must have offset 0");
-    }
     fb_nodes.emplace_back(node.length, node.null_count);
   }
   *out = fbb.CreateVectorOfStructs(fb_nodes);
@@ -621,19 +601,6 @@ static Status MakeRecordBatch(FBB& fbb, int32_t length, 
int64_t body_length,
   return Status::OK();
 }
 
-static Status MakeLargeRecordBatch(FBB& fbb, int64_t length, int64_t 
body_length,
-    const std::vector<FieldMetadata>& nodes, const 
std::vector<BufferMetadata>& buffers,
-    LargeRecordBatchOffset* offset) {
-  LargeFieldNodeVector fb_nodes;
-  BufferVector fb_buffers;
-
-  RETURN_NOT_OK(WriteLargeFieldNodes(fbb, nodes, &fb_nodes));
-  RETURN_NOT_OK(WriteBuffers(fbb, buffers, &fb_buffers));
-
-  *offset = flatbuf::CreateLargeRecordBatch(fbb, length, fb_nodes, fb_buffers);
-  return Status::OK();
-}
-
 Status WriteRecordBatchMessage(int32_t length, int64_t body_length,
     const std::vector<FieldMetadata>& nodes, const 
std::vector<BufferMetadata>& buffers,
     std::shared_ptr<Buffer>* out) {
@@ -644,17 +611,6 @@ Status WriteRecordBatchMessage(int32_t length, int64_t 
body_length,
       fbb, flatbuf::MessageHeader_RecordBatch, record_batch.Union(), 
body_length, out);
 }
 
-Status WriteLargeRecordBatchMessage(int64_t length, int64_t body_length,
-    const std::vector<FieldMetadata>& nodes, const 
std::vector<BufferMetadata>& buffers,
-    std::shared_ptr<Buffer>* out) {
-  FBB fbb;
-  LargeRecordBatchOffset large_batch;
-  RETURN_NOT_OK(
-      MakeLargeRecordBatch(fbb, length, body_length, nodes, buffers, 
&large_batch));
-  return WriteMessage(fbb, flatbuf::MessageHeader_LargeRecordBatch, 
large_batch.Union(),
-      body_length, out);
-}
-
 Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length,
     const std::vector<FieldMetadata>& nodes, const 
std::vector<BufferMetadata>& buffers,
     std::shared_ptr<Buffer>* out) {
@@ -917,7 +873,7 @@ class RecordBatchMetadata::RecordBatchMetadataImpl : public 
MessageHolder {
 
   const flatbuf::Buffer* buffer(int i) const { return buffers_->Get(i); }
 
-  int32_t length() const { return batch_->length(); }
+  int64_t length() const { return batch_->length(); }
 
   int num_buffers() const { return batch_->buffers()->size(); }
 
@@ -969,7 +925,7 @@ BufferMetadata RecordBatchMetadata::buffer(int i) const {
   return result;
 }
 
-int32_t RecordBatchMetadata::length() const {
+int64_t RecordBatchMetadata::length() const {
   return impl_->length();
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index 41e6c5e..dc07c7a 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -150,7 +150,7 @@ class ARROW_EXPORT RecordBatchMetadata {
   FieldMetadata field(int i) const;
   BufferMetadata buffer(int i) const;
 
-  int32_t length() const;
+  int64_t length() const;
   int num_buffers() const;
   int num_fields() const;
 
@@ -229,10 +229,6 @@ Status WriteRecordBatchMessage(int32_t length, int64_t 
body_length,
     const std::vector<FieldMetadata>& nodes, const 
std::vector<BufferMetadata>& buffers,
     std::shared_ptr<Buffer>* out);
 
-Status WriteLargeRecordBatchMessage(int64_t length, int64_t body_length,
-    const std::vector<FieldMetadata>& nodes, const 
std::vector<BufferMetadata>& buffers,
-    std::shared_ptr<Buffer>* out);
-
 Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length,
     const std::vector<FieldMetadata>& nodes, const 
std::vector<BufferMetadata>& buffers,
     std::shared_ptr<Buffer>* out);

http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/cpp/src/arrow/ipc/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index a2b20a9..71ba951 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -468,48 +468,7 @@ Status FileReader::GetRecordBatch(int i, 
std::shared_ptr<RecordBatch>* batch) {
   return impl_->GetRecordBatch(i, batch);
 }
 
-// ----------------------------------------------------------------------
-// Read LargeRecordBatch
-
-class LargeRecordBatchSource : public ArrayComponentSource {
- public:
-  LargeRecordBatchSource(
-      const flatbuf::LargeRecordBatch* metadata, io::RandomAccessFile* file)
-      : metadata_(metadata), file_(file) {}
-
-  Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) override {
-    if (buffer_index >= static_cast<int>(metadata_->buffers()->size())) {
-      return Status::Invalid("Ran out of buffer metadata, likely malformed");
-    }
-    const flatbuf::Buffer* buffer = metadata_->buffers()->Get(buffer_index);
-
-    if (buffer->length() == 0) {
-      *out = nullptr;
-      return Status::OK();
-    } else {
-      return file_->ReadAt(buffer->offset(), buffer->length(), out);
-    }
-  }
-
-  Status GetFieldMetadata(int field_index, FieldMetadata* metadata) override {
-    // pop off a field
-    if (field_index >= static_cast<int>(metadata_->nodes()->size())) {
-      return Status::Invalid("Ran out of field metadata, likely malformed");
-    }
-    const flatbuf::LargeFieldNode* node = metadata_->nodes()->Get(field_index);
-
-    metadata->length = node->length();
-    metadata->null_count = node->null_count();
-    metadata->offset = 0;
-    return Status::OK();
-  }
-
- private:
-  const flatbuf::LargeRecordBatch* metadata_;
-  io::RandomAccessFile* file_;
-};
-
-Status ReadLargeRecordBatch(const std::shared_ptr<Schema>& schema, int64_t 
offset,
+Status ReadRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offset,
     io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
   std::shared_ptr<Buffer> buffer;
   RETURN_NOT_OK(file->Seek(offset));
@@ -517,19 +476,19 @@ Status ReadLargeRecordBatch(const 
std::shared_ptr<Schema>& schema, int64_t offse
   RETURN_NOT_OK(file->Read(sizeof(int32_t), &buffer));
   int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data());
 
+  std::shared_ptr<Message> message;
   RETURN_NOT_OK(file->Read(flatbuffer_size, &buffer));
-  auto message = flatbuf::GetMessage(buffer->data());
-  auto batch = reinterpret_cast<const 
flatbuf::LargeRecordBatch*>(message->header());
+  RETURN_NOT_OK(Message::Open(buffer, 0, &message));
+
+  RecordBatchMetadata metadata(message);
 
   // TODO(ARROW-388): The buffer offsets start at 0, so we must construct a
   // RandomAccessFile according to that frame of reference
   std::shared_ptr<Buffer> buffer_payload;
-  RETURN_NOT_OK(file->Read(message->bodyLength(), &buffer_payload));
+  RETURN_NOT_OK(file->Read(message->body_length(), &buffer_payload));
   io::BufferReader buffer_reader(buffer_payload);
 
-  LargeRecordBatchSource source(batch, &buffer_reader);
-  return LoadRecordBatchFromSource(
-      schema, batch->length(), kMaxNestingDepth, &source, out);
+  return ReadRecordBatch(metadata, schema, kMaxNestingDepth, &buffer_reader, 
out);
 }
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/cpp/src/arrow/ipc/reader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index 1c1314a..1e8636c 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -120,12 +120,9 @@ class ARROW_EXPORT FileReader {
   std::unique_ptr<FileReaderImpl> impl_;
 };
 
-// ----------------------------------------------------------------------
-//
 
-/// EXPERIMENTAL: Read length-prefixed LargeRecordBatch metadata (64-bit array
-/// lengths) at offset and reconstruct RecordBatch
-Status ARROW_EXPORT ReadLargeRecordBatch(const std::shared_ptr<Schema>& schema,
+/// Read the length-prefixed RecordBatch message at offset and reconstruct it
+Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<Schema>& schema,
     int64_t offset, io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* 
out);
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/cpp/src/arrow/ipc/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index ef59471..0f55f8e 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -48,28 +48,25 @@ namespace ipc {
 class RecordBatchWriter : public ArrayVisitor {
  public:
   RecordBatchWriter(
-      MemoryPool* pool, int64_t buffer_start_offset, int max_recursion_depth)
+      MemoryPool* pool, int64_t buffer_start_offset, int max_recursion_depth,
+      bool allow_64bit)
       : pool_(pool),
         max_recursion_depth_(max_recursion_depth),
-        buffer_start_offset_(buffer_start_offset) {
+        buffer_start_offset_(buffer_start_offset),
+        allow_64bit_(allow_64bit) {
     DCHECK_GT(max_recursion_depth, 0);
   }
 
   virtual ~RecordBatchWriter() = default;
 
-  virtual Status CheckArrayMetadata(const Array& arr) {
-    if (arr.length() > std::numeric_limits<int32_t>::max()) {
-      return Status::Invalid("Cannot write arrays larger than 2^31 - 1 in 
length");
-    }
-    return Status::OK();
-  }
-
   Status VisitArray(const Array& arr) {
     if (max_recursion_depth_ <= 0) {
       return Status::Invalid("Max recursion depth reached");
     }
 
-    RETURN_NOT_OK(CheckArrayMetadata(arr));
+    if (!allow_64bit_ && arr.length() > std::numeric_limits<int32_t>::max()) {
+      return Status::Invalid("Cannot write arrays larger than 2^31 - 1 in 
length");
+    }
 
     // push back all common elements
     field_nodes_.emplace_back(arr.length(), arr.null_count(), 0);
@@ -470,6 +467,7 @@ class RecordBatchWriter : public ArrayVisitor {
 
   int64_t max_recursion_depth_;
   int64_t buffer_start_offset_;
+  bool allow_64bit_;
 };
 
 class DictionaryWriter : public RecordBatchWriter {
@@ -502,20 +500,21 @@ class DictionaryWriter : public RecordBatchWriter {
 
 Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
     io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
-    MemoryPool* pool, int max_recursion_depth) {
-  RecordBatchWriter writer(pool, buffer_start_offset, max_recursion_depth);
+    MemoryPool* pool, int max_recursion_depth, bool allow_64bit) {
+  RecordBatchWriter writer(pool, buffer_start_offset, max_recursion_depth,
+      allow_64bit);
   return writer.Write(batch, dst, metadata_length, body_length);
 }
 
 Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& 
dictionary,
     int64_t buffer_start_offset, io::OutputStream* dst, int32_t* 
metadata_length,
     int64_t* body_length, MemoryPool* pool) {
-  DictionaryWriter writer(pool, buffer_start_offset, kMaxNestingDepth);
+  DictionaryWriter writer(pool, buffer_start_offset, kMaxNestingDepth, false);
   return writer.Write(dictionary_id, dictionary, dst, metadata_length, 
body_length);
 }
 
 Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
-  RecordBatchWriter writer(default_memory_pool(), 0, kMaxNestingDepth);
+  RecordBatchWriter writer(default_memory_pool(), 0, kMaxNestingDepth, true);
   RETURN_NOT_OK(writer.GetTotalSize(batch, size));
   return Status::OK();
 }
@@ -733,30 +732,11 @@ Status FileWriter::Close() {
   return impl_->Close();
 }
 
-// ----------------------------------------------------------------------
-// Write record batches with 64-bit size metadata
-
-class LargeRecordBatchWriter : public RecordBatchWriter {
- public:
-  using RecordBatchWriter::RecordBatchWriter;
-
-  Status CheckArrayMetadata(const Array& arr) override {
-    // No < INT32_MAX length check
-    return Status::OK();
-  }
-
-  Status WriteMetadataMessage(
-      int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) 
override {
-    return WriteLargeRecordBatchMessage(
-        num_rows, body_length, field_nodes_, buffer_meta_, out);
-  }
-};
-
 Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t 
buffer_start_offset,
     io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
-    MemoryPool* pool, int max_recursion_depth) {
-  LargeRecordBatchWriter writer(pool, buffer_start_offset, 
max_recursion_depth);
-  return writer.Write(batch, dst, metadata_length, body_length);
+    MemoryPool* pool) {
+  return WriteRecordBatch(batch, buffer_start_offset, dst, metadata_length, 
body_length,
+      pool, kMaxNestingDepth, true);
 }
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/cpp/src/arrow/ipc/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index 1271652..3b7e710 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -45,29 +45,30 @@ class OutputStream;
 
 namespace ipc {
 
-// Write the RecordBatch (collection of equal-length Arrow arrays) to the
-// output stream in a contiguous block. The record batch metadata is written as
-// a flatbuffer (see format/Message.fbs -- the RecordBatch message type)
-// prefixed by its size, followed by each of the memory buffers in the batch
-// written end to end (with appropriate alignment and padding):
-//
-// <int32: metadata size> <uint8*: metadata> <buffers>
-//
-// Finally, the absolute offsets (relative to the start of the output stream)
-// to the end of the body and end of the metadata / data header (suffixed by
-// the header size) is returned in out-variables
-//
-// @param(in) buffer_start_offset: the start offset to use in the buffer 
metadata,
-// default should be 0
-//
-// @param(out) metadata_length: the size of the length-prefixed flatbuffer
-// including padding to a 64-byte boundary
-//
-// @param(out) body_length: the size of the contiguous buffer block plus
-// padding bytes
+/// Write the RecordBatch (collection of equal-length Arrow arrays) to the
+/// output stream in a contiguous block. The record batch metadata is written 
as
+/// a flatbuffer (see format/Message.fbs -- the RecordBatch message type)
+/// prefixed by its size, followed by each of the memory buffers in the batch
+/// written end to end (with appropriate alignment and padding):
+///
+/// <int32: metadata size> <uint8*: metadata> <buffers>
+///
+/// Finally, the absolute offsets (relative to the start of the output stream)
+/// to the end of the body and end of the metadata / data header (suffixed by
+/// the header size) are returned in out-variables
+///
+/// @param(in) buffer_start_offset the start offset to use in the buffer 
metadata,
+/// default should be 0
+/// @param(in) allow_64bit permit field lengths exceeding INT32_MAX. May not be
+/// readable by other Arrow implementations
+/// @param(out) metadata_length: the size of the length-prefixed flatbuffer
+/// including padding to a 64-byte boundary
+/// @param(out) body_length: the size of the contiguous buffer block plus
+/// padding bytes
 Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
     io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
-    MemoryPool* pool, int max_recursion_depth = kMaxNestingDepth);
+    MemoryPool* pool, int max_recursion_depth = kMaxNestingDepth,
+    bool allow_64bit = false);
 
 // Write Array as a DictionaryBatch message
 Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& 
dictionary,
@@ -116,13 +117,11 @@ class ARROW_EXPORT FileWriter : public StreamWriter {
   std::unique_ptr<FileWriterImpl> impl_;
 };
 
-// ----------------------------------------------------------------------
-
-/// EXPERIMENTAL: Write record batch using LargeRecordBatch IPC metadata. This
-/// data may not be readable by all Arrow implementations
+/// EXPERIMENTAL: Write RecordBatch allowing lengths over INT32_MAX. This data
+/// may not be readable by all Arrow implementations
 Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t 
buffer_start_offset,
     io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
-    MemoryPool* pool, int max_recursion_depth = kMaxNestingDepth);
+    MemoryPool* pool);
 
 }  // namespace ipc
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/format/Message.fbs
----------------------------------------------------------------------
diff --git a/format/Message.fbs b/format/Message.fbs
index e56366d..ff30ace 100644
--- a/format/Message.fbs
+++ b/format/Message.fbs
@@ -290,12 +290,12 @@ struct Buffer {
 struct FieldNode {
   /// The number of value slots in the Arrow array at this level of a nested
   /// tree
-  length: int;
+  length: long;
 
   /// The number of observed nulls. Fields with null_count == 0 may choose not
   /// to write their physical validity bitmap out as a materialized buffer,
   /// instead setting the length of the bitmap buffer to 0.
-  null_count: int;
+  null_count: long;
 }
 
 /// A data header describing the shared memory layout of a "record" or "row"
@@ -304,7 +304,7 @@ struct FieldNode {
 table RecordBatch {
   /// number of records / rows. The arrays in the batch should all have this
   /// length
-  length: int;
+  length: long;
 
   /// Nodes correspond to the pre-ordered flattened logical schema
   nodes: [FieldNode];
@@ -319,22 +319,6 @@ table RecordBatch {
 }
 
 /// ----------------------------------------------------------------------
-/// EXPERIMENTAL: A RecordBatch type that supports data with more than 2^31 - 1
-/// elements. Arrow implementations do not need to implement this type to be
-/// compliant
-
-struct LargeFieldNode {
-  length: long;
-  null_count: long;
-}
-
-table LargeRecordBatch {
-  length: long;
-  nodes: [LargeFieldNode];
-  buffers: [Buffer];
-}
-
-/// ----------------------------------------------------------------------
 /// For sending dictionary encoding information. Any Field can be
 /// dictionary-encoded, but in this case none of its children may be
 /// dictionary-encoded.
@@ -356,7 +340,7 @@ table DictionaryBatch {
 /// which may include experimental metadata types. For maximum compatibility,
 /// it is best to send data using RecordBatch
 union MessageHeader {
-  Schema, DictionaryBatch, RecordBatch, LargeRecordBatch
+  Schema, DictionaryBatch, RecordBatch
 }
 
 table Message {

http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java 
b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java
index 71dd0ab..72ce982 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java
@@ -34,7 +34,7 @@ public class ArrowFieldNode implements FBSerializable {
 
   @Override
   public int writeTo(FlatBufferBuilder builder) {
-    return FieldNode.createFieldNode(builder, length, nullCount);
+    return FieldNode.createFieldNode(builder, (long)length, (long)nullCount);
   }
 
   public int getNullCount() {

http://git-wip-us.apache.org/repos/asf/arrow/blob/ced9d766/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
index 92a6c0c..f85fb51 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
@@ -207,7 +207,7 @@ public class MessageSerializer {
     List<ArrowFieldNode> nodes = new ArrayList<>();
     for (int i = 0; i < nodesLength; ++i) {
       FieldNode node = recordBatchFB.nodes(i);
-      nodes.add(new ArrowFieldNode(node.length(), node.nullCount()));
+      nodes.add(new ArrowFieldNode((int)node.length(), (int)node.nullCount()));
     }
     List<ArrowBuf> buffers = new ArrayList<>();
     for (int i = 0; i < recordBatchFB.buffersLength(); ++i) {
@@ -216,7 +216,7 @@ public class MessageSerializer {
       buffers.add(vectorBuffer);
     }
     ArrowRecordBatch arrowRecordBatch =
-        new ArrowRecordBatch(recordBatchFB.length(), nodes, buffers);
+        new ArrowRecordBatch((int)recordBatchFB.length(), nodes, buffers);
     body.release();
     return arrowRecordBatch;
   }

Reply via email to