ARROW-267: [C++] Implement file format layout for IPC/RPC

Standing up the PR to get some feedback. I still have to implement the read 
path for record batches and then add a test suite. I'd also like to add some 
documentation about the structure of the file format and some of the implicit 
assumptions (e.g. word alignment) -- I put a placeholder `IPC.md` document here 
for this.

I also conformed the language re: record batches (had been using "row batch" in 
the C++ code) to make things more sane.

Note we are not yet able to write OS files here, see ARROW-293. Will tackle 
that in a follow up PR, and then we should be in a position to integration test.

Author: Wes McKinney <wes.mckin...@twosigma.com>

Closes #139 from wesm/ARROW-267 and squashes the following commits:

9bdbbd4 [Wes McKinney] Get test suite passing, add missing metadata adapters 
for string, binary
4d3cc1d [Wes McKinney] cpplint
2ec1aad [Wes McKinney] Draft failing file roundtrip test
358309b [Wes McKinney] Move record batch test fixtures into test-common.h
b88bce0 [Wes McKinney] Finish draft of FileReader::GetRecordBatch. Add body end 
offset to ipc adapter
edf36e7 [Wes McKinney] Start drafting FileReader IPC implementation. Change 
record batch data header to write metadata size int32_t as suffix rather than 
prefix
95157f2 [Wes McKinney] Make record batch writes aligned on word boundaries
7c50251 [Wes McKinney] Make the interface for WriteRecordBatch more flexible 
(not require constructing a RecordBatch object)
ab4056f [Wes McKinney] Drafting file reader/writer API. Implement 
BufferOutputStream and write file footers to an OutputStream
113ac7b [Wes McKinney] Draft file footer metadata write/read path with simple 
unit test


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/7e39747e
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/7e39747e
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/7e39747e

Branch: refs/heads/master
Commit: 7e39747eec05379710e1a42ecbaf1d9795bc3cf0
Parents: 430bd95
Author: Wes McKinney <wes.mckin...@twosigma.com>
Authored: Wed Sep 21 18:15:58 2016 -0400
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Wed Sep 21 18:15:58 2016 -0400

----------------------------------------------------------------------
 NOTICE.txt                             |   6 +
 cpp/src/arrow/io/memory.cc             |  37 ++++
 cpp/src/arrow/io/memory.h              |  18 +-
 cpp/src/arrow/ipc/CMakeLists.txt       |  18 +-
 cpp/src/arrow/ipc/adapter.cc           | 126 +++++++-----
 cpp/src/arrow/ipc/adapter.h            |  47 +++--
 cpp/src/arrow/ipc/file.cc              | 210 ++++++++++++++++++++
 cpp/src/arrow/ipc/file.h               | 146 ++++++++++++++
 cpp/src/arrow/ipc/ipc-adapter-test.cc  | 284 ++++++----------------------
 cpp/src/arrow/ipc/ipc-file-test.cc     | 125 ++++++++++++
 cpp/src/arrow/ipc/ipc-metadata-test.cc |  77 +++++++-
 cpp/src/arrow/ipc/metadata-internal.cc |  46 +++--
 cpp/src/arrow/ipc/metadata-internal.h  |   9 +
 cpp/src/arrow/ipc/metadata.cc          | 171 ++++++++++++++---
 cpp/src/arrow/ipc/metadata.h           |  64 ++++++-
 cpp/src/arrow/ipc/test-common.h        | 193 ++++++++++++++++++-
 cpp/src/arrow/ipc/util.h               |   8 +
 cpp/src/arrow/parquet/reader.h         |   2 +-
 cpp/src/arrow/parquet/writer.h         |   2 +-
 cpp/src/arrow/table.cc                 |   4 +-
 cpp/src/arrow/table.h                  |  16 +-
 format/IPC.md                          |   3 +
 format/README.md                       |   1 +
 23 files changed, 1231 insertions(+), 382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/NOTICE.txt
----------------------------------------------------------------------
diff --git a/NOTICE.txt b/NOTICE.txt
index a851016..ce6e567 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -12,3 +12,9 @@ This product includes software from the Numpy project 
(BSD-new)
  
https://github.com/numpy/numpy/blob/e1f191c46f2eebd6cb892a4bfe14d9dd43a06c4e/numpy/core/src/multiarray/multiarraymodule.c#L2910
  * Copyright (c) 1995, 1996, 1997 Jim Hugunin, hugu...@mit.edu
  * Copyright (c) 2005 Travis E. Oliphant oliph...@ee.byu.edu Brigham Young 
University
+
+This product includes software from the Feather project (Apache 2.0)
+https://github.com/wesm/feather
+
+This product includes software from the DyND project (BSD 2-clause)
+https://github.com/libdynd

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 1dd6c3a..c168c91 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -207,6 +207,43 @@ Status MemoryMappedFile::WriteInternal(const uint8_t* 
data, int64_t nbytes) {
 }
 
 // ----------------------------------------------------------------------
+// OutputStream that writes to resizable buffer
+
+static constexpr int64_t kBufferMinimumSize = 256;
+
+BufferOutputStream::BufferOutputStream(const std::shared_ptr<ResizableBuffer>& 
buffer)
+    : buffer_(buffer),
+      capacity_(buffer->size()),
+      position_(0),
+      mutable_data_(buffer->mutable_data()) {}
+
+Status BufferOutputStream::Close() {
+  return Status::OK();
+}
+
+Status BufferOutputStream::Tell(int64_t* position) {
+  *position = position_;
+  return Status::OK();
+}
+
+Status BufferOutputStream::Write(const uint8_t* data, int64_t nbytes) {
+  RETURN_NOT_OK(Reserve(nbytes));
+  std::memcpy(mutable_data_ + position_, data, nbytes);
+  position_ += nbytes;
+  return Status::OK();
+}
+
+Status BufferOutputStream::Reserve(int64_t nbytes) {
+  while (position_ + nbytes > capacity_) {
+    int64_t new_capacity = std::max(kBufferMinimumSize, capacity_ * 2);
+    RETURN_NOT_OK(buffer_->Resize(new_capacity));
+    capacity_ = new_capacity;
+  }
+  mutable_data_ = buffer_->mutable_data();
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
 // In-memory buffer reader
 
 Status BufferReader::Close() {

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/io/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index 6fe47c3..51601a0 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -32,32 +32,30 @@
 namespace arrow {
 
 class Buffer;
-class MutableBuffer;
+class ResizableBuffer;
 class Status;
 
 namespace io {
 
 // An output stream that writes to a MutableBuffer, such as one obtained from a
 // memory map
-//
-// TODO(wesm): Implement this class
 class ARROW_EXPORT BufferOutputStream : public OutputStream {
  public:
-  explicit BufferOutputStream(const std::shared_ptr<MutableBuffer>& buffer)
-      : buffer_(buffer) {}
+  explicit BufferOutputStream(const std::shared_ptr<ResizableBuffer>& buffer);
 
   // Implement the OutputStream interface
   Status Close() override;
   Status Tell(int64_t* position) override;
-  Status Write(const uint8_t* data, int64_t length) override;
-
-  // Returns the number of bytes remaining in the buffer
-  int64_t bytes_remaining() const;
+  Status Write(const uint8_t* data, int64_t nbytes) override;
 
  private:
-  std::shared_ptr<MutableBuffer> buffer_;
+  // Ensures there is sufficient space available to write nbytes
+  Status Reserve(int64_t nbytes);
+
+  std::shared_ptr<ResizableBuffer> buffer_;
   int64_t capacity_;
   int64_t position_;
+  uint8_t* mutable_data_;
 };
 
 // A memory source that uses memory-mapped files for memory interactions

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/ipc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index e5553a6..bde8c5b 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -33,6 +33,7 @@ set(ARROW_IPC_TEST_LINK_LIBS
 
 set(ARROW_IPC_SRCS
   adapter.cc
+  file.cc
   metadata.cc
   metadata-internal.cc
 )
@@ -60,6 +61,10 @@ ADD_ARROW_TEST(ipc-adapter-test)
 ARROW_TEST_LINK_LIBRARIES(ipc-adapter-test
   ${ARROW_IPC_TEST_LINK_LIBS})
 
+ADD_ARROW_TEST(ipc-file-test)
+ARROW_TEST_LINK_LIBRARIES(ipc-file-test
+  ${ARROW_IPC_TEST_LINK_LIBS})
+
 ADD_ARROW_TEST(ipc-metadata-test)
 ARROW_TEST_LINK_LIBRARIES(ipc-metadata-test
   ${ARROW_IPC_TEST_LINK_LIBS})
@@ -70,14 +75,20 @@ set_source_files_properties(Metadata_generated.h PROPERTIES 
GENERATED TRUE)
 set(OUTPUT_DIR ${CMAKE_SOURCE_DIR}/src/arrow/ipc)
 set(FBS_OUTPUT_FILES "${OUTPUT_DIR}/Message_generated.h")
 
-set(FBS_SRC ${CMAKE_SOURCE_DIR}/../format/Message.fbs)
-get_filename_component(ABS_FBS_SRC ${FBS_SRC} ABSOLUTE)
+set(FBS_SRC
+  ${CMAKE_SOURCE_DIR}/../format/Message.fbs
+  ${CMAKE_SOURCE_DIR}/../format/File.fbs)
+
+foreach(FIL ${FBS_SRC})
+  get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
+  list(APPEND ABS_FBS_SRC ${ABS_FIL})
+endforeach()
 
 add_custom_command(
   OUTPUT ${FBS_OUTPUT_FILES}
   COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${ABS_FBS_SRC}
   DEPENDS ${ABS_FBS_SRC}
-  COMMENT "Running flatc compiler on ${FBS_SRC}"
+  COMMENT "Running flatc compiler on ${ABS_FBS_SRC}"
   VERBATIM
 )
 
@@ -87,6 +98,7 @@ add_dependencies(arrow_objlib metadata_fbs)
 # Headers: top level
 install(FILES
   adapter.h
+  file.h
   metadata.h
   DESTINATION include/arrow/ipc)
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index 0e101c8..89b7fb9 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -95,7 +95,7 @@ static bool IsListType(const DataType* type) {
 }
 
 // ----------------------------------------------------------------------
-// Row batch write path
+// Record batch write path
 
 Status VisitArray(const Array* arr, std::vector<flatbuf::FieldNode>* 
field_nodes,
     std::vector<std::shared_ptr<Buffer>>* buffers, int max_recursion_depth) {
@@ -132,28 +132,32 @@ Status VisitArray(const Array* arr, 
std::vector<flatbuf::FieldNode>* field_nodes
   return Status::OK();
 }
 
-class RowBatchWriter {
+class RecordBatchWriter {
  public:
-  RowBatchWriter(const RowBatch* batch, int max_recursion_depth)
-      : batch_(batch), max_recursion_depth_(max_recursion_depth) {}
+  RecordBatchWriter(const std::vector<std::shared_ptr<Array>>& columns, 
int32_t num_rows,
+      int max_recursion_depth)
+      : columns_(&columns),
+        num_rows_(num_rows),
+        max_recursion_depth_(max_recursion_depth) {}
 
   Status AssemblePayload() {
     // Perform depth-first traversal of the row-batch
-    for (int i = 0; i < batch_->num_columns(); ++i) {
-      const Array* arr = batch_->column(i).get();
+    for (size_t i = 0; i < columns_->size(); ++i) {
+      const Array* arr = (*columns_)[i].get();
       RETURN_NOT_OK(VisitArray(arr, &field_nodes_, &buffers_, 
max_recursion_depth_));
     }
     return Status::OK();
   }
 
-  Status Write(io::OutputStream* dst, int64_t* data_header_offset) {
-    // Write out all the buffers contiguously and compute the total size of the
-    // memory payload
-    int64_t offset = 0;
-
+  Status Write(
+      io::OutputStream* dst, int64_t* body_end_offset, int64_t* 
header_end_offset) {
     // Get the starting position
-    int64_t position;
-    RETURN_NOT_OK(dst->Tell(&position));
+    int64_t start_position;
+    RETURN_NOT_OK(dst->Tell(&start_position));
+
+    // Keep track of the current position so we can determine the size of the
+    // message body
+    int64_t position = start_position;
 
     for (size_t i = 0; i < buffers_.size(); ++i) {
       const Buffer* buffer = buffers_[i].get();
@@ -175,14 +179,16 @@ class RowBatchWriter {
       // are using from any OS-level shared memory. The thought is that systems
       // may (in the future) associate integer page id's with physical memory
       // pages (according to whatever is the desired shared memory mechanism)
-      buffer_meta_.push_back(flatbuf::Buffer(0, position + offset, size));
+      buffer_meta_.push_back(flatbuf::Buffer(0, position, size));
 
       if (size > 0) {
         RETURN_NOT_OK(dst->Write(buffer->data(), size));
-        offset += size;
+        position += size;
       }
     }
 
+    *body_end_offset = position;
+
     // Now that we have computed the locations of all of the buffers in shared
     // memory, the data header can be converted to a flatbuffer and written out
     //
@@ -192,27 +198,43 @@ class RowBatchWriter {
     // construct the flatbuffer data accessor object (see arrow::ipc::Message)
     std::shared_ptr<Buffer> data_header;
     RETURN_NOT_OK(WriteDataHeader(
-        batch_->num_rows(), offset, field_nodes_, buffer_meta_, &data_header));
+        num_rows_, position - start_position, field_nodes_, buffer_meta_, 
&data_header));
 
     // Write the data header at the end
     RETURN_NOT_OK(dst->Write(data_header->data(), data_header->size()));
 
-    *data_header_offset = position + offset;
+    position += data_header->size();
+    *header_end_offset = position;
+
+    return Align(dst, &position);
+  }
+
+  Status Align(io::OutputStream* dst, int64_t* position) {
+    // Write all buffers here on word boundaries
+    // TODO(wesm): Is there benefit to 64-byte padding in IPC?
+    int64_t remainder = PaddedLength(*position) - *position;
+    if (remainder > 0) {
+      RETURN_NOT_OK(dst->Write(kPaddingBytes, remainder));
+      *position += remainder;
+    }
     return Status::OK();
   }
 
   // This must be called after invoking AssemblePayload
   Status GetTotalSize(int64_t* size) {
     // emulates the behavior of Write without actually writing
+    int64_t body_offset;
     int64_t data_header_offset;
     MockOutputStream dst;
-    RETURN_NOT_OK(Write(&dst, &data_header_offset));
+    RETURN_NOT_OK(Write(&dst, &body_offset, &data_header_offset));
     *size = dst.GetExtentBytesWritten();
     return Status::OK();
   }
 
  private:
-  const RowBatch* batch_;
+  // Do not copy this vector. Ownership must be retained elsewhere
+  const std::vector<std::shared_ptr<Array>>* columns_;
+  int32_t num_rows_;
 
   std::vector<flatbuf::FieldNode> field_nodes_;
   std::vector<flatbuf::Buffer> buffer_meta_;
@@ -220,29 +242,29 @@ class RowBatchWriter {
   int max_recursion_depth_;
 };
 
-Status WriteRowBatch(io::OutputStream* dst, const RowBatch* batch, int64_t* 
header_offset,
-    int max_recursion_depth) {
+Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
+    int32_t num_rows, io::OutputStream* dst, int64_t* body_end_offset,
+    int64_t* header_end_offset, int max_recursion_depth) {
   DCHECK_GT(max_recursion_depth, 0);
-  RowBatchWriter serializer(batch, max_recursion_depth);
+  RecordBatchWriter serializer(columns, num_rows, max_recursion_depth);
   RETURN_NOT_OK(serializer.AssemblePayload());
-  return serializer.Write(dst, header_offset);
+  return serializer.Write(dst, body_end_offset, header_end_offset);
 }
 
-Status GetRowBatchSize(const RowBatch* batch, int64_t* size) {
-  RowBatchWriter serializer(batch, kMaxIpcRecursionDepth);
+Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) {
+  RecordBatchWriter serializer(
+      batch->columns(), batch->num_rows(), kMaxIpcRecursionDepth);
   RETURN_NOT_OK(serializer.AssemblePayload());
   RETURN_NOT_OK(serializer.GetTotalSize(size));
   return Status::OK();
 }
 
 // ----------------------------------------------------------------------
-// Row batch read path
+// Record batch read path
 
-static constexpr int64_t INIT_METADATA_SIZE = 4096;
-
-class RowBatchReader::RowBatchReaderImpl {
+class RecordBatchReader::RecordBatchReaderImpl {
  public:
-  RowBatchReaderImpl(io::ReadableFileInterface* file,
+  RecordBatchReaderImpl(io::ReadableFileInterface* file,
       const std::shared_ptr<RecordBatchMessage>& metadata, int 
max_recursion_depth)
       : file_(file), metadata_(metadata), 
max_recursion_depth_(max_recursion_depth) {
     num_buffers_ = metadata->num_buffers();
@@ -250,7 +272,7 @@ class RowBatchReader::RowBatchReaderImpl {
   }
 
   Status AssembleBatch(
-      const std::shared_ptr<Schema>& schema, std::shared_ptr<RowBatch>* out) {
+      const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* 
out) {
     std::vector<std::shared_ptr<Array>> arrays(schema->num_fields());
 
     // The field_index and buffer_index are incremented in NextArray based on
@@ -263,7 +285,7 @@ class RowBatchReader::RowBatchReaderImpl {
       RETURN_NOT_OK(NextArray(field, max_recursion_depth_, &arrays[i]));
     }
 
-    *out = std::make_shared<RowBatch>(schema, metadata_->length(), arrays);
+    *out = std::make_shared<RecordBatch>(schema, metadata_->length(), arrays);
     return Status::OK();
   }
 
@@ -359,29 +381,31 @@ class RowBatchReader::RowBatchReaderImpl {
   int num_flattened_fields_;
 };
 
-Status RowBatchReader::Open(io::ReadableFileInterface* file, int64_t position,
-    std::shared_ptr<RowBatchReader>* out) {
-  return Open(file, position, kMaxIpcRecursionDepth, out);
+Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t offset,
+    std::shared_ptr<RecordBatchReader>* out) {
+  return Open(file, offset, kMaxIpcRecursionDepth, out);
 }
 
-Status RowBatchReader::Open(io::ReadableFileInterface* file, int64_t position,
-    int max_recursion_depth, std::shared_ptr<RowBatchReader>* out) {
-  std::shared_ptr<Buffer> metadata;
-  RETURN_NOT_OK(file->ReadAt(position, INIT_METADATA_SIZE, &metadata));
+Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t offset,
+    int max_recursion_depth, std::shared_ptr<RecordBatchReader>* out) {
+  std::shared_ptr<Buffer> buffer;
+  RETURN_NOT_OK(file->ReadAt(offset - sizeof(int32_t), sizeof(int32_t), 
&buffer));
 
-  int32_t metadata_size = *reinterpret_cast<const int32_t*>(metadata->data());
+  int32_t metadata_size = *reinterpret_cast<const int32_t*>(buffer->data());
 
-  // We may not need to call ReadAt again
-  if (metadata_size > static_cast<int>(INIT_METADATA_SIZE - sizeof(int32_t))) {
-    // We don't have enough data, read the indicated metadata size.
-    RETURN_NOT_OK(file->ReadAt(position + sizeof(int32_t), metadata_size, 
&metadata));
+  if (metadata_size + static_cast<int>(sizeof(int32_t)) > offset) {
+    return Status::Invalid("metadata size invalid");
   }
 
+  // Read the metadata
+  RETURN_NOT_OK(
+      file->ReadAt(offset - metadata_size - sizeof(int32_t), metadata_size, 
&buffer));
+
   // TODO(wesm): buffer slicing here would be better in case ReadAt returns
   // allocated memory
 
   std::shared_ptr<Message> message;
-  RETURN_NOT_OK(Message::Open(metadata, &message));
+  RETURN_NOT_OK(Message::Open(buffer, &message));
 
   if (message->type() != Message::RECORD_BATCH) {
     return Status::Invalid("Metadata message is not a record batch");
@@ -389,19 +413,19 @@ Status RowBatchReader::Open(io::ReadableFileInterface* 
file, int64_t position,
 
   std::shared_ptr<RecordBatchMessage> batch_meta = message->GetRecordBatch();
 
-  std::shared_ptr<RowBatchReader> result(new RowBatchReader());
-  result->impl_.reset(new RowBatchReaderImpl(file, batch_meta, 
max_recursion_depth));
+  std::shared_ptr<RecordBatchReader> result(new RecordBatchReader());
+  result->impl_.reset(new RecordBatchReaderImpl(file, batch_meta, 
max_recursion_depth));
   *out = result;
 
   return Status::OK();
 }
 
 // Here the explicit destructor is required for compilers to be aware of
-// the complete information of RowBatchReader::RowBatchReaderImpl class
-RowBatchReader::~RowBatchReader() {}
+// the complete information of RecordBatchReader::RecordBatchReaderImpl class
+RecordBatchReader::~RecordBatchReader() {}
 
-Status RowBatchReader::GetRowBatch(
-    const std::shared_ptr<Schema>& schema, std::shared_ptr<RowBatch>* out) {
+Status RecordBatchReader::GetRecordBatch(
+    const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out) {
   return impl_->AssembleBatch(schema, out);
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index 215b46f..3fde18d 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -23,13 +23,14 @@
 
 #include <cstdint>
 #include <memory>
+#include <vector>
 
 #include "arrow/util/visibility.h"
 
 namespace arrow {
 
 class Array;
-class RowBatch;
+class RecordBatch;
 class Schema;
 class Status;
 
@@ -50,7 +51,7 @@ class RecordBatchMessage;
 // TODO(emkornfield) investigate this more
 constexpr int kMaxIpcRecursionDepth = 64;
 
-// Write the RowBatch (collection of equal-length Arrow arrays) to the output
+// Write the RecordBatch (collection of equal-length Arrow arrays) to the 
output
 // stream
 //
 // First, each of the memory buffers are written out end-to-end
@@ -60,39 +61,43 @@ constexpr int kMaxIpcRecursionDepth = 64;
 //
 // <int32: metadata size> <uint8*: metadata>
 //
-// Finally, the absolute offset (relative to the start of the output stream) to
-// the start of the metadata / data header is returned in an out-variable
-ARROW_EXPORT Status WriteRowBatch(io::OutputStream* dst, const RowBatch* batch,
-    int64_t* header_offset, int max_recursion_depth = kMaxIpcRecursionDepth);
+// Finally, the absolute offsets (relative to the start of the output stream)
+// to the end of the body and end of the metadata / data header (suffixed by
+// the header size) is returned in out-variables
+ARROW_EXPORT Status WriteRecordBatch(const 
std::vector<std::shared_ptr<Array>>& columns,
+    int32_t num_rows, io::OutputStream* dst, int64_t* body_end_offset,
+    int64_t* header_end_offset, int max_recursion_depth = 
kMaxIpcRecursionDepth);
 
-// int64_t GetRowBatchMetadata(const RowBatch* batch);
+// int64_t GetRecordBatchMetadata(const RecordBatch* batch);
 
 // Compute the precise number of bytes needed in a contiguous memory segment to
-// write the row batch. This involves generating the complete serialized
+// write the record batch. This involves generating the complete serialized
 // Flatbuffers metadata.
-ARROW_EXPORT Status GetRowBatchSize(const RowBatch* batch, int64_t* size);
+ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch* batch, int64_t* 
size);
 
 // ----------------------------------------------------------------------
 // "Read" path; does not copy data if the input supports zero copy reads
 
-class ARROW_EXPORT RowBatchReader {
+class ARROW_EXPORT RecordBatchReader {
  public:
-  static Status Open(io::ReadableFileInterface* file, int64_t position,
-      std::shared_ptr<RowBatchReader>* out);
+  // The offset is the absolute position to the *end* of the record batch data
+  // header
+  static Status Open(io::ReadableFileInterface* file, int64_t offset,
+      std::shared_ptr<RecordBatchReader>* out);
 
-  static Status Open(io::ReadableFileInterface* file, int64_t position,
-      int max_recursion_depth, std::shared_ptr<RowBatchReader>* out);
+  static Status Open(io::ReadableFileInterface* file, int64_t offset,
+      int max_recursion_depth, std::shared_ptr<RecordBatchReader>* out);
 
-  virtual ~RowBatchReader();
+  virtual ~RecordBatchReader();
 
-  // Reassemble the row batch. A Schema is required to be able to construct the
-  // right array containers
-  Status GetRowBatch(
-      const std::shared_ptr<Schema>& schema, std::shared_ptr<RowBatch>* out);
+  // Reassemble the record batch. A Schema is required to be able to construct
+  // the right array containers
+  Status GetRecordBatch(
+      const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* 
out);
 
  private:
-  class RowBatchReaderImpl;
-  std::unique_ptr<RowBatchReaderImpl> impl_;
+  class RecordBatchReaderImpl;
+  std::unique_ptr<RecordBatchReaderImpl> impl_;
 };
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/ipc/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc
new file mode 100644
index 0000000..2bf10dd
--- /dev/null
+++ b/cpp/src/arrow/ipc/file.cc
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/ipc/file.h"
+
+#include <cstdint>
+#include <cstring>
+#include <sstream>
+#include <vector>
+
+#include "arrow/ipc/adapter.h"
+#include "arrow/ipc/metadata.h"
+#include "arrow/ipc/util.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/util/buffer.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace ipc {
+
+static constexpr const char* kArrowMagicBytes = "ARROW1";
+
+// ----------------------------------------------------------------------
+// Writer implementation
+
+FileWriter::FileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& 
schema)
+    : sink_(sink), schema_(schema), position_(-1), started_(false) {}
+
+Status FileWriter::UpdatePosition() {
+  return sink_->Tell(&position_);
+}
+
+Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& 
schema,
+    std::shared_ptr<FileWriter>* out) {
+  *out = std::shared_ptr<FileWriter>(new FileWriter(sink, schema));  // ctor 
is private
+  RETURN_NOT_OK((*out)->UpdatePosition());
+  return Status::OK();
+}
+
+Status FileWriter::Write(const uint8_t* data, int64_t nbytes) {
+  RETURN_NOT_OK(sink_->Write(data, nbytes));
+  position_ += nbytes;
+  return Status::OK();
+}
+
+Status FileWriter::Align() {
+  int64_t remainder = PaddedLength(position_) - position_;
+  if (remainder > 0) { return Write(kPaddingBytes, remainder); }
+  return Status::OK();
+}
+
+Status FileWriter::WriteAligned(const uint8_t* data, int64_t nbytes) {
+  RETURN_NOT_OK(Write(data, nbytes));
+  return Align();
+}
+
+Status FileWriter::Start() {
+  RETURN_NOT_OK(WriteAligned(
+      reinterpret_cast<const uint8_t*>(kArrowMagicBytes), 
strlen(kArrowMagicBytes)));
+  started_ = true;
+  return Status::OK();
+}
+
+Status FileWriter::CheckStarted() {
+  if (!started_) { return Start(); }
+  return Status::OK();
+}
+
+Status FileWriter::WriteRecordBatch(
+    const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows) {
+  RETURN_NOT_OK(CheckStarted());
+
+  int64_t offset = position_;
+
+  int64_t body_end_offset;
+  int64_t header_end_offset;
+  RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(
+      columns, num_rows, sink_, &body_end_offset, &header_end_offset));
+  RETURN_NOT_OK(UpdatePosition());
+
+  DCHECK(position_ % 8 == 0) << "ipc::WriteRecordBatch did not perform aligned 
writes";
+
+  // There may be padding ever the end of the metadata, so we cannot rely on
+  // position_
+  int32_t metadata_length = header_end_offset - body_end_offset;
+  int32_t body_length = body_end_offset - offset;
+
+  // Append metadata, to be written in the footer later
+  record_batches_.emplace_back(offset, metadata_length, body_length);
+
+  return Status::OK();
+}
+
+Status FileWriter::Close() {
+  // Write metadata
+  int64_t initial_position = position_;
+  RETURN_NOT_OK(WriteFileFooter(schema_.get(), dictionaries_, record_batches_, 
sink_));
+  RETURN_NOT_OK(UpdatePosition());
+
+  // Write footer length
+  int32_t footer_length = position_ - initial_position;
+
+  if (footer_length <= 0) { return Status::Invalid("Invalid file footer"); }
+
+  RETURN_NOT_OK(Write(reinterpret_cast<const uint8_t*>(&footer_length), 
sizeof(int32_t)));
+
+  // Write magic bytes to end file
+  return Write(
+      reinterpret_cast<const uint8_t*>(kArrowMagicBytes), 
strlen(kArrowMagicBytes));
+}
+
+// ----------------------------------------------------------------------
+// Reader implementation
+
+FileReader::FileReader(
+    const std::shared_ptr<io::ReadableFileInterface>& file, int64_t 
footer_offset)
+    : file_(file), footer_offset_(footer_offset) {}
+
+FileReader::~FileReader() {}
+
+Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file,
+    std::shared_ptr<FileReader>* reader) {
+  int64_t footer_offset;
+  RETURN_NOT_OK(file->GetSize(&footer_offset));
+  return Open(file, footer_offset, reader);
+}
+
+Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file,
+    int64_t footer_offset, std::shared_ptr<FileReader>* reader) {
+  *reader = std::shared_ptr<FileReader>(new FileReader(file, footer_offset));
+  return (*reader)->ReadFooter();
+}
+
+Status FileReader::ReadFooter() {
+  int magic_size = static_cast<int>(strlen(kArrowMagicBytes));
+
+  if (footer_offset_ <= magic_size * 2 + 4) {
+    std::stringstream ss;
+    ss << "File is too small: " << footer_offset_;
+    return Status::Invalid(ss.str());
+  }
+
+  std::shared_ptr<Buffer> buffer;
+  int file_end_size = magic_size + sizeof(int32_t);
+  RETURN_NOT_OK(file_->ReadAt(footer_offset_ - file_end_size, file_end_size, 
&buffer));
+
+  if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) {
+    return Status::Invalid("Not an Arrow file");
+  }
+
+  int32_t footer_length = *reinterpret_cast<const int32_t*>(buffer->data());
+
+  if (footer_length <= 0 || footer_length + magic_size * 2 + 4 > 
footer_offset_) {
+    return Status::Invalid("File is smaller than indicated metadata size");
+  }
+
+  // Now read the footer
+  RETURN_NOT_OK(file_->ReadAt(
+      footer_offset_ - footer_length - file_end_size, footer_length, &buffer));
+  RETURN_NOT_OK(FileFooter::Open(buffer, &footer_));
+
+  // Get the schema
+  return footer_->GetSchema(&schema_);
+}
+
+const std::shared_ptr<Schema>& FileReader::schema() const {
+  return schema_;
+}
+
+int FileReader::num_dictionaries() const {
+  return footer_->num_dictionaries();
+}
+
+int FileReader::num_record_batches() const {
+  return footer_->num_record_batches();
+}
+
+MetadataVersion::type FileReader::version() const {
+  return footer_->version();
+}
+
+Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
+  DCHECK_GE(i, 0);
+  DCHECK_LT(i, num_record_batches());
+  FileBlock block = footer_->record_batch(i);
+  int64_t metadata_end_offset = block.offset + block.body_length + 
block.metadata_length;
+
+  std::shared_ptr<RecordBatchReader> reader;
+  RETURN_NOT_OK(RecordBatchReader::Open(file_.get(), metadata_end_offset, 
&reader));
+
+  return reader->GetRecordBatch(schema_, batch);
+}
+
+}  // namespace ipc
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/ipc/file.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file.h b/cpp/src/arrow/ipc/file.h
new file mode 100644
index 0000000..4b79c98
--- /dev/null
+++ b/cpp/src/arrow/ipc/file.h
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Implement Arrow file layout for IPC/RPC purposes and short-lived storage
+
+#ifndef ARROW_IPC_FILE_H
+#define ARROW_IPC_FILE_H
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/ipc/metadata.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Array;
+class Buffer;
+struct Field;
+class RecordBatch;
+class Schema;
+class Status;
+
+namespace io {
+
+class OutputStream;
+class ReadableFileInterface;
+
+}  // namespace io
+
+namespace ipc {
+
+class ARROW_EXPORT FileWriter {
+ public:
+  static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& 
schema,
+      std::shared_ptr<FileWriter>* out);
+
+  // TODO(wesm): Write dictionaries
+
+  Status WriteRecordBatch(
+      const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows);
+
+  Status Close();
+
+ private:
+  FileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema);
+
+  Status CheckStarted();
+  Status Start();
+
+  Status UpdatePosition();
+
+  // Adds padding bytes if necessary to ensure all memory blocks are written on
+  // 8-byte boundaries.
+  Status Align();
+
+  // Write data and update position
+  Status Write(const uint8_t* data, int64_t nbytes);
+
+  // Write and align
+  Status WriteAligned(const uint8_t* data, int64_t nbytes);
+
+  io::OutputStream* sink_;
+  std::shared_ptr<Schema> schema_;
+  int64_t position_;
+  bool started_;
+
+  std::vector<FileBlock> dictionaries_;
+  std::vector<FileBlock> record_batches_;
+};
+
+class ARROW_EXPORT FileReader {
+ public:
+  ~FileReader();
+
+  // Open a file-like object that is assumed to be self-contained; i.e., the
+  // end of the file interface is the end of the Arrow file. Note that there
+  // can be any amount of data preceding the Arrow-formatted data, because we
+  // need only locate the end of the Arrow file stream to discover the metadata
+  // and then proceed to read the data into memory.
+  static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file,
+      std::shared_ptr<FileReader>* reader);
+
+  // If the file is embedded within some larger file or memory region, you can
+  // pass the absolute memory offset to the end of the file (which contains the
+  // metadata footer). The metadata must have been written with memory offsets
+  // relative to the start of the containing file
+  //
+  // @param file: the data source
+  // @param footer_offset: the position of the end of the Arrow "file"
+  static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file,
+      int64_t footer_offset, std::shared_ptr<FileReader>* reader);
+
+  const std::shared_ptr<Schema>& schema() const;
+
+  // Shared dictionaries for dictionary-encoding cross record batches
+  // TODO(wesm): Implement dictionary reading when we also have dictionary
+  // encoding
+  int num_dictionaries() const;
+
+  int num_record_batches() const;
+
+  MetadataVersion::type version() const;
+
+  // Read a record batch from the file. Does not copy memory if the input
+  // source supports zero-copy.
+  //
+  // TODO(wesm): Make the copy/zero-copy behavior configurable (e.g. provide an
+  // "always copy" option)
+  Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch);
+
+ private:
+  FileReader(
+      const std::shared_ptr<io::ReadableFileInterface>& file, int64_t 
footer_offset);
+
+  Status ReadFooter();
+
+  std::shared_ptr<io::ReadableFileInterface> file_;
+
+  // The location where the Arrow file layout ends. May be the end of the file
+  // or some other location if embedded in a larger file.
+  int64_t footer_offset_;
+
+  std::unique_ptr<FileFooter> footer_;
+  std::shared_ptr<Schema> schema_;
+};
+
+}  // namespace ipc
+}  // namespace arrow
+
+#endif  // ARROW_IPC_FILE_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc 
b/cpp/src/arrow/ipc/ipc-adapter-test.cc
index ca4d015..f5611d4 100644
--- a/cpp/src/arrow/ipc/ipc-adapter-test.cc
+++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc
@@ -43,31 +43,27 @@
 namespace arrow {
 namespace ipc {
 
-// TODO(emkornfield) convert to google style kInt32, etc?
-const auto INT32 = std::make_shared<Int32Type>();
-const auto LIST_INT32 = std::make_shared<ListType>(INT32);
-const auto LIST_LIST_INT32 = std::make_shared<ListType>(LIST_INT32);
-
-typedef Status MakeRowBatch(std::shared_ptr<RowBatch>* out);
-
-class TestWriteRowBatch : public ::testing::TestWithParam<MakeRowBatch*>,
-                          public io::MemoryMapFixture {
+class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>,
+                             public io::MemoryMapFixture {
  public:
   void SetUp() { pool_ = default_memory_pool(); }
   void TearDown() { io::MemoryMapFixture::TearDown(); }
 
-  Status RoundTripHelper(const RowBatch& batch, int memory_map_size,
-      std::shared_ptr<RowBatch>* batch_result) {
+  Status RoundTripHelper(const RecordBatch& batch, int memory_map_size,
+      std::shared_ptr<RecordBatch>* batch_result) {
     std::string path = "test-write-row-batch";
     io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
-    int64_t header_location;
 
-    RETURN_NOT_OK(WriteRowBatch(mmap_.get(), &batch, &header_location));
+    int64_t body_end_offset;
+    int64_t header_end_offset;
 
-    std::shared_ptr<RowBatchReader> reader;
-    RETURN_NOT_OK(RowBatchReader::Open(mmap_.get(), header_location, &reader));
+    RETURN_NOT_OK(WriteRecordBatch(batch.columns(), batch.num_rows(), 
mmap_.get(),
+        &body_end_offset, &header_end_offset));
 
-    RETURN_NOT_OK(reader->GetRowBatch(batch.schema(), batch_result));
+    std::shared_ptr<RecordBatchReader> reader;
+    RETURN_NOT_OK(RecordBatchReader::Open(mmap_.get(), header_end_offset, 
&reader));
+
+    RETURN_NOT_OK(reader->GetRecordBatch(batch.schema(), batch_result));
     return Status::OK();
   }
 
@@ -76,10 +72,10 @@ class TestWriteRowBatch : public 
::testing::TestWithParam<MakeRowBatch*>,
   MemoryPool* pool_;
 };
 
-TEST_P(TestWriteRowBatch, RoundTrip) {
-  std::shared_ptr<RowBatch> batch;
+TEST_P(TestWriteRecordBatch, RoundTrip) {
+  std::shared_ptr<RecordBatch> batch;
   ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
-  std::shared_ptr<RowBatch> batch_result;
+  std::shared_ptr<RecordBatch> batch_result;
   ASSERT_OK(RoundTripHelper(*batch, 1 << 16, &batch_result));
 
   // do checks
@@ -93,217 +89,39 @@ TEST_P(TestWriteRowBatch, RoundTrip) {
   }
 }
 
-Status MakeIntRowBatch(std::shared_ptr<RowBatch>* out) {
-  const int length = 1000;
-
-  // Make the schema
-  auto f0 = std::make_shared<Field>("f0", INT32);
-  auto f1 = std::make_shared<Field>("f1", INT32);
-  std::shared_ptr<Schema> schema(new Schema({f0, f1}));
-
-  // Example data
-  std::shared_ptr<Array> a0, a1;
-  MemoryPool* pool = default_memory_pool();
-  RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &a0));
-  RETURN_NOT_OK(MakeRandomInt32Array(length, true, pool, &a1));
-  out->reset(new RowBatch(schema, length, {a0, a1}));
-  return Status::OK();
-}
+INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRecordBatch,
+    ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, 
&MakeNonNullRecordBatch,
+                            &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList,
+                            &MakeStringTypesRecordBatch, &MakeStruct));
 
-template <class Builder, class RawType>
-Status MakeRandomBinaryArray(
-    const TypePtr& type, int32_t length, MemoryPool* pool, ArrayPtr* array) {
-  const std::vector<std::string> values = {
-      "", "", "abc", "123", "efg", "456!@#!@#", "12312"};
-  Builder builder(pool, type);
-  const auto values_len = values.size();
-  for (int32_t i = 0; i < length; ++i) {
-    int values_index = i % values_len;
-    if (values_index == 0) {
-      RETURN_NOT_OK(builder.AppendNull());
-    } else {
-      const std::string& value = values[values_index];
-      RETURN_NOT_OK(
-          builder.Append(reinterpret_cast<const RawType*>(value.data()), 
value.size()));
-    }
-  }
-  *array = builder.Finish();
-  return Status::OK();
-}
-
-Status MakeStringTypesRowBatch(std::shared_ptr<RowBatch>* out) {
-  const int32_t length = 500;
-  auto string_type = std::make_shared<StringType>();
-  auto binary_type = std::make_shared<BinaryType>();
-  auto f0 = std::make_shared<Field>("f0", string_type);
-  auto f1 = std::make_shared<Field>("f1", binary_type);
-  std::shared_ptr<Schema> schema(new Schema({f0, f1}));
-
-  std::shared_ptr<Array> a0, a1;
-  MemoryPool* pool = default_memory_pool();
-
-  {
-    auto status =
-        MakeRandomBinaryArray<StringBuilder, char>(string_type, length, pool, 
&a0);
-    RETURN_NOT_OK(status);
-  }
-  {
-    auto status =
-        MakeRandomBinaryArray<BinaryBuilder, uint8_t>(binary_type, length, 
pool, &a1);
-    RETURN_NOT_OK(status);
-  }
-  out->reset(new RowBatch(schema, length, {a0, a1}));
-  return Status::OK();
-}
-
-Status MakeListRowBatch(std::shared_ptr<RowBatch>* out) {
-  // Make the schema
-  auto f0 = std::make_shared<Field>("f0", LIST_INT32);
-  auto f1 = std::make_shared<Field>("f1", LIST_LIST_INT32);
-  auto f2 = std::make_shared<Field>("f2", INT32);
-  std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
-
-  // Example data
-
-  MemoryPool* pool = default_memory_pool();
-  const int length = 200;
-  std::shared_ptr<Array> leaf_values, list_array, list_list_array, flat_array;
-  const bool include_nulls = true;
-  RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &leaf_values));
-  RETURN_NOT_OK(
-      MakeRandomListArray(leaf_values, length, include_nulls, pool, 
&list_array));
-  RETURN_NOT_OK(
-      MakeRandomListArray(list_array, length, include_nulls, pool, 
&list_list_array));
-  RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, 
&flat_array));
-  out->reset(new RowBatch(schema, length, {list_array, list_list_array, 
flat_array}));
-  return Status::OK();
-}
-
-Status MakeZeroLengthRowBatch(std::shared_ptr<RowBatch>* out) {
-  // Make the schema
-  auto f0 = std::make_shared<Field>("f0", LIST_INT32);
-  auto f1 = std::make_shared<Field>("f1", LIST_LIST_INT32);
-  auto f2 = std::make_shared<Field>("f2", INT32);
-  std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
-
-  // Example data
-  MemoryPool* pool = default_memory_pool();
-  const int length = 200;
-  const bool include_nulls = true;
-  std::shared_ptr<Array> leaf_values, list_array, list_list_array, flat_array;
-  RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &leaf_values));
-  RETURN_NOT_OK(MakeRandomListArray(leaf_values, 0, include_nulls, pool, 
&list_array));
-  RETURN_NOT_OK(
-      MakeRandomListArray(list_array, 0, include_nulls, pool, 
&list_list_array));
-  RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array));
-  out->reset(new RowBatch(schema, length, {list_array, list_list_array, 
flat_array}));
-  return Status::OK();
-}
-
-Status MakeNonNullRowBatch(std::shared_ptr<RowBatch>* out) {
-  // Make the schema
-  auto f0 = std::make_shared<Field>("f0", LIST_INT32);
-  auto f1 = std::make_shared<Field>("f1", LIST_LIST_INT32);
-  auto f2 = std::make_shared<Field>("f2", INT32);
-  std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
-
-  // Example data
-  MemoryPool* pool = default_memory_pool();
-  const int length = 50;
-  std::shared_ptr<Array> leaf_values, list_array, list_list_array, flat_array;
-
-  RETURN_NOT_OK(MakeRandomInt32Array(1000, true, pool, &leaf_values));
-  bool include_nulls = false;
-  RETURN_NOT_OK(
-      MakeRandomListArray(leaf_values, length, include_nulls, pool, 
&list_array));
-  RETURN_NOT_OK(
-      MakeRandomListArray(list_array, length, include_nulls, pool, 
&list_list_array));
-  RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, 
&flat_array));
-  out->reset(new RowBatch(schema, length, {list_array, list_list_array, 
flat_array}));
-  return Status::OK();
-}
-
-Status MakeDeeplyNestedList(std::shared_ptr<RowBatch>* out) {
-  const int batch_length = 5;
-  TypePtr type = INT32;
-
-  MemoryPool* pool = default_memory_pool();
-  ArrayPtr array;
-  const bool include_nulls = true;
-  RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &array));
-  for (int i = 0; i < 63; ++i) {
-    type = 
std::static_pointer_cast<DataType>(std::make_shared<ListType>(type));
-    RETURN_NOT_OK(MakeRandomListArray(array, batch_length, include_nulls, 
pool, &array));
-  }
-
-  auto f0 = std::make_shared<Field>("f0", type);
-  std::shared_ptr<Schema> schema(new Schema({f0}));
-  std::vector<ArrayPtr> arrays = {array};
-  out->reset(new RowBatch(schema, batch_length, arrays));
-  return Status::OK();
-}
-
-Status MakeStruct(std::shared_ptr<RowBatch>* out) {
-  // reuse constructed list columns
-  std::shared_ptr<RowBatch> list_batch;
-  RETURN_NOT_OK(MakeListRowBatch(&list_batch));
-  std::vector<ArrayPtr> columns = {
-      list_batch->column(0), list_batch->column(1), list_batch->column(2)};
-  auto list_schema = list_batch->schema();
-
-  // Define schema
-  std::shared_ptr<DataType> type(new StructType(
-      {list_schema->field(0), list_schema->field(1), list_schema->field(2)}));
-  auto f0 = std::make_shared<Field>("non_null_struct", type);
-  auto f1 = std::make_shared<Field>("null_struct", type);
-  std::shared_ptr<Schema> schema(new Schema({f0, f1}));
-
-  // construct individual nullable/non-nullable struct arrays
-  ArrayPtr no_nulls(new StructArray(type, list_batch->num_rows(), columns));
-  std::vector<uint8_t> null_bytes(list_batch->num_rows(), 1);
-  null_bytes[0] = 0;
-  std::shared_ptr<Buffer> null_bitmask;
-  RETURN_NOT_OK(util::bytes_to_bits(null_bytes, &null_bitmask));
-  ArrayPtr with_nulls(
-      new StructArray(type, list_batch->num_rows(), columns, 1, null_bitmask));
-
-  // construct batch
-  std::vector<ArrayPtr> arrays = {no_nulls, with_nulls};
-  out->reset(new RowBatch(schema, list_batch->num_rows(), arrays));
-  return Status::OK();
-}
-
-INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRowBatch,
-    ::testing::Values(&MakeIntRowBatch, &MakeListRowBatch, 
&MakeNonNullRowBatch,
-                            &MakeZeroLengthRowBatch, &MakeDeeplyNestedList,
-                            &MakeStringTypesRowBatch, &MakeStruct));
-
-void TestGetRowBatchSize(std::shared_ptr<RowBatch> batch) {
+void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
   ipc::MockOutputStream mock;
-  int64_t mock_header_location = -1;
+  int64_t mock_header_offset = -1;
+  int64_t mock_body_offset = -1;
   int64_t size = -1;
-  ASSERT_OK(WriteRowBatch(&mock, batch.get(), &mock_header_location));
-  ASSERT_OK(GetRowBatchSize(batch.get(), &size));
+  ASSERT_OK(WriteRecordBatch(batch->columns(), batch->num_rows(), &mock,
+      &mock_body_offset, &mock_header_offset));
+  ASSERT_OK(GetRecordBatchSize(batch.get(), &size));
   ASSERT_EQ(mock.GetExtentBytesWritten(), size);
 }
 
-TEST_F(TestWriteRowBatch, IntegerGetRowBatchSize) {
-  std::shared_ptr<RowBatch> batch;
+TEST_F(TestWriteRecordBatch, IntegerGetRecordBatchSize) {
+  std::shared_ptr<RecordBatch> batch;
 
-  ASSERT_OK(MakeIntRowBatch(&batch));
-  TestGetRowBatchSize(batch);
+  ASSERT_OK(MakeIntRecordBatch(&batch));
+  TestGetRecordBatchSize(batch);
 
-  ASSERT_OK(MakeListRowBatch(&batch));
-  TestGetRowBatchSize(batch);
+  ASSERT_OK(MakeListRecordBatch(&batch));
+  TestGetRecordBatchSize(batch);
 
-  ASSERT_OK(MakeZeroLengthRowBatch(&batch));
-  TestGetRowBatchSize(batch);
+  ASSERT_OK(MakeZeroLengthRecordBatch(&batch));
+  TestGetRecordBatchSize(batch);
 
-  ASSERT_OK(MakeNonNullRowBatch(&batch));
-  TestGetRowBatchSize(batch);
+  ASSERT_OK(MakeNonNullRecordBatch(&batch));
+  TestGetRecordBatchSize(batch);
 
   ASSERT_OK(MakeDeeplyNestedList(&batch));
-  TestGetRowBatchSize(batch);
+  TestGetRecordBatchSize(batch);
 }
 
 class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
@@ -314,7 +132,7 @@ class RecursionLimits : public ::testing::Test, public 
io::MemoryMapFixture {
   Status WriteToMmap(int recursion_level, bool override_level,
       int64_t* header_out = nullptr, std::shared_ptr<Schema>* schema_out = 
nullptr) {
     const int batch_length = 5;
-    TypePtr type = INT32;
+    TypePtr type = kInt32;
     ArrayPtr array;
     const bool include_nulls = true;
     RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool_, &array));
@@ -328,18 +146,22 @@ class RecursionLimits : public ::testing::Test, public 
io::MemoryMapFixture {
     std::shared_ptr<Schema> schema(new Schema({f0}));
     if (schema_out != nullptr) { *schema_out = schema; }
     std::vector<ArrayPtr> arrays = {array};
-    auto batch = std::make_shared<RowBatch>(schema, batch_length, arrays);
+    auto batch = std::make_shared<RecordBatch>(schema, batch_length, arrays);
 
     std::string path = "test-write-past-max-recursion";
     const int memory_map_size = 1 << 16;
     io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
-    int64_t header_location;
-    int64_t* header_out_param = header_out == nullptr ? &header_location : 
header_out;
+
+    int64_t body_offset;
+    int64_t header_offset;
+
+    int64_t* header_out_param = header_out == nullptr ? &header_offset : 
header_out;
     if (override_level) {
-      return WriteRowBatch(
-          mmap_.get(), batch.get(), header_out_param, recursion_level + 1);
+      return WriteRecordBatch(batch->columns(), batch->num_rows(), mmap_.get(),
+          &body_offset, header_out_param, recursion_level + 1);
     } else {
-      return WriteRowBatch(mmap_.get(), batch.get(), header_out_param);
+      return WriteRecordBatch(batch->columns(), batch->num_rows(), mmap_.get(),
+          &body_offset, header_out_param);
     }
   }
 
@@ -353,14 +175,14 @@ TEST_F(RecursionLimits, WriteLimit) {
 }
 
 TEST_F(RecursionLimits, ReadLimit) {
-  int64_t header_location = -1;
+  int64_t header_offset = -1;
   std::shared_ptr<Schema> schema;
-  ASSERT_OK(WriteToMmap(64, true, &header_location, &schema));
+  ASSERT_OK(WriteToMmap(64, true, &header_offset, &schema));
 
-  std::shared_ptr<RowBatchReader> reader;
-  ASSERT_OK(RowBatchReader::Open(mmap_.get(), header_location, &reader));
-  std::shared_ptr<RowBatch> batch_result;
-  ASSERT_RAISES(Invalid, reader->GetRowBatch(schema, &batch_result));
+  std::shared_ptr<RecordBatchReader> reader;
+  ASSERT_OK(RecordBatchReader::Open(mmap_.get(), header_offset, &reader));
+  std::shared_ptr<RecordBatch> batch_result;
+  ASSERT_RAISES(Invalid, reader->GetRecordBatch(schema, &batch_result));
 }
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/ipc/ipc-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc 
b/cpp/src/arrow/ipc/ipc-file-test.cc
new file mode 100644
index 0000000..cd424bf
--- /dev/null
+++ b/cpp/src/arrow/ipc/ipc-file-test.cc
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "arrow/io/memory.h"
+#include "arrow/io/test-common.h"
+#include "arrow/ipc/adapter.h"
+#include "arrow/ipc/file.h"
+#include "arrow/ipc/test-common.h"
+#include "arrow/ipc/util.h"
+
+#include "arrow/test-util.h"
+#include "arrow/types/list.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+#include "arrow/types/struct.h"
+#include "arrow/util/bit-util.h"
+#include "arrow/util/buffer.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace ipc {
+
+class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
+ public:
+  void SetUp() {
+    pool_ = default_memory_pool();
+    buffer_ = std::make_shared<PoolBuffer>(pool_);
+    sink_.reset(new io::BufferOutputStream(buffer_));
+  }
+  void TearDown() {}
+
+  Status RoundTripHelper(
+      const RecordBatch& batch, std::vector<std::shared_ptr<RecordBatch>>* 
out_batches) {
+    // Write the file
+    RETURN_NOT_OK(FileWriter::Open(sink_.get(), batch.schema(), 
&file_writer_));
+    int num_batches = 3;
+    for (int i = 0; i < num_batches; ++i) {
+      RETURN_NOT_OK(file_writer_->WriteRecordBatch(batch.columns(), 
batch.num_rows()));
+    }
+    RETURN_NOT_OK(file_writer_->Close());
+
+    // Current offset into stream is the end of the file
+    int64_t footer_offset;
+    RETURN_NOT_OK(sink_->Tell(&footer_offset));
+
+    // Open the file
+    auto reader = std::make_shared<io::BufferReader>(buffer_->data(), 
buffer_->size());
+    RETURN_NOT_OK(FileReader::Open(reader, footer_offset, &file_reader_));
+
+    EXPECT_EQ(num_batches, file_reader_->num_record_batches());
+
+    out_batches->resize(num_batches);
+    for (int i = 0; i < num_batches; ++i) {
+      RETURN_NOT_OK(file_reader_->GetRecordBatch(i, &(*out_batches)[i]));
+    }
+
+    return Status::OK();
+  }
+
+  void CompareBatch(const RecordBatch* left, const RecordBatch* right) {
+    ASSERT_TRUE(left->schema()->Equals(right->schema()));
+    ASSERT_EQ(left->num_columns(), right->num_columns())
+        << left->schema()->ToString() << " result: " << 
right->schema()->ToString();
+    EXPECT_EQ(left->num_rows(), right->num_rows());
+    for (int i = 0; i < left->num_columns(); ++i) {
+      EXPECT_TRUE(left->column(i)->Equals(right->column(i)))
+          << "Idx: " << i << " Name: " << left->column_name(i);
+    }
+  }
+
+ protected:
+  MemoryPool* pool_;
+
+  std::unique_ptr<io::BufferOutputStream> sink_;
+  std::shared_ptr<PoolBuffer> buffer_;
+
+  std::shared_ptr<FileWriter> file_writer_;
+  std::shared_ptr<FileReader> file_reader_;
+};
+
+TEST_P(TestFileFormat, RoundTrip) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
+
+  std::vector<std::shared_ptr<RecordBatch>> out_batches;
+
+  ASSERT_OK(RoundTripHelper(*batch, &out_batches));
+
+  // Compare batches. Same
+  for (size_t i = 0; i < out_batches.size(); ++i) {
+    CompareBatch(batch.get(), out_batches[i].get());
+  }
+}
+
+INSTANTIATE_TEST_CASE_P(RoundTripTests, TestFileFormat,
+    ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, 
&MakeNonNullRecordBatch,
+                            &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList,
+                            &MakeStringTypesRecordBatch, &MakeStruct));
+
+}  // namespace ipc
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/ipc/ipc-metadata-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc 
b/cpp/src/arrow/ipc/ipc-metadata-test.cc
index 51d79cf..1dc3969 100644
--- a/cpp/src/arrow/ipc/ipc-metadata-test.cc
+++ b/cpp/src/arrow/ipc/ipc-metadata-test.cc
@@ -21,6 +21,7 @@
 
 #include "gtest/gtest.h"
 
+#include "arrow/io/memory.h"
 #include "arrow/ipc/metadata.h"
 #include "arrow/schema.h"
 #include "arrow/test-util.h"
@@ -31,6 +32,8 @@ namespace arrow {
 
 class Buffer;
 
+namespace ipc {
+
 static inline void assert_schema_equal(const Schema* lhs, const Schema* rhs) {
   if (!lhs->Equals(*rhs)) {
     std::stringstream ss;
@@ -46,14 +49,14 @@ class TestSchemaMessage : public ::testing::Test {
 
   void CheckRoundtrip(const Schema* schema) {
     std::shared_ptr<Buffer> buffer;
-    ASSERT_OK(ipc::WriteSchema(schema, &buffer));
+    ASSERT_OK(WriteSchema(schema, &buffer));
 
-    std::shared_ptr<ipc::Message> message;
-    ASSERT_OK(ipc::Message::Open(buffer, &message));
+    std::shared_ptr<Message> message;
+    ASSERT_OK(Message::Open(buffer, &message));
 
-    ASSERT_EQ(ipc::Message::SCHEMA, message->type());
+    ASSERT_EQ(Message::SCHEMA, message->type());
 
-    std::shared_ptr<ipc::SchemaMessage> schema_msg = message->GetSchema();
+    std::shared_ptr<SchemaMessage> schema_msg = message->GetSchema();
     ASSERT_EQ(schema->num_fields(), schema_msg->num_fields());
 
     std::shared_ptr<Schema> schema2;
@@ -94,4 +97,68 @@ TEST_F(TestSchemaMessage, NestedFields) {
   CheckRoundtrip(&schema);
 }
 
+class TestFileFooter : public ::testing::Test {
+ public:
+  void SetUp() {}
+
+  void CheckRoundtrip(const Schema* schema, const std::vector<FileBlock>& 
dictionaries,
+      const std::vector<FileBlock>& record_batches) {
+    auto buffer = std::make_shared<PoolBuffer>();
+    io::BufferOutputStream stream(buffer);
+
+    ASSERT_OK(WriteFileFooter(schema, dictionaries, record_batches, &stream));
+
+    std::unique_ptr<FileFooter> footer;
+    ASSERT_OK(FileFooter::Open(buffer, &footer));
+
+    ASSERT_EQ(MetadataVersion::V1_SNAPSHOT, footer->version());
+
+    // Check schema
+    std::shared_ptr<Schema> schema2;
+    ASSERT_OK(footer->GetSchema(&schema2));
+    assert_schema_equal(schema, schema2.get());
+
+    // Check blocks
+    ASSERT_EQ(dictionaries.size(), footer->num_dictionaries());
+    ASSERT_EQ(record_batches.size(), footer->num_record_batches());
+
+    for (int i = 0; i < footer->num_dictionaries(); ++i) {
+      CheckBlocks(dictionaries[i], footer->dictionary(i));
+    }
+
+    for (int i = 0; i < footer->num_record_batches(); ++i) {
+      CheckBlocks(record_batches[i], footer->record_batch(i));
+    }
+  }
+
+  void CheckBlocks(const FileBlock& left, const FileBlock& right) {
+    ASSERT_EQ(left.offset, right.offset);
+    ASSERT_EQ(left.metadata_length, right.metadata_length);
+    ASSERT_EQ(left.body_length, right.body_length);
+  }
+
+ private:
+  std::shared_ptr<Schema> example_schema_;
+};
+
+TEST_F(TestFileFooter, Basics) {
+  auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>());
+  auto f1 = std::make_shared<Field>("f1", std::make_shared<Int16Type>());
+  Schema schema({f0, f1});
+
+  std::vector<FileBlock> dictionaries;
+  dictionaries.emplace_back(8, 92, 900);
+  dictionaries.emplace_back(1000, 100, 1900);
+  dictionaries.emplace_back(3000, 100, 2900);
+
+  std::vector<FileBlock> record_batches;
+  record_batches.emplace_back(6000, 100, 900);
+  record_batches.emplace_back(7000, 100, 1900);
+  record_batches.emplace_back(9000, 100, 2900);
+  record_batches.emplace_back(12000, 100, 3900);
+
+  CheckRoundtrip(&schema, dictionaries, record_batches);
+}
+
+}  // namespace ipc
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/ipc/metadata-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc 
b/cpp/src/arrow/ipc/metadata-internal.cc
index 05e9c7a..7102012 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -31,10 +31,6 @@
 #include "arrow/util/buffer.h"
 #include "arrow/util/status.h"
 
-typedef flatbuffers::FlatBufferBuilder FBB;
-typedef flatbuffers::Offset<arrow::flatbuf::Field> FieldOffset;
-typedef flatbuffers::Offset<void> Offset;
-
 namespace arrow {
 
 namespace flatbuf = org::apache::arrow::flatbuf;
@@ -52,6 +48,8 @@ const std::shared_ptr<DataType> UINT32 = 
std::make_shared<UInt32Type>();
 const std::shared_ptr<DataType> UINT64 = std::make_shared<UInt64Type>();
 const std::shared_ptr<DataType> FLOAT = std::make_shared<FloatType>();
 const std::shared_ptr<DataType> DOUBLE = std::make_shared<DoubleType>();
+const std::shared_ptr<DataType> STRING = std::make_shared<StringType>();
+const std::shared_ptr<DataType> BINARY = std::make_shared<BinaryType>();
 
 static Status IntFromFlatbuffer(
     const flatbuf::Int* int_data, std::shared_ptr<DataType>* out) {
@@ -102,8 +100,11 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const 
void* type_data,
       return FloatFromFlatuffer(
           static_cast<const flatbuf::FloatingPoint*>(type_data), out);
     case flatbuf::Type_Binary:
+      *out = BINARY;
+      return Status::OK();
     case flatbuf::Type_Utf8:
-      return Status::NotImplemented("Type is not implemented");
+      *out = STRING;
+      return Status::OK();
     case flatbuf::Type_Bool:
       *out = BOOL;
       return Status::OK();
@@ -193,6 +194,14 @@ static Status TypeToFlatbuffer(FBB& fbb, const 
std::shared_ptr<DataType>& type,
       *out_type = flatbuf::Type_FloatingPoint;
       *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_DOUBLE);
       break;
+    case Type::BINARY:
+      *out_type = flatbuf::Type_Binary;
+      *offset = flatbuf::CreateBinary(fbb).Union();
+      break;
+    case Type::STRING:
+      *out_type = flatbuf::Type_Utf8;
+      *offset = flatbuf::CreateUtf8(fbb).Union();
+      break;
     case Type::LIST:
       *out_type = flatbuf::Type_List;
       return ListToFlatbuffer(fbb, type, children, offset);
@@ -255,19 +264,26 @@ flatbuf::Endianness endianness() {
   return bint.c[0] == 1 ? flatbuf::Endianness_Big : flatbuf::Endianness_Little;
 }
 
-Status MessageBuilder::SetSchema(const Schema* schema) {
-  header_type_ = flatbuf::MessageHeader_Schema;
-
+Status SchemaToFlatbuffer(
+    FBB& fbb, const Schema* schema, flatbuffers::Offset<flatbuf::Schema>* out) 
{
   std::vector<FieldOffset> field_offsets;
   for (int i = 0; i < schema->num_fields(); ++i) {
     const std::shared_ptr<Field>& field = schema->field(i);
     FieldOffset offset;
-    RETURN_NOT_OK(FieldToFlatbuffer(fbb_, field, &offset));
+    RETURN_NOT_OK(FieldToFlatbuffer(fbb, field, &offset));
     field_offsets.push_back(offset);
   }
 
-  header_ =
-      flatbuf::CreateSchema(fbb_, endianness(), 
fbb_.CreateVector(field_offsets)).Union();
+  *out = flatbuf::CreateSchema(fbb, endianness(), 
fbb.CreateVector(field_offsets));
+  return Status::OK();
+}
+
+Status MessageBuilder::SetSchema(const Schema* schema) {
+  flatbuffers::Offset<flatbuf::Schema> fb_schema;
+  RETURN_NOT_OK(SchemaToFlatbuffer(fbb_, schema, &fb_schema));
+
+  header_type_ = flatbuf::MessageHeader_Schema;
+  header_ = fb_schema.Union();
   body_length_ = 0;
   return Status::OK();
 }
@@ -301,17 +317,17 @@ Status MessageBuilder::Finish() {
 }
 
 Status MessageBuilder::GetBuffer(std::shared_ptr<Buffer>* out) {
-  // The message buffer is prefixed by the size of the complete flatbuffer as
+  // The message buffer is suffixed by the size of the complete flatbuffer as
   // int32_t
-  // <int32_t: flatbuffer size><uint8_t*: flatbuffer data>
+  // <uint8_t*: flatbuffer data><int32_t: flatbuffer size>
   int32_t size = fbb_.GetSize();
 
   auto result = std::make_shared<PoolBuffer>();
   RETURN_NOT_OK(result->Resize(size + sizeof(int32_t)));
 
   uint8_t* dst = result->mutable_data();
-  memcpy(dst, reinterpret_cast<int32_t*>(&size), sizeof(int32_t));
-  memcpy(dst + sizeof(int32_t), fbb_.GetBufferPointer(), size);
+  memcpy(dst, fbb_.GetBufferPointer(), size);
+  memcpy(dst + size, reinterpret_cast<int32_t*>(&size), sizeof(int32_t));
 
   *out = result;
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/ipc/metadata-internal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.h 
b/cpp/src/arrow/ipc/metadata-internal.h
index d38df84..c404cfd 100644
--- a/cpp/src/arrow/ipc/metadata-internal.h
+++ b/cpp/src/arrow/ipc/metadata-internal.h
@@ -24,7 +24,9 @@
 
 #include "flatbuffers/flatbuffers.h"
 
+#include "arrow/ipc/File_generated.h"
 #include "arrow/ipc/Message_generated.h"
+#include "arrow/ipc/metadata.h"
 
 namespace arrow {
 
@@ -37,11 +39,18 @@ class Status;
 
 namespace ipc {
 
+using FBB = flatbuffers::FlatBufferBuilder;
+using FieldOffset = flatbuffers::Offset<arrow::flatbuf::Field>;
+using Offset = flatbuffers::Offset<void>;
+
 static constexpr flatbuf::MetadataVersion kMetadataVersion =
     flatbuf::MetadataVersion_V1_SNAPSHOT;
 
 Status FieldFromFlatbuffer(const flatbuf::Field* field, 
std::shared_ptr<Field>* out);
 
+Status SchemaToFlatbuffer(
+    FBB& fbb, const Schema* schema, flatbuffers::Offset<flatbuf::Schema>* out);
+
 class MessageBuilder {
  public:
   Status SetSchema(const Schema* schema);

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index e510755..66df8a6 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -23,7 +23,8 @@
 
 #include "flatbuffers/flatbuffers.h"
 
-// Generated C++ flatbuffer IDL
+#include "arrow/io/interfaces.h"
+#include "arrow/ipc/File_generated.h"
 #include "arrow/ipc/Message_generated.h"
 #include "arrow/ipc/metadata-internal.h"
 
@@ -47,9 +48,10 @@ Status WriteSchema(const Schema* schema, 
std::shared_ptr<Buffer>* out) {
 //----------------------------------------------------------------------
 // Message reader
 
-class Message::Impl {
+class Message::MessageImpl {
  public:
-  explicit Impl(const std::shared_ptr<Buffer>& buffer, const flatbuf::Message* 
message)
+  explicit MessageImpl(
+      const std::shared_ptr<Buffer>& buffer, const flatbuf::Message* message)
       : buffer_(buffer), message_(message) {}
 
   Message::Type type() const {
@@ -76,31 +78,16 @@ class Message::Impl {
   const flatbuf::Message* message_;
 };
 
-class SchemaMessage::Impl {
- public:
-  explicit Impl(const void* schema)
-      : schema_(static_cast<const flatbuf::Schema*>(schema)) {}
-
-  const flatbuf::Field* field(int i) const { return schema_->fields()->Get(i); 
}
-
-  int num_fields() const { return schema_->fields()->size(); }
-
- private:
-  const flatbuf::Schema* schema_;
-};
-
 Message::Message() {}
 
 Status Message::Open(
     const std::shared_ptr<Buffer>& buffer, std::shared_ptr<Message>* out) {
   std::shared_ptr<Message> result(new Message());
 
-  // The buffer is prefixed by its size as int32_t
-  const uint8_t* fb_head = buffer->data() + sizeof(int32_t);
-  const flatbuf::Message* message = flatbuf::GetMessage(fb_head);
+  const flatbuf::Message* message = flatbuf::GetMessage(buffer->data());
 
   // TODO(wesm): verify message
-  result->impl_.reset(new Impl(buffer, message));
+  result->impl_.reset(new MessageImpl(buffer, message));
   *out = result;
 
   return Status::OK();
@@ -122,10 +109,26 @@ std::shared_ptr<SchemaMessage> Message::GetSchema() {
   return std::make_shared<SchemaMessage>(this->shared_from_this(), 
impl_->header());
 }
 
+// ----------------------------------------------------------------------
+// SchemaMessage
+
+class SchemaMessage::SchemaMessageImpl {
+ public:
+  explicit SchemaMessageImpl(const void* schema)
+      : schema_(static_cast<const flatbuf::Schema*>(schema)) {}
+
+  const flatbuf::Field* field(int i) const { return schema_->fields()->Get(i); 
}
+
+  int num_fields() const { return schema_->fields()->size(); }
+
+ private:
+  const flatbuf::Schema* schema_;
+};
+
 SchemaMessage::SchemaMessage(
     const std::shared_ptr<Message>& message, const void* schema) {
   message_ = message;
-  impl_.reset(new Impl(schema));
+  impl_.reset(new SchemaMessageImpl(schema));
 }
 
 int SchemaMessage::num_fields() const {
@@ -146,9 +149,12 @@ Status SchemaMessage::GetSchema(std::shared_ptr<Schema>* 
out) const {
   return Status::OK();
 }
 
-class RecordBatchMessage::Impl {
+// ----------------------------------------------------------------------
+// RecordBatchMessage
+
+class RecordBatchMessage::RecordBatchMessageImpl {
  public:
-  explicit Impl(const void* batch)
+  explicit RecordBatchMessageImpl(const void* batch)
       : batch_(static_cast<const flatbuf::RecordBatch*>(batch)) {
     nodes_ = batch_->nodes();
     buffers_ = batch_->buffers();
@@ -177,7 +183,7 @@ std::shared_ptr<RecordBatchMessage> 
Message::GetRecordBatch() {
 RecordBatchMessage::RecordBatchMessage(
     const std::shared_ptr<Message>& message, const void* batch) {
   message_ = message;
-  impl_.reset(new Impl(batch));
+  impl_.reset(new RecordBatchMessageImpl(batch));
 }
 
 // TODO(wesm): Copying the flatbuffer data isn't great, but this will do for
@@ -213,5 +219,122 @@ int RecordBatchMessage::num_fields() const {
   return impl_->num_fields();
 }
 
+// ----------------------------------------------------------------------
+// File footer
+
+static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>>
+FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) {
+  std::vector<flatbuf::Block> fb_blocks;
+
+  for (const FileBlock& block : blocks) {
+    fb_blocks.emplace_back(block.offset, block.metadata_length, 
block.body_length);
+  }
+
+  return fbb.CreateVectorOfStructs(fb_blocks);
+}
+
+Status WriteFileFooter(const Schema* schema, const std::vector<FileBlock>& 
dictionaries,
+    const std::vector<FileBlock>& record_batches, io::OutputStream* out) {
+  FBB fbb;
+
+  flatbuffers::Offset<flatbuf::Schema> fb_schema;
+  RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, &fb_schema));
+
+  auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries);
+  auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches);
+
+  auto footer = flatbuf::CreateFooter(
+      fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches);
+
+  fbb.Finish(footer);
+
+  int32_t size = fbb.GetSize();
+
+  return out->Write(fbb.GetBufferPointer(), size);
+}
+
+static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
+  return FileBlock(block->offset(), block->metaDataLength(), 
block->bodyLength());
+}
+
+class FileFooter::FileFooterImpl {
+ public:
+  FileFooterImpl(const std::shared_ptr<Buffer>& buffer, const flatbuf::Footer* 
footer)
+      : buffer_(buffer), footer_(footer) {}
+
+  int num_dictionaries() const { return footer_->dictionaries()->size(); }
+
+  int num_record_batches() const { return footer_->recordBatches()->size(); }
+
+  MetadataVersion::type version() const {
+    switch (footer_->version()) {
+      case flatbuf::MetadataVersion_V1_SNAPSHOT:
+        return MetadataVersion::V1_SNAPSHOT;
+      // Add cases as other versions become available
+      default:
+        return MetadataVersion::V1_SNAPSHOT;
+    }
+  }
+
+  FileBlock record_batch(int i) const {
+    return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
+  }
+
+  FileBlock dictionary(int i) const {
+    return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
+  }
+
+  Status GetSchema(std::shared_ptr<Schema>* out) const {
+    auto schema_msg = std::make_shared<SchemaMessage>(nullptr, 
footer_->schema());
+    return schema_msg->GetSchema(out);
+  }
+
+ private:
+  // Retain reference to memory
+  std::shared_ptr<Buffer> buffer_;
+
+  const flatbuf::Footer* footer_;
+};
+
+FileFooter::FileFooter() {}
+
+FileFooter::~FileFooter() {}
+
+Status FileFooter::Open(
+    const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out) {
+  const flatbuf::Footer* footer = flatbuf::GetFooter(buffer->data());
+
+  *out = std::unique_ptr<FileFooter>(new FileFooter());
+
+  // TODO(wesm): Verify the footer
+  (*out)->impl_.reset(new FileFooterImpl(buffer, footer));
+
+  return Status::OK();
+}
+
+int FileFooter::num_dictionaries() const {
+  return impl_->num_dictionaries();
+}
+
+int FileFooter::num_record_batches() const {
+  return impl_->num_record_batches();
+}
+
+MetadataVersion::type FileFooter::version() const {
+  return impl_->version();
+}
+
+FileBlock FileFooter::record_batch(int i) const {
+  return impl_->record_batch(i);
+}
+
+FileBlock FileFooter::dictionary(int i) const {
+  return impl_->dictionary(i);
+}
+
+Status FileFooter::GetSchema(std::shared_ptr<Schema>* out) const {
+  return impl_->GetSchema(out);
+}
+
 }  // namespace ipc
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index d5ec533..2f0e853 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -22,6 +22,7 @@
 
 #include <cstdint>
 #include <memory>
+#include <vector>
 
 #include "arrow/util/visibility.h"
 
@@ -32,17 +33,24 @@ struct Field;
 class Schema;
 class Status;
 
+namespace io {
+
+class OutputStream;
+
+}  // namespace io
+
 namespace ipc {
 
+struct MetadataVersion {
+  enum type { V1_SNAPSHOT };
+};
+
 //----------------------------------------------------------------------
-// Message read/write APIs
 
 // Serialize arrow::Schema as a Flatbuffer
 ARROW_EXPORT
 Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out);
 
-//----------------------------------------------------------------------
-
 // Read interface classes. We do not fully deserialize the flatbuffers so that
 // individual fields metadata can be retrieved from very large schema without
 //
@@ -68,8 +76,8 @@ class ARROW_EXPORT SchemaMessage {
   // Parent, owns the flatbuffer data
   std::shared_ptr<Message> message_;
 
-  class Impl;
-  std::unique_ptr<Impl> impl_;
+  class SchemaMessageImpl;
+  std::unique_ptr<SchemaMessageImpl> impl_;
 };
 
 // Field metadata
@@ -101,8 +109,8 @@ class ARROW_EXPORT RecordBatchMessage {
   // Parent, owns the flatbuffer data
   std::shared_ptr<Message> message_;
 
-  class Impl;
-  std::unique_ptr<Impl> impl_;
+  class RecordBatchMessageImpl;
+  std::unique_ptr<RecordBatchMessageImpl> impl_;
 };
 
 class ARROW_EXPORT DictionaryBatchMessage {
@@ -133,8 +141,46 @@ class ARROW_EXPORT Message : public 
std::enable_shared_from_this<Message> {
   Message();
 
   // Hide serialization details from user API
-  class Impl;
-  std::unique_ptr<Impl> impl_;
+  class MessageImpl;
+  std::unique_ptr<MessageImpl> impl_;
+};
+
+// ----------------------------------------------------------------------
+// File footer for file-like representation
+
+struct FileBlock {
+  FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length)
+      : offset(offset), metadata_length(metadata_length), 
body_length(body_length) {}
+
+  int64_t offset;
+  int32_t metadata_length;
+  int64_t body_length;
+};
+
+ARROW_EXPORT
+Status WriteFileFooter(const Schema* schema, const std::vector<FileBlock>& 
dictionaries,
+    const std::vector<FileBlock>& record_batches, io::OutputStream* out);
+
+class ARROW_EXPORT FileFooter {
+ public:
+  ~FileFooter();
+
+  static Status Open(
+      const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out);
+
+  int num_dictionaries() const;
+  int num_record_batches() const;
+  MetadataVersion::type version() const;
+
+  FileBlock record_batch(int i) const;
+  FileBlock dictionary(int i) const;
+
+  Status GetSchema(std::shared_ptr<Schema>* out) const;
+
+ private:
+  FileFooter();
+  class FileFooterImpl;
+  std::unique_ptr<FileFooterImpl> impl_;
 };
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index f6582fc..7d02bc3 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -25,21 +25,28 @@
 #include <vector>
 
 #include "arrow/array.h"
+#include "arrow/table.h"
 #include "arrow/test-util.h"
 #include "arrow/types/list.h"
 #include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+#include "arrow/types/struct.h"
 #include "arrow/util/buffer.h"
 #include "arrow/util/memory-pool.h"
 
 namespace arrow {
 namespace ipc {
 
+const auto kInt32 = std::make_shared<Int32Type>();
+const auto kListInt32 = std::make_shared<ListType>(kInt32);
+const auto kListListInt32 = std::make_shared<ListType>(kListInt32);
+
 Status MakeRandomInt32Array(
     int32_t length, bool include_nulls, MemoryPool* pool, 
std::shared_ptr<Array>* array) {
   std::shared_ptr<PoolBuffer> data;
   test::MakeRandomInt32PoolBuffer(length, pool, &data);
-  const auto INT32 = std::make_shared<Int32Type>();
-  Int32Builder builder(pool, INT32);
+  const auto kInt32 = std::make_shared<Int32Type>();
+  Int32Builder builder(pool, kInt32);
   if (include_nulls) {
     std::shared_ptr<PoolBuffer> valid_bytes;
     test::MakeRandomBytePoolBuffer(length, pool, &valid_bytes);
@@ -87,6 +94,188 @@ Status MakeRandomListArray(const std::shared_ptr<Array>& 
child_array, int num_li
   return (*array)->Validate();
 }
 
+typedef Status MakeRecordBatch(std::shared_ptr<RecordBatch>* out);
+
+Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) {
+  const int length = 1000;
+
+  // Make the schema
+  auto f0 = std::make_shared<Field>("f0", kInt32);
+  auto f1 = std::make_shared<Field>("f1", kInt32);
+  std::shared_ptr<Schema> schema(new Schema({f0, f1}));
+
+  // Example data
+  std::shared_ptr<Array> a0, a1;
+  MemoryPool* pool = default_memory_pool();
+  RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &a0));
+  RETURN_NOT_OK(MakeRandomInt32Array(length, true, pool, &a1));
+  out->reset(new RecordBatch(schema, length, {a0, a1}));
+  return Status::OK();
+}
+
+template <class Builder, class RawType>
+Status MakeRandomBinaryArray(
+    const TypePtr& type, int32_t length, MemoryPool* pool, ArrayPtr* array) {
+  const std::vector<std::string> values = {
+      "", "", "abc", "123", "efg", "456!@#!@#", "12312"};
+  Builder builder(pool, type);
+  const auto values_len = values.size();
+  for (int32_t i = 0; i < length; ++i) {
+    int values_index = i % values_len;
+    if (values_index == 0) {
+      RETURN_NOT_OK(builder.AppendNull());
+    } else {
+      const std::string& value = values[values_index];
+      RETURN_NOT_OK(
+          builder.Append(reinterpret_cast<const RawType*>(value.data()), 
value.size()));
+    }
+  }
+  *array = builder.Finish();
+  return Status::OK();
+}
+
+Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out) {
+  const int32_t length = 500;
+  auto string_type = std::make_shared<StringType>();
+  auto binary_type = std::make_shared<BinaryType>();
+  auto f0 = std::make_shared<Field>("f0", string_type);
+  auto f1 = std::make_shared<Field>("f1", binary_type);
+  std::shared_ptr<Schema> schema(new Schema({f0, f1}));
+
+  std::shared_ptr<Array> a0, a1;
+  MemoryPool* pool = default_memory_pool();
+
+  {
+    auto status =
+        MakeRandomBinaryArray<StringBuilder, char>(string_type, length, pool, 
&a0);
+    RETURN_NOT_OK(status);
+  }
+  {
+    auto status =
+        MakeRandomBinaryArray<BinaryBuilder, uint8_t>(binary_type, length, 
pool, &a1);
+    RETURN_NOT_OK(status);
+  }
+  out->reset(new RecordBatch(schema, length, {a0, a1}));
+  return Status::OK();
+}
+
+Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
+  // Make the schema
+  auto f0 = std::make_shared<Field>("f0", kListInt32);
+  auto f1 = std::make_shared<Field>("f1", kListListInt32);
+  auto f2 = std::make_shared<Field>("f2", kInt32);
+  std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
+
+  // Example data
+
+  MemoryPool* pool = default_memory_pool();
+  const int length = 200;
+  std::shared_ptr<Array> leaf_values, list_array, list_list_array, flat_array;
+  const bool include_nulls = true;
+  RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &leaf_values));
+  RETURN_NOT_OK(
+      MakeRandomListArray(leaf_values, length, include_nulls, pool, 
&list_array));
+  RETURN_NOT_OK(
+      MakeRandomListArray(list_array, length, include_nulls, pool, 
&list_list_array));
+  RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, 
&flat_array));
+  out->reset(new RecordBatch(schema, length, {list_array, list_list_array, 
flat_array}));
+  return Status::OK();
+}
+
+Status MakeZeroLengthRecordBatch(std::shared_ptr<RecordBatch>* out) {
+  // Make the schema
+  auto f0 = std::make_shared<Field>("f0", kListInt32);
+  auto f1 = std::make_shared<Field>("f1", kListListInt32);
+  auto f2 = std::make_shared<Field>("f2", kInt32);
+  std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
+
+  // Example data
+  MemoryPool* pool = default_memory_pool();
+  const int length = 200;
+  const bool include_nulls = true;
+  std::shared_ptr<Array> leaf_values, list_array, list_list_array, flat_array;
+  RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &leaf_values));
+  RETURN_NOT_OK(MakeRandomListArray(leaf_values, 0, include_nulls, pool, 
&list_array));
+  RETURN_NOT_OK(
+      MakeRandomListArray(list_array, 0, include_nulls, pool, 
&list_list_array));
+  RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array));
+  out->reset(new RecordBatch(schema, length, {list_array, list_list_array, 
flat_array}));
+  return Status::OK();
+}
+
+Status MakeNonNullRecordBatch(std::shared_ptr<RecordBatch>* out) {
+  // Make the schema
+  auto f0 = std::make_shared<Field>("f0", kListInt32);
+  auto f1 = std::make_shared<Field>("f1", kListListInt32);
+  auto f2 = std::make_shared<Field>("f2", kInt32);
+  std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
+
+  // Example data
+  MemoryPool* pool = default_memory_pool();
+  const int length = 50;
+  std::shared_ptr<Array> leaf_values, list_array, list_list_array, flat_array;
+
+  RETURN_NOT_OK(MakeRandomInt32Array(1000, true, pool, &leaf_values));
+  bool include_nulls = false;
+  RETURN_NOT_OK(
+      MakeRandomListArray(leaf_values, length, include_nulls, pool, 
&list_array));
+  RETURN_NOT_OK(
+      MakeRandomListArray(list_array, length, include_nulls, pool, 
&list_list_array));
+  RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, 
&flat_array));
+  out->reset(new RecordBatch(schema, length, {list_array, list_list_array, 
flat_array}));
+  return Status::OK();
+}
+
+Status MakeDeeplyNestedList(std::shared_ptr<RecordBatch>* out) {
+  const int batch_length = 5;
+  TypePtr type = kInt32;
+
+  MemoryPool* pool = default_memory_pool();
+  ArrayPtr array;
+  const bool include_nulls = true;
+  RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &array));
+  for (int i = 0; i < 63; ++i) {
+    type = 
std::static_pointer_cast<DataType>(std::make_shared<ListType>(type));
+    RETURN_NOT_OK(MakeRandomListArray(array, batch_length, include_nulls, 
pool, &array));
+  }
+
+  auto f0 = std::make_shared<Field>("f0", type);
+  std::shared_ptr<Schema> schema(new Schema({f0}));
+  std::vector<ArrayPtr> arrays = {array};
+  out->reset(new RecordBatch(schema, batch_length, arrays));
+  return Status::OK();
+}
+
+Status MakeStruct(std::shared_ptr<RecordBatch>* out) {
+  // reuse constructed list columns
+  std::shared_ptr<RecordBatch> list_batch;
+  RETURN_NOT_OK(MakeListRecordBatch(&list_batch));
+  std::vector<ArrayPtr> columns = {
+      list_batch->column(0), list_batch->column(1), list_batch->column(2)};
+  auto list_schema = list_batch->schema();
+
+  // Define schema
+  std::shared_ptr<DataType> type(new StructType(
+      {list_schema->field(0), list_schema->field(1), list_schema->field(2)}));
+  auto f0 = std::make_shared<Field>("non_null_struct", type);
+  auto f1 = std::make_shared<Field>("null_struct", type);
+  std::shared_ptr<Schema> schema(new Schema({f0, f1}));
+
+  // construct individual nullable/non-nullable struct arrays
+  ArrayPtr no_nulls(new StructArray(type, list_batch->num_rows(), columns));
+  std::vector<uint8_t> null_bytes(list_batch->num_rows(), 1);
+  null_bytes[0] = 0;
+  std::shared_ptr<Buffer> null_bitmask;
+  RETURN_NOT_OK(util::bytes_to_bits(null_bytes, &null_bitmask));
+  ArrayPtr with_nulls(
+      new StructArray(type, list_batch->num_rows(), columns, 1, null_bitmask));
+
+  // construct batch
+  std::vector<ArrayPtr> arrays = {no_nulls, with_nulls};
+  out->reset(new RecordBatch(schema, list_batch->num_rows(), arrays));
+  return Status::OK();
+}
+
 }  // namespace ipc
 }  // namespace arrow
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/ipc/util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/util.h b/cpp/src/arrow/ipc/util.h
index 3f4001b..94079a3 100644
--- a/cpp/src/arrow/ipc/util.h
+++ b/cpp/src/arrow/ipc/util.h
@@ -27,6 +27,14 @@
 namespace arrow {
 namespace ipc {
 
+// Align on 8-byte boundaries
+static constexpr int kArrowAlignment = 8;
+static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0};
+
+static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = 
kArrowAlignment) {
+  return ((nbytes + alignment - 1) / alignment) * alignment;
+}
+
 // A helper class to tracks the size of allocations
 class MockOutputStream : public io::OutputStream {
  public:

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/parquet/reader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/reader.h b/cpp/src/arrow/parquet/reader.h
index a9c64ec..2689beb 100644
--- a/cpp/src/arrow/parquet/reader.h
+++ b/cpp/src/arrow/parquet/reader.h
@@ -31,7 +31,7 @@ namespace arrow {
 
 class Array;
 class MemoryPool;
-class RowBatch;
+class RecordBatch;
 class Status;
 class Table;
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/parquet/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/writer.h b/cpp/src/arrow/parquet/writer.h
index 5aa1ba5..ecc6a9f 100644
--- a/cpp/src/arrow/parquet/writer.h
+++ b/cpp/src/arrow/parquet/writer.h
@@ -30,7 +30,7 @@ namespace arrow {
 class Array;
 class MemoryPool;
 class PrimitiveArray;
-class RowBatch;
+class RecordBatch;
 class Status;
 class StringArray;
 class Table;

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/table.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index d9573ea..3a250df 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -27,11 +27,11 @@
 
 namespace arrow {
 
-RowBatch::RowBatch(const std::shared_ptr<Schema>& schema, int num_rows,
+RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int num_rows,
     const std::vector<std::shared_ptr<Array>>& columns)
     : schema_(schema), num_rows_(num_rows), columns_(columns) {}
 
-const std::string& RowBatch::column_name(int i) const {
+const std::string& RecordBatch::column_name(int i) const {
   return schema_->field(i)->name;
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/7e39747e/cpp/src/arrow/table.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 2088fdf..36b3c8e 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -32,15 +32,15 @@ class Column;
 class Schema;
 class Status;
 
-// A row batch is a simpler and more rigid table data structure intended for
+// A record batch is a simpler and more rigid table data structure intended for
 // use primarily in shared memory IPC. It contains a schema (metadata) and a
-// corresponding vector of equal-length Arrow arrays
-class ARROW_EXPORT RowBatch {
+// corresponding sequence of equal-length Arrow arrays
+class ARROW_EXPORT RecordBatch {
  public:
-  // num_rows is a parameter to allow for row batches of a particular size not
+  // num_rows is a parameter to allow for record batches of a particular size 
not
   // having any materialized columns. Each array should have the same length as
   // num_rows
-  RowBatch(const std::shared_ptr<Schema>& schema, int num_rows,
+  RecordBatch(const std::shared_ptr<Schema>& schema, int32_t num_rows,
       const std::vector<std::shared_ptr<Array>>& columns);
 
   // @returns: the table's schema
@@ -50,17 +50,19 @@ class ARROW_EXPORT RowBatch {
   // Note: Does not boundscheck
   const std::shared_ptr<Array>& column(int i) const { return columns_[i]; }
 
+  const std::vector<std::shared_ptr<Array>>& columns() const { return 
columns_; }
+
   const std::string& column_name(int i) const;
 
   // @returns: the number of columns in the table
   int num_columns() const { return columns_.size(); }
 
   // @returns: the number of rows (the corresponding length of each column)
-  int64_t num_rows() const { return num_rows_; }
+  int32_t num_rows() const { return num_rows_; }
 
  private:
   std::shared_ptr<Schema> schema_;
-  int num_rows_;
+  int32_t num_rows_;
   std::vector<std::shared_ptr<Array>> columns_;
 };
 

Reply via email to