Repository: arrow
Updated Branches:
  refs/heads/master ef3b6b344 -> 16c97592b


ARROW-577: [C++] Use private implementation pattern in ipc::StreamWriter and 
ipc::FileWriter

This patch also includes some code reorganization -- I moved the reader and 
writer classes to their own headers/compilation units. I also moved the 
stream-to-file and file-to-stream executables to arrow/ipc

Author: Wes McKinney <[email protected]>

Closes #351 from wesm/ARROW-577 and squashes the following commits:

98c32d2 [Wes McKinney] Only build file/stream utils if ARROW_BUILD_UTILITIES is 
on
c5fa43f [Wes McKinney] Refactor to make stream and file writer 
implementation details private in public ABI


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/16c97592
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/16c97592
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/16c97592

Branch: refs/heads/master
Commit: 16c97592bf948c32a8dae9441ace078422d642dd
Parents: ef3b6b3
Author: Wes McKinney <[email protected]>
Authored: Sun Feb 26 19:22:15 2017 -0500
Committer: Wes McKinney <[email protected]>
Committed: Sun Feb 26 19:22:15 2017 -0500

----------------------------------------------------------------------
 cpp/src/arrow/ipc/CMakeLists.txt           |  32 +-
 cpp/src/arrow/ipc/api.h                    |  27 ++
 cpp/src/arrow/ipc/file-to-stream.cc        |  60 ++++
 cpp/src/arrow/ipc/file.cc                  | 297 -------------------
 cpp/src/arrow/ipc/file.h                   | 111 -------
 cpp/src/arrow/ipc/ipc-file-test.cc         |   4 +-
 cpp/src/arrow/ipc/json-integration-test.cc |   3 +-
 cpp/src/arrow/ipc/metadata.h               |  12 +
 cpp/src/arrow/ipc/reader.cc                | 369 ++++++++++++++++++++++++
 cpp/src/arrow/ipc/reader.h                 | 112 +++++++
 cpp/src/arrow/ipc/stream-to-file.cc        |  58 ++++
 cpp/src/arrow/ipc/stream.cc                | 310 --------------------
 cpp/src/arrow/ipc/stream.h                 | 137 ---------
 cpp/src/arrow/ipc/writer.cc                | 287 ++++++++++++++++++
 cpp/src/arrow/ipc/writer.h                 |  92 ++++++
 cpp/src/arrow/util/CMakeLists.txt          |  22 --
 cpp/src/arrow/util/file-to-stream.cc       |  60 ----
 cpp/src/arrow/util/stream-to-file.cc       |  58 ----
 python/pyarrow/includes/libarrow_ipc.pxd   |   5 +-
 19 files changed, 1050 insertions(+), 1006 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index e7a3fdb..08da0a1 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -30,12 +30,12 @@ set(ARROW_IPC_TEST_LINK_LIBS
 
 set(ARROW_IPC_SRCS
   adapter.cc
-  file.cc
   json.cc
   json-internal.cc
   metadata.cc
   metadata-internal.cc
-  stream.cc
+  reader.cc
+  writer.cc
 )
 
 if(NOT APPLE)
@@ -138,10 +138,11 @@ add_dependencies(arrow_ipc_objlib metadata_fbs)
 # Headers: top level
 install(FILES
   adapter.h
-  file.h
+  api.h
   json.h
   metadata.h
-  stream.h
+  reader.h
+  writer.h
   DESTINATION include/arrow/ipc)
 
 # pkg-config support
@@ -151,3 +152,26 @@ configure_file(arrow-ipc.pc.in
 install(
   FILES "${CMAKE_CURRENT_BINARY_DIR}/arrow-ipc.pc"
   DESTINATION "lib/pkgconfig/")
+
+
+# Libraries linked into the file-to-stream / stream-to-file command line
+# utilities defined below.
+set(UTIL_LINK_LIBS
+  arrow_ipc_static
+  arrow_io_static
+  arrow_static
+  boost_filesystem_static
+  boost_system_static
+  dl)
+
+# NOTE(review): on non-Apple platforms this appends the two boost libraries a
+# second time; they are already in the base list above. Looks redundant --
+# confirm whether the repetition is needed for static link ordering.
+if (NOT APPLE)
+  set(UTIL_LINK_LIBS
+    ${UTIL_LINK_LIBS}
+    boost_filesystem_static
+    boost_system_static)
+endif()
+
+# The conversion utilities are only built when ARROW_BUILD_UTILITIES is on.
+if (ARROW_BUILD_UTILITIES)
+  add_executable(file-to-stream file-to-stream.cc)
+  target_link_libraries(file-to-stream ${UTIL_LINK_LIBS})
+  add_executable(stream-to-file stream-to-file.cc)
+  target_link_libraries(stream-to-file ${UTIL_LINK_LIBS})
+endif()

http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/api.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/api.h b/cpp/src/arrow/ipc/api.h
new file mode 100644
index 0000000..cb85421
--- /dev/null
+++ b/cpp/src/arrow/ipc/api.h
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_IPC_API_H
+#define ARROW_IPC_API_H
+
+#include "arrow/ipc/adapter.h"
+#include "arrow/ipc/json.h"
+#include "arrow/ipc/metadata.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/ipc/writer.h"
+
+#endif  // ARROW_IPC_API_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/file-to-stream.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file-to-stream.cc b/cpp/src/arrow/ipc/file-to-stream.cc
new file mode 100644
index 0000000..8161b19
--- /dev/null
+++ b/cpp/src/arrow/ipc/file-to-stream.cc
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/io/file.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/ipc/writer.h"
+#include "arrow/status.h"
+#include <iostream>
+
+#include "arrow/util/io-util.h"
+
+namespace arrow {
+
+// Converts the Arrow file at `path` to the streaming format and writes the
+// result to stdout. A failing step's Status is propagated by RETURN_NOT_OK.
+Status ConvertToStream(const char* path) {
+  // Open the source file and wrap it in a file-format reader.
+  std::shared_ptr<io::ReadableFile> input;
+  RETURN_NOT_OK(io::ReadableFile::Open(path, &input));
+
+  std::shared_ptr<ipc::FileReader> file_reader;
+  RETURN_NOT_OK(ipc::FileReader::Open(input, &file_reader));
+
+  // The stream writer targets stdout and reuses the schema read from the file.
+  io::StdoutStream out;
+  std::shared_ptr<ipc::StreamWriter> stream_writer;
+  RETURN_NOT_OK(
+      ipc::StreamWriter::Open(&out, file_reader->schema(), &stream_writer));
+
+  // Copy every record batch across, in file order.
+  int batch_index = 0;
+  while (batch_index < file_reader->num_record_batches()) {
+    std::shared_ptr<RecordBatch> batch;
+    RETURN_NOT_OK(file_reader->GetRecordBatch(batch_index, &batch));
+    RETURN_NOT_OK(stream_writer->WriteRecordBatch(*batch));
+    ++batch_index;
+  }
+  return stream_writer->Close();
+}
+
+}  // namespace arrow
+
+// Entry point. Expects exactly one argument: the path of the Arrow file to
+// convert. Returns 1 on a usage error or a failed conversion, 0 otherwise.
+int main(int argc, char** argv) {
+  if (argc == 2) {
+    arrow::Status result = arrow::ConvertToStream(argv[1]);
+    if (result.ok()) { return 0; }
+    std::cerr << "Could not convert to stream: " << result.ToString() << std::endl;
+    return 1;
+  }
+  std::cerr << "Usage: file-to-stream <input arrow file>" << std::endl;
+  return 1;
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc
deleted file mode 100644
index c1d483f..0000000
--- a/cpp/src/arrow/ipc/file.cc
+++ /dev/null
@@ -1,297 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "arrow/ipc/file.h"
-
-#include <cstdint>
-#include <cstring>
-#include <sstream>
-#include <vector>
-
-#include "arrow/buffer.h"
-#include "arrow/io/interfaces.h"
-#include "arrow/io/memory.h"
-#include "arrow/ipc/adapter.h"
-#include "arrow/ipc/metadata-internal.h"
-#include "arrow/ipc/metadata.h"
-#include "arrow/ipc/util.h"
-#include "arrow/status.h"
-#include "arrow/util/logging.h"
-
-namespace arrow {
-namespace ipc {
-
-static constexpr const char* kArrowMagicBytes = "ARROW1";
-
-static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>>
-FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) {
-  std::vector<flatbuf::Block> fb_blocks;
-
-  for (const FileBlock& block : blocks) {
-    fb_blocks.emplace_back(block.offset, block.metadata_length, 
block.body_length);
-  }
-
-  return fbb.CreateVectorOfStructs(fb_blocks);
-}
-
-Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& 
dictionaries,
-    const std::vector<FileBlock>& record_batches, DictionaryMemo* 
dictionary_memo,
-    io::OutputStream* out) {
-  FBB fbb;
-
-  flatbuffers::Offset<flatbuf::Schema> fb_schema;
-  RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema));
-
-  auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries);
-  auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches);
-
-  auto footer = flatbuf::CreateFooter(
-      fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches);
-
-  fbb.Finish(footer);
-
-  int32_t size = fbb.GetSize();
-
-  return out->Write(fbb.GetBufferPointer(), size);
-}
-
-static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
-  return FileBlock(block->offset(), block->metaDataLength(), 
block->bodyLength());
-}
-
-// ----------------------------------------------------------------------
-// File writer implementation
-
-FileWriter::FileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& 
schema)
-    : StreamWriter(sink, schema) {}
-
-Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& 
schema,
-    std::shared_ptr<FileWriter>* out) {
-  *out = std::shared_ptr<FileWriter>(new FileWriter(sink, schema));  // ctor 
is private
-  RETURN_NOT_OK((*out)->UpdatePosition());
-  return Status::OK();
-}
-
-Status FileWriter::Start() {
-  RETURN_NOT_OK(WriteAligned(
-      reinterpret_cast<const uint8_t*>(kArrowMagicBytes), 
strlen(kArrowMagicBytes)));
-
-  // We write the schema at the start of the file (and the end). This also
-  // writes all the dictionaries at the beginning of the file
-  return StreamWriter::Start();
-}
-
-Status FileWriter::Close() {
-  // Write metadata
-  int64_t initial_position = position_;
-  RETURN_NOT_OK(WriteFileFooter(
-      *schema_, dictionaries_, record_batches_, dictionary_memo_.get(), 
sink_));
-  RETURN_NOT_OK(UpdatePosition());
-
-  // Write footer length
-  int32_t footer_length = position_ - initial_position;
-
-  if (footer_length <= 0) { return Status::Invalid("Invalid file footer"); }
-
-  RETURN_NOT_OK(Write(reinterpret_cast<const uint8_t*>(&footer_length), 
sizeof(int32_t)));
-
-  // Write magic bytes to end file
-  return Write(
-      reinterpret_cast<const uint8_t*>(kArrowMagicBytes), 
strlen(kArrowMagicBytes));
-}
-
-// ----------------------------------------------------------------------
-// Reader implementation
-
-class FileReader::FileReaderImpl {
- public:
-  FileReaderImpl() { dictionary_memo_ = std::make_shared<DictionaryMemo>(); }
-
-  Status ReadFooter() {
-    int magic_size = static_cast<int>(strlen(kArrowMagicBytes));
-
-    if (footer_offset_ <= magic_size * 2 + 4) {
-      std::stringstream ss;
-      ss << "File is too small: " << footer_offset_;
-      return Status::Invalid(ss.str());
-    }
-
-    std::shared_ptr<Buffer> buffer;
-    int file_end_size = magic_size + sizeof(int32_t);
-    RETURN_NOT_OK(file_->ReadAt(footer_offset_ - file_end_size, file_end_size, 
&buffer));
-
-    if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, 
magic_size)) {
-      return Status::Invalid("Not an Arrow file");
-    }
-
-    int32_t footer_length = *reinterpret_cast<const int32_t*>(buffer->data());
-
-    if (footer_length <= 0 || footer_length + magic_size * 2 + 4 > 
footer_offset_) {
-      return Status::Invalid("File is smaller than indicated metadata size");
-    }
-
-    // Now read the footer
-    RETURN_NOT_OK(file_->ReadAt(
-        footer_offset_ - footer_length - file_end_size, footer_length, 
&footer_buffer_));
-
-    // TODO(wesm): Verify the footer
-    footer_ = flatbuf::GetFooter(footer_buffer_->data());
-    schema_metadata_.reset(new SchemaMetadata(nullptr, footer_->schema()));
-
-    return Status::OK();
-  }
-
-  int num_dictionaries() const { return footer_->dictionaries()->size(); }
-
-  int num_record_batches() const { return footer_->recordBatches()->size(); }
-
-  MetadataVersion::type version() const {
-    switch (footer_->version()) {
-      case flatbuf::MetadataVersion_V1:
-        return MetadataVersion::V1;
-      case flatbuf::MetadataVersion_V2:
-        return MetadataVersion::V2;
-      // Add cases as other versions become available
-      default:
-        return MetadataVersion::V2;
-    }
-  }
-
-  FileBlock record_batch(int i) const {
-    return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
-  }
-
-  FileBlock dictionary(int i) const {
-    return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
-  }
-
-  const SchemaMetadata& schema_metadata() const { return *schema_metadata_; }
-
-  Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
-    DCHECK_GE(i, 0);
-    DCHECK_LT(i, num_record_batches());
-    FileBlock block = record_batch(i);
-
-    std::shared_ptr<Message> message;
-    RETURN_NOT_OK(
-        ReadMessage(block.offset, block.metadata_length, file_.get(), 
&message));
-    auto metadata = std::make_shared<RecordBatchMetadata>(message);
-
-    // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
-    // ARROW-384).
-    std::shared_ptr<Buffer> buffer_block;
-    RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block));
-    io::BufferReader reader(buffer_block);
-
-    return ReadRecordBatch(*metadata, schema_, &reader, batch);
-  }
-
-  Status ReadSchema() {
-    RETURN_NOT_OK(schema_metadata_->GetDictionaryTypes(&dictionary_fields_));
-
-    // Read all the dictionaries
-    for (int i = 0; i < num_dictionaries(); ++i) {
-      FileBlock block = dictionary(i);
-      std::shared_ptr<Message> message;
-      RETURN_NOT_OK(
-          ReadMessage(block.offset, block.metadata_length, file_.get(), 
&message));
-
-      // TODO(wesm): ARROW-577: This code is duplicated, can be fixed with a 
more
-      // invasive refactor
-      DictionaryBatchMetadata metadata(message);
-
-      // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
-      // ARROW-384).
-      std::shared_ptr<Buffer> buffer_block;
-      RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block));
-      io::BufferReader reader(buffer_block);
-
-      std::shared_ptr<Array> dictionary;
-      RETURN_NOT_OK(ReadDictionary(metadata, dictionary_fields_, &reader, 
&dictionary));
-      RETURN_NOT_OK(dictionary_memo_->AddDictionary(metadata.id(), 
dictionary));
-    }
-
-    // Get the schema
-    return schema_metadata_->GetSchema(*dictionary_memo_, &schema_);
-  }
-
-  Status Open(
-      const std::shared_ptr<io::ReadableFileInterface>& file, int64_t 
footer_offset) {
-    file_ = file;
-    footer_offset_ = footer_offset;
-    RETURN_NOT_OK(ReadFooter());
-    return ReadSchema();
-  }
-
-  std::shared_ptr<Schema> schema() const { return schema_; }
-
- private:
-  std::shared_ptr<io::ReadableFileInterface> file_;
-
-  // The location where the Arrow file layout ends. May be the end of the file
-  // or some other location if embedded in a larger file.
-  int64_t footer_offset_;
-
-  // Footer metadata
-  std::shared_ptr<Buffer> footer_buffer_;
-  const flatbuf::Footer* footer_;
-  std::unique_ptr<SchemaMetadata> schema_metadata_;
-
-  DictionaryTypeMap dictionary_fields_;
-  std::shared_ptr<DictionaryMemo> dictionary_memo_;
-
-  // Reconstructed schema, including any read dictionaries
-  std::shared_ptr<Schema> schema_;
-};
-
-FileReader::FileReader() {
-  impl_.reset(new FileReaderImpl());
-}
-
-FileReader::~FileReader() {}
-
-Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file,
-    std::shared_ptr<FileReader>* reader) {
-  int64_t footer_offset;
-  RETURN_NOT_OK(file->GetSize(&footer_offset));
-  return Open(file, footer_offset, reader);
-}
-
-Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file,
-    int64_t footer_offset, std::shared_ptr<FileReader>* reader) {
-  *reader = std::shared_ptr<FileReader>(new FileReader());
-  return (*reader)->impl_->Open(file, footer_offset);
-}
-
-std::shared_ptr<Schema> FileReader::schema() const {
-  return impl_->schema();
-}
-
-int FileReader::num_record_batches() const {
-  return impl_->num_record_batches();
-}
-
-MetadataVersion::type FileReader::version() const {
-  return impl_->version();
-}
-
-Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
-  return impl_->GetRecordBatch(i, batch);
-}
-
-}  // namespace ipc
-}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/file.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file.h b/cpp/src/arrow/ipc/file.h
deleted file mode 100644
index 524766c..0000000
--- a/cpp/src/arrow/ipc/file.h
+++ /dev/null
@@ -1,111 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Implement Arrow file layout for IPC/RPC purposes and short-lived storage
-
-#ifndef ARROW_IPC_FILE_H
-#define ARROW_IPC_FILE_H
-
-#include <cstdint>
-#include <memory>
-#include <vector>
-
-#include "arrow/ipc/metadata.h"
-#include "arrow/ipc/stream.h"
-#include "arrow/util/visibility.h"
-
-namespace arrow {
-
-class Buffer;
-class RecordBatch;
-class Schema;
-class Status;
-
-namespace io {
-
-class OutputStream;
-class ReadableFileInterface;
-
-}  // namespace io
-
-namespace ipc {
-
-Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& 
dictionaries,
-    const std::vector<FileBlock>& record_batches, DictionaryMemo* 
dictionary_memo,
-    io::OutputStream* out);
-
-class ARROW_EXPORT FileWriter : public StreamWriter {
- public:
-  static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& 
schema,
-      std::shared_ptr<FileWriter>* out);
-
-  using StreamWriter::WriteRecordBatch;
-  Status Close() override;
-
- private:
-  FileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema);
-
-  Status Start() override;
-};
-
-class ARROW_EXPORT FileReader {
- public:
-  ~FileReader();
-
-  // Open a file-like object that is assumed to be self-contained; i.e., the
-  // end of the file interface is the end of the Arrow file. Note that there
-  // can be any amount of data preceding the Arrow-formatted data, because we
-  // need only locate the end of the Arrow file stream to discover the metadata
-  // and then proceed to read the data into memory.
-  static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file,
-      std::shared_ptr<FileReader>* reader);
-
-  // If the file is embedded within some larger file or memory region, you can
-  // pass the absolute memory offset to the end of the file (which contains the
-  // metadata footer). The metadata must have been written with memory offsets
-  // relative to the start of the containing file
-  //
-  // @param file: the data source
-  // @param footer_offset: the position of the end of the Arrow "file"
-  static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file,
-      int64_t footer_offset, std::shared_ptr<FileReader>* reader);
-
-  /// The schema includes any dictionaries
-  std::shared_ptr<Schema> schema() const;
-
-  int num_record_batches() const;
-
-  MetadataVersion::type version() const;
-
-  // Read a record batch from the file. Does not copy memory if the input
-  // source supports zero-copy.
-  //
-  // TODO(wesm): Make the copy/zero-copy behavior configurable (e.g. provide an
-  // "always copy" option)
-  Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch);
-
- private:
-  FileReader();
-
-  class ARROW_NO_EXPORT FileReaderImpl;
-  std::unique_ptr<FileReaderImpl> impl_;
-};
-
-}  // namespace ipc
-}  // namespace arrow
-
-#endif  // ARROW_IPC_FILE_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/ipc-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc
index 4b82aab..e58f2cf 100644
--- a/cpp/src/arrow/ipc/ipc-file-test.cc
+++ b/cpp/src/arrow/ipc/ipc-file-test.cc
@@ -28,10 +28,10 @@
 #include "arrow/io/memory.h"
 #include "arrow/io/test-common.h"
 #include "arrow/ipc/adapter.h"
-#include "arrow/ipc/file.h"
-#include "arrow/ipc/stream.h"
+#include "arrow/ipc/reader.h"
 #include "arrow/ipc/test-common.h"
 #include "arrow/ipc/util.h"
+#include "arrow/ipc/writer.h"
 
 #include "arrow/buffer.h"
 #include "arrow/memory_pool.h"

http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/json-integration-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc
index 95bc742..c16074e 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -29,8 +29,9 @@
 #include <boost/filesystem.hpp>  // NOLINT
 
 #include "arrow/io/file.h"
-#include "arrow/ipc/file.h"
 #include "arrow/ipc/json.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/ipc/writer.h"
 #include "arrow/pretty_print.h"
 #include "arrow/schema.h"
 #include "arrow/status.h"

http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index 0091067..f12529b 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -50,6 +50,18 @@ struct MetadataVersion {
   enum type { V1, V2 };
 };
 
+static constexpr const char* kArrowMagicBytes = "ARROW1";
+
+// Describes where one message is located: the offset at which it starts, the
+// length of its metadata, and the length of the body that follows.
+struct ARROW_EXPORT FileBlock {
+  // Zero-initialize every field so that a default-constructed block never
+  // exposes indeterminate values.
+  FileBlock() : offset(0), metadata_length(0), body_length(0) {}
+  FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length)
+      : offset(offset), metadata_length(metadata_length), body_length(body_length) {}
+
+  int64_t offset;
+  int32_t metadata_length;
+  int64_t body_length;
+};
+
 //----------------------------------------------------------------------
 
 using DictionaryMap = std::unordered_map<int64_t, std::shared_ptr<Array>>;

http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
new file mode 100644
index 0000000..1a9af7d
--- /dev/null
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -0,0 +1,369 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/ipc/reader.h"
+
+#include <cstdint>
+#include <cstring>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
+#include "arrow/ipc/adapter.h"
+#include "arrow/ipc/metadata-internal.h"
+#include "arrow/ipc/metadata.h"
+#include "arrow/ipc/util.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+namespace ipc {
+
+// ----------------------------------------------------------------------
+// StreamReader implementation
+
+// Builds a FileBlock from the offset, metadata length and body length stored
+// in a Flatbuffers Block entry.
+static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
+  FileBlock result(block->offset(), block->metaDataLength(), block->bodyLength());
+  return result;
+}
+
+static inline std::string message_type_name(Message::Type type) {
+  switch (type) {
+    case Message::SCHEMA:
+      return "schema";
+    case Message::RECORD_BATCH:
+      return "record batch";
+    case Message::DICTIONARY_BATCH:
+      return "dictionary";
+    default:
+      break;
+  }
+  return "unknown";
+}
+
+// Private implementation of StreamReader. Reads length-prefixed messages
+// sequentially from an input stream: first the schema, then one dictionary
+// batch per entry in the schema's dictionary type map, then record batches
+// until the stream ends.
+class StreamReader::StreamReaderImpl {
+ public:
+  StreamReaderImpl() {}
+  ~StreamReaderImpl() {}
+
+  // Keeps a reference to `stream` and immediately reads the schema (and any
+  // dictionaries) from it.
+  Status Open(const std::shared_ptr<io::InputStream>& stream) {
+    stream_ = stream;
+    return ReadSchema();
+  }
+
+  // Reads the next length-prefixed message and checks that it has the
+  // expected type.
+  //
+  // If the 4-byte length prefix cannot be read in full, the stream is treated
+  // as finished: *message is set to nullptr and OK is returned, so callers
+  // must check for a null message before using it. A truncated message body,
+  // a negative length prefix, or a message of the wrong type yields IOError.
+  Status ReadNextMessage(Message::Type expected_type, std::shared_ptr<Message>* message) {
+    std::shared_ptr<Buffer> buffer;
+    RETURN_NOT_OK(stream_->Read(sizeof(int32_t), &buffer));
+
+    if (buffer->size() != sizeof(int32_t)) {
+      *message = nullptr;
+      return Status::OK();
+    }
+
+    // Copy the prefix out instead of casting the data pointer, so the read
+    // does not depend on the buffer's alignment.
+    int32_t message_length = 0;
+    memcpy(&message_length, buffer->data(), sizeof(int32_t));
+    if (message_length < 0) {
+      return Status::IOError("Invalid negative message length in stream");
+    }
+
+    RETURN_NOT_OK(stream_->Read(message_length, &buffer));
+    if (buffer->size() != message_length) {
+      return Status::IOError("Unexpected end of stream trying to read message");
+    }
+
+    RETURN_NOT_OK(Message::Open(buffer, 0, message));
+
+    if ((*message)->type() != expected_type) {
+      // Report both types by name rather than printing a raw enum value.
+      std::stringstream ss;
+      ss << "Message not expected type: " << message_type_name(expected_type)
+         << ", was: " << message_type_name((*message)->type());
+      return Status::IOError(ss.str());
+    }
+    return Status::OK();
+  }
+
+  // Reads `size` bytes from the stream; a short read is reported as IOError.
+  Status ReadExact(int64_t size, std::shared_ptr<Buffer>* buffer) {
+    RETURN_NOT_OK(stream_->Read(size, buffer));
+
+    if ((*buffer)->size() < size) {
+      return Status::IOError("Unexpected EOS when reading buffer");
+    }
+    return Status::OK();
+  }
+
+  // Reads one dictionary batch (metadata plus body) and records the resulting
+  // dictionary in the memo under the id carried by the batch metadata.
+  Status ReadNextDictionary() {
+    std::shared_ptr<Message> message;
+    RETURN_NOT_OK(ReadNextMessage(Message::DICTIONARY_BATCH, &message));
+
+    // ReadNextMessage signals end of stream with a null message; a dictionary
+    // is required here, so running out of data is an error.
+    if (message == nullptr) {
+      return Status::IOError("Unexpected end of stream trying to read dictionary");
+    }
+
+    DictionaryBatchMetadata metadata(message);
+
+    std::shared_ptr<Buffer> batch_body;
+    RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body));
+    io::BufferReader reader(batch_body);
+
+    std::shared_ptr<Array> dictionary;
+    RETURN_NOT_OK(ReadDictionary(metadata, dictionary_types_, &reader, &dictionary));
+    return dictionary_memo_.AddDictionary(metadata.id(), dictionary);
+  }
+
+  // Reads the schema message, then one dictionary batch for every entry in
+  // the schema's dictionary type map, and finally reconstructs the schema.
+  Status ReadSchema() {
+    std::shared_ptr<Message> message;
+    RETURN_NOT_OK(ReadNextMessage(Message::SCHEMA, &message));
+
+    // An empty or truncated stream yields a null message. Report it instead
+    // of handing the null message on (presumably not valid for
+    // SchemaMetadata -- TODO confirm).
+    if (message == nullptr) {
+      return Status::IOError("Unexpected end of stream trying to read schema");
+    }
+
+    SchemaMetadata schema_meta(message);
+    RETURN_NOT_OK(schema_meta.GetDictionaryTypes(&dictionary_types_));
+
+    // TODO(wesm): In future, we may want to reconcile the ids in the stream with
+    // those found in the schema
+    int num_dictionaries = static_cast<int>(dictionary_types_.size());
+    for (int i = 0; i < num_dictionaries; ++i) {
+      RETURN_NOT_OK(ReadNextDictionary());
+    }
+
+    return schema_meta.GetSchema(dictionary_memo_, &schema_);
+  }
+
+  // Reads the next record batch. At end of stream, *batch is set to nullptr
+  // and OK is returned.
+  Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
+    std::shared_ptr<Message> message;
+    RETURN_NOT_OK(ReadNextMessage(Message::RECORD_BATCH, &message));
+
+    if (message == nullptr) {
+      // End of stream
+      *batch = nullptr;
+      return Status::OK();
+    }
+
+    RecordBatchMetadata batch_metadata(message);
+
+    std::shared_ptr<Buffer> batch_body;
+    RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body));
+    io::BufferReader reader(batch_body);
+    return ReadRecordBatch(batch_metadata, schema_, &reader, batch);
+  }
+
+  std::shared_ptr<Schema> schema() const { return schema_; }
+
+ private:
+  // dictionary_id -> type
+  DictionaryTypeMap dictionary_types_;
+
+  // Dictionaries read so far, keyed by id
+  DictionaryMemo dictionary_memo_;
+
+  std::shared_ptr<io::InputStream> stream_;
+  // Set by ReadSchema()
+  std::shared_ptr<Schema> schema_;
+};
+
+// StreamReader forwards every operation to its StreamReaderImpl.
+StreamReader::StreamReader() : impl_(new StreamReaderImpl()) {}
+
+StreamReader::~StreamReader() {}
+
+// Factory: allocates a reader (the constructor is private) and opens its
+// implementation on `stream`. *reader is assigned even when opening fails.
+Status StreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
+    std::shared_ptr<StreamReader>* reader) {
+  reader->reset(new StreamReader());
+  StreamReaderImpl* impl = (*reader)->impl_.get();
+  return impl->Open(stream);
+}
+
+// Returns the schema held by the implementation.
+std::shared_ptr<Schema> StreamReader::schema() const {
+  const StreamReaderImpl& impl = *impl_;
+  return impl.schema();
+}
+
+// Reads the next record batch via the implementation.
+Status StreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
+  StreamReaderImpl& impl = *impl_;
+  return impl.GetNextRecordBatch(batch);
+}
+
+// ----------------------------------------------------------------------
+// Reader implementation
+
+// Private implementation behind FileReader. Open() parses the footer stored at
+// the end of the Arrow file, reads every dictionary block the footer lists and
+// reconstructs the schema; GetRecordBatch() then reads batches by index.
+class FileReader::FileReaderImpl {
+ public:
+  FileReaderImpl()
+      : footer_offset_(0),
+        footer_(nullptr),
+        dictionary_memo_(std::make_shared<DictionaryMemo>()) {}
+
+  // Reads and validates the end of the file, which is expected to be laid out as
+  //   <footer flatbuffer> <int32 footer length> <magic bytes>
+  // with footer_offset_ pointing just past the trailing magic bytes. On success
+  // footer_buffer_, footer_ and schema_metadata_ are populated.
+  //
+  // Returns Status::Invalid when the file is too small, the magic bytes do not
+  // match or the stored footer length is inconsistent with the file size, and
+  // Status::IOError when fewer bytes than requested could be read.
+  Status ReadFooter() {
+    const int magic_size = static_cast<int>(strlen(kArrowMagicBytes));
+
+    if (footer_offset_ <= magic_size * 2 + 4) {
+      std::stringstream ss;
+      ss << "File is too small: " << footer_offset_;
+      return Status::Invalid(ss.str());
+    }
+
+    std::shared_ptr<Buffer> buffer;
+    const int file_end_size = magic_size + static_cast<int>(sizeof(int32_t));
+    RETURN_NOT_OK(
+        file_->ReadAt(footer_offset_ - file_end_size, file_end_size, &buffer));
+
+    // A short read would let the checks below run past the end of the buffer
+    if (buffer->size() < file_end_size) {
+      return Status::IOError("Unexpected end of file reading footer length");
+    }
+
+    if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size) != 0) {
+      return Status::Invalid("Not an Arrow file");
+    }
+
+    // Copy the bytes out rather than dereferencing a cast pointer: nothing
+    // guarantees the buffer is suitably aligned for int32_t
+    int32_t footer_length = 0;
+    memcpy(&footer_length, buffer->data(), sizeof(int32_t));
+
+    // Widen before adding so that a huge (corrupt) length cannot overflow int
+    if (footer_length <= 0 ||
+        static_cast<int64_t>(footer_length) + magic_size * 2 + 4 > footer_offset_) {
+      return Status::Invalid("File is smaller than indicated metadata size");
+    }
+
+    // Now read the footer
+    RETURN_NOT_OK(file_->ReadAt(
+        footer_offset_ - footer_length - file_end_size, footer_length, &footer_buffer_));
+    if (footer_buffer_->size() < footer_length) {
+      return Status::IOError("Unexpected end of file reading footer");
+    }
+
+    // TODO(wesm): Verify the footer
+    footer_ = flatbuf::GetFooter(footer_buffer_->data());
+    schema_metadata_.reset(new SchemaMetadata(nullptr, footer_->schema()));
+
+    return Status::OK();
+  }
+
+  // Number of dictionary blocks listed in the footer; 0 when no footer has been
+  // parsed or the footer carries no dictionary vector.
+  int num_dictionaries() const {
+    if (footer_ == nullptr || footer_->dictionaries() == nullptr) { return 0; }
+    return footer_->dictionaries()->size();
+  }
+
+  // Number of record batch blocks listed in the footer; 0 when no footer has
+  // been parsed or the footer carries no record batch vector.
+  int num_record_batches() const {
+    if (footer_ == nullptr || footer_->recordBatches() == nullptr) { return 0; }
+    return footer_->recordBatches()->size();
+  }
+
+  // Maps the flatbuffer metadata version stored in the footer to the public
+  // enum. Versions without an explicit case are reported as V2.
+  MetadataVersion::type version() const {
+    switch (footer_->version()) {
+      case flatbuf::MetadataVersion_V1:
+        return MetadataVersion::V1;
+      case flatbuf::MetadataVersion_V2:
+        return MetadataVersion::V2;
+      // Add cases as other versions become available
+      default:
+        return MetadataVersion::V2;
+    }
+  }
+
+  // Footer entry of the i-th record batch. The index is not range checked here.
+  FileBlock record_batch(int i) const {
+    return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
+  }
+
+  // Footer entry of the i-th dictionary. The index is not range checked here.
+  FileBlock dictionary(int i) const {
+    return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
+  }
+
+  const SchemaMetadata& schema_metadata() const { return *schema_metadata_; }
+
+  // Reads the i-th record batch into *batch. Returns Status::Invalid when i is
+  // outside [0, num_record_batches()), so release builds (where DCHECK is
+  // compiled out) no longer index the footer out of range.
+  Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
+    if (i < 0 || i >= num_record_batches()) {
+      std::stringstream ss;
+      ss << "Record batch index out of bounds: " << i;
+      return Status::Invalid(ss.str());
+    }
+    FileBlock block = record_batch(i);
+
+    std::shared_ptr<Message> message;
+    RETURN_NOT_OK(
+        ReadMessage(block.offset, block.metadata_length, file_.get(), &message));
+    auto metadata = std::make_shared<RecordBatchMetadata>(message);
+
+    // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
+    // ARROW-384).
+    std::shared_ptr<Buffer> buffer_block;
+    RETURN_NOT_OK(ReadBody(block.body_length, &buffer_block));
+    io::BufferReader reader(buffer_block);
+
+    return ReadRecordBatch(*metadata, schema_, &reader, batch);
+  }
+
+  // Loads the dictionary types declared by the schema metadata, reads every
+  // dictionary block listed in the footer into dictionary_memo_, and finally
+  // builds schema_.
+  Status ReadSchema() {
+    RETURN_NOT_OK(schema_metadata_->GetDictionaryTypes(&dictionary_fields_));
+
+    // Read all the dictionaries
+    for (int i = 0; i < num_dictionaries(); ++i) {
+      FileBlock block = dictionary(i);
+      std::shared_ptr<Message> message;
+      RETURN_NOT_OK(
+          ReadMessage(block.offset, block.metadata_length, file_.get(), &message));
+
+      // TODO(wesm): ARROW-577: This code is duplicated, can be fixed with a more
+      // invasive refactor
+      DictionaryBatchMetadata metadata(message);
+
+      // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
+      // ARROW-384).
+      std::shared_ptr<Buffer> buffer_block;
+      RETURN_NOT_OK(ReadBody(block.body_length, &buffer_block));
+      io::BufferReader reader(buffer_block);
+
+      std::shared_ptr<Array> dictionary;
+      RETURN_NOT_OK(
+          ReadDictionary(metadata, dictionary_fields_, &reader, &dictionary));
+      RETURN_NOT_OK(dictionary_memo_->AddDictionary(metadata.id(), dictionary));
+    }
+
+    // Get the schema
+    return schema_metadata_->GetSchema(*dictionary_memo_, &schema_);
+  }
+
+  // Attaches the file, remembers where the Arrow data ends, then parses the
+  // footer followed by the dictionaries and schema.
+  Status Open(
+      const std::shared_ptr<io::ReadableFileInterface>& file, int64_t footer_offset) {
+    file_ = file;
+    footer_offset_ = footer_offset;
+    RETURN_NOT_OK(ReadFooter());
+    return ReadSchema();
+  }
+
+  std::shared_ptr<Schema> schema() const { return schema_; }
+
+ private:
+  // Reads body_length bytes from the current position of file_ and fails with
+  // Status::IOError on a short read. It is called right after ReadMessage for
+  // the same block.
+  // NOTE(review): this presumes ReadMessage leaves the file position at the
+  // start of the message body -- confirm against ReadMessage.
+  Status ReadBody(int64_t body_length, std::shared_ptr<Buffer>* out) {
+    RETURN_NOT_OK(file_->Read(body_length, out));
+    if ((*out)->size() < body_length) {
+      return Status::IOError("Unexpected end of file reading message body");
+    }
+    return Status::OK();
+  }
+
+  std::shared_ptr<io::ReadableFileInterface> file_;
+
+  // The location where the Arrow file layout ends. May be the end of the file
+  // or some other location if embedded in a larger file.
+  int64_t footer_offset_;
+
+  // Footer metadata. footer_ points into footer_buffer_, which keeps the bytes
+  // alive.
+  std::shared_ptr<Buffer> footer_buffer_;
+  const flatbuf::Footer* footer_;
+  std::unique_ptr<SchemaMetadata> schema_metadata_;
+
+  DictionaryTypeMap dictionary_fields_;
+  std::shared_ptr<DictionaryMemo> dictionary_memo_;
+
+  // Reconstructed schema, including any read dictionaries
+  std::shared_ptr<Schema> schema_;
+};
+
+// Private constructor: only allocates the hidden implementation object. The
+// file is attached afterwards by one of the FileReader::Open overloads.
+FileReader::FileReader() : impl_(new FileReaderImpl()) {}
+
+// Defined out of line, in the same file that defines FileReaderImpl.
+FileReader::~FileReader() = default;
+
+// Opens a reader on a file whose Arrow data ends where the file ends: the
+// size reported by the file is used as the footer offset.
+Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file,
+    std::shared_ptr<FileReader>* reader) {
+  int64_t file_size = 0;
+  RETURN_NOT_OK(file->GetSize(&file_size));
+  return Open(file, file_size, reader);
+}
+
+// Opens a reader whose Arrow data ends at footer_offset. Stores a newly
+// allocated reader in *reader (assigned even when a non-OK Status is
+// returned) and reports the Status of opening its implementation.
+Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file,
+    int64_t footer_offset, std::shared_ptr<FileReader>* reader) {
+  std::shared_ptr<FileReader> result(new FileReader());
+  *reader = result;
+  return result->impl_->Open(file, footer_offset);
+}
+
+// Forwards to the implementation and returns the schema it holds.
+std::shared_ptr<Schema> FileReader::schema() const { return impl_->schema(); }
+
+// Forwards to the implementation's record batch count.
+int FileReader::num_record_batches() const { return impl_->num_record_batches(); }
+
+// Forwards to the implementation's metadata version.
+MetadataVersion::type FileReader::version() const { return impl_->version(); }
+
+// Forwards to the implementation, which reads the i-th batch into *batch.
+Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
+  FileReaderImpl& impl = *impl_;
+  return impl.GetRecordBatch(i, batch);
+}
+
+}  // namespace ipc
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/reader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
new file mode 100644
index 0000000..6f143e1
--- /dev/null
+++ b/cpp/src/arrow/ipc/reader.h
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Readers for the Arrow streaming format and the Arrow file layout, for
+// IPC/RPC purposes and short-lived storage
+
+#ifndef ARROW_IPC_FILE_H
+#define ARROW_IPC_FILE_H
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/ipc/metadata.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Buffer;
+class RecordBatch;
+class Schema;
+class Status;
+
+namespace io {
+
+class InputStream;
+class ReadableFileInterface;
+
+}  // namespace io
+
+namespace ipc {
+
+// Reads record batches one after another from an io::InputStream. Instances
+// are created through the static Open factory; the constructor is private.
+class ARROW_EXPORT StreamReader {
+ public:
+  ~StreamReader();
+
+  // Open a stream.
+  //
+  // @param stream: the input to read from
+  // @param reader: receives the newly created reader
+  static Status Open(const std::shared_ptr<io::InputStream>& stream,
+      std::shared_ptr<StreamReader>* reader);
+
+  // The schema held by this reader
+  std::shared_ptr<Schema> schema() const;
+
+  // Returned batch is nullptr when end of stream reached
+  Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch);
+
+ private:
+  // Use Open to create instances
+  StreamReader();
+
+  // Implementation details are hidden behind a forward-declared, non-exported
+  // class that is only held by pointer
+  class ARROW_NO_EXPORT StreamReaderImpl;
+  std::unique_ptr<StreamReaderImpl> impl_;
+};
+
+// Random-access reader for record batches stored in the Arrow file layout.
+// Instances are created through the static Open factories; the constructor is
+// private.
+class ARROW_EXPORT FileReader {
+ public:
+  ~FileReader();
+
+  // Open a file-like object that is assumed to be self-contained; i.e., the
+  // end of the file interface is the end of the Arrow file. Note that there
+  // can be any amount of data preceding the Arrow-formatted data, because we
+  // need only locate the end of the Arrow file stream to discover the metadata
+  // and then proceed to read the data into memory.
+  //
+  // @param file: the data source
+  // @param reader: receives the newly created reader
+  static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file,
+      std::shared_ptr<FileReader>* reader);
+
+  // If the file is embedded within some larger file or memory region, you can
+  // pass the absolute memory offset to the end of the file (which contains the
+  // metadata footer). The metadata must have been written with memory offsets
+  // relative to the start of the containing file
+  //
+  // @param file: the data source
+  // @param footer_offset: the position of the end of the Arrow "file"
+  // @param reader: receives the newly created reader
+  static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file,
+      int64_t footer_offset, std::shared_ptr<FileReader>* reader);
+
+  /// The schema includes any dictionaries
+  std::shared_ptr<Schema> schema() const;
+
+  // Number of record batches that can be requested from GetRecordBatch
+  int num_record_batches() const;
+
+  // Metadata version of the file
+  MetadataVersion::type version() const;
+
+  // Read a record batch from the file. Does not copy memory if the input
+  // source supports zero-copy.
+  //
+  // TODO(wesm): Make the copy/zero-copy behavior configurable (e.g. provide an
+  // "always copy" option)
+  //
+  // @param i: index of the record batch to read
+  // @param batch: receives the record batch
+  Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch);
+
+ private:
+  // Use Open to create instances
+  FileReader();
+
+  // Implementation details are hidden behind a forward-declared, non-exported
+  // class that is only held by pointer
+  class ARROW_NO_EXPORT FileReaderImpl;
+  std::unique_ptr<FileReaderImpl> impl_;
+};
+
+}  // namespace ipc
+}  // namespace arrow
+
+#endif  // ARROW_IPC_FILE_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/stream-to-file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream-to-file.cc 
b/cpp/src/arrow/ipc/stream-to-file.cc
new file mode 100644
index 0000000..ec0ac43
--- /dev/null
+++ b/cpp/src/arrow/ipc/stream-to-file.cc
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/io/file.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/ipc/writer.h"
+#include "arrow/status.h"
+#include <iostream>
+
+#include "arrow/util/io-util.h"
+
+namespace arrow {
+
+// Reads an Arrow stream from standard input and re-emits it on standard
+// output in the Arrow file format. A typical usage would be:
+// $ <program that produces streaming output> | stream-to-file > file.arrow
+Status ConvertToFile() {
+  std::shared_ptr<io::InputStream> input(new io::StdinStream);
+  std::shared_ptr<ipc::StreamReader> reader;
+  RETURN_NOT_OK(ipc::StreamReader::Open(input, &reader));
+
+  io::StdoutStream sink;
+  std::shared_ptr<ipc::FileWriter> writer;
+  RETURN_NOT_OK(ipc::FileWriter::Open(&sink, reader->schema(), &writer));
+
+  // A null batch marks the end of the input stream
+  std::shared_ptr<RecordBatch> batch;
+  RETURN_NOT_OK(reader->GetNextRecordBatch(&batch));
+  while (batch != nullptr) {
+    RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
+    RETURN_NOT_OK(reader->GetNextRecordBatch(&batch));
+  }
+  return writer->Close();
+}
+
+}  // namespace arrow
+
+// Entry point: runs the conversion and maps its Status to the process exit
+// code (0 on success, 1 after printing the error to stderr).
+int main(int argc, char** argv) {
+  const arrow::Status status = arrow::ConvertToFile();
+  if (status.ok()) { return 0; }
+  std::cerr << "Could not convert to file: " << status.ToString() << std::endl;
+  return 1;
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/stream.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream.cc b/cpp/src/arrow/ipc/stream.cc
deleted file mode 100644
index 7f5c993..0000000
--- a/cpp/src/arrow/ipc/stream.cc
+++ /dev/null
@@ -1,310 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "arrow/ipc/stream.h"
-
-#include <cstdint>
-#include <cstring>
-#include <sstream>
-#include <string>
-#include <vector>
-
-#include "arrow/buffer.h"
-#include "arrow/io/interfaces.h"
-#include "arrow/io/memory.h"
-#include "arrow/ipc/adapter.h"
-#include "arrow/ipc/metadata-internal.h"
-#include "arrow/ipc/metadata.h"
-#include "arrow/ipc/util.h"
-#include "arrow/memory_pool.h"
-#include "arrow/schema.h"
-#include "arrow/status.h"
-#include "arrow/table.h"
-#include "arrow/util/logging.h"
-
-namespace arrow {
-namespace ipc {
-
-// ----------------------------------------------------------------------
-// Stream writer implementation
-
-StreamWriter::StreamWriter(io::OutputStream* sink, const 
std::shared_ptr<Schema>& schema)
-    : sink_(sink),
-      schema_(schema),
-      dictionary_memo_(std::make_shared<DictionaryMemo>()),
-      pool_(default_memory_pool()),
-      position_(-1),
-      started_(false) {}
-
-Status StreamWriter::UpdatePosition() {
-  return sink_->Tell(&position_);
-}
-
-Status StreamWriter::Write(const uint8_t* data, int64_t nbytes) {
-  RETURN_NOT_OK(sink_->Write(data, nbytes));
-  position_ += nbytes;
-  return Status::OK();
-}
-
-Status StreamWriter::Align() {
-  int64_t remainder = PaddedLength(position_) - position_;
-  if (remainder > 0) { return Write(kPaddingBytes, remainder); }
-  return Status::OK();
-}
-
-Status StreamWriter::WriteAligned(const uint8_t* data, int64_t nbytes) {
-  RETURN_NOT_OK(Write(data, nbytes));
-  return Align();
-}
-
-Status StreamWriter::CheckStarted() {
-  if (!started_) { return Start(); }
-  return Status::OK();
-}
-
-Status StreamWriter::WriteRecordBatch(const RecordBatch& batch, FileBlock* 
block) {
-  RETURN_NOT_OK(CheckStarted());
-
-  block->offset = position_;
-
-  // Frame of reference in file format is 0, see ARROW-384
-  const int64_t buffer_start_offset = 0;
-  RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(batch, buffer_start_offset, sink_,
-      &block->metadata_length, &block->body_length, pool_));
-  RETURN_NOT_OK(UpdatePosition());
-
-  DCHECK(position_ % 8 == 0) << "WriteRecordBatch did not perform aligned 
writes";
-
-  return Status::OK();
-}
-
-void StreamWriter::set_memory_pool(MemoryPool* pool) {
-  pool_ = pool;
-}
-
-// ----------------------------------------------------------------------
-// StreamWriter implementation
-
-Status StreamWriter::Open(io::OutputStream* sink, const 
std::shared_ptr<Schema>& schema,
-    std::shared_ptr<StreamWriter>* out) {
-  // ctor is private
-  *out = std::shared_ptr<StreamWriter>(new StreamWriter(sink, schema));
-  RETURN_NOT_OK((*out)->UpdatePosition());
-  return Status::OK();
-}
-
-Status StreamWriter::Start() {
-  std::shared_ptr<Buffer> schema_fb;
-  RETURN_NOT_OK(WriteSchemaMessage(*schema_, dictionary_memo_.get(), 
&schema_fb));
-
-  int32_t flatbuffer_size = schema_fb->size();
-  RETURN_NOT_OK(
-      Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), 
sizeof(int32_t)));
-
-  // Write the flatbuffer
-  RETURN_NOT_OK(Write(schema_fb->data(), flatbuffer_size));
-
-  // If there are any dictionaries, write them as the next messages
-  RETURN_NOT_OK(WriteDictionaries());
-
-  started_ = true;
-  return Status::OK();
-}
-
-Status StreamWriter::WriteRecordBatch(const RecordBatch& batch) {
-  // Push an empty FileBlock. Can be written in the footer later
-  record_batches_.emplace_back(0, 0, 0);
-  return WriteRecordBatch(batch, &record_batches_[record_batches_.size() - 1]);
-}
-
-Status StreamWriter::WriteDictionaries() {
-  const DictionaryMap& id_to_dictionary = dictionary_memo_->id_to_dictionary();
-
-  dictionaries_.resize(id_to_dictionary.size());
-
-  // TODO(wesm): does sorting by id yield any benefit?
-  int dict_index = 0;
-  for (const auto& entry : id_to_dictionary) {
-    FileBlock* block = &dictionaries_[dict_index++];
-
-    block->offset = position_;
-
-    // Frame of reference in file format is 0, see ARROW-384
-    const int64_t buffer_start_offset = 0;
-    RETURN_NOT_OK(WriteDictionary(entry.first, entry.second, 
buffer_start_offset, sink_,
-        &block->metadata_length, &block->body_length, pool_));
-    RETURN_NOT_OK(UpdatePosition());
-    DCHECK(position_ % 8 == 0) << "WriteDictionary did not perform aligned 
writes";
-  }
-
-  return Status::OK();
-}
-
-Status StreamWriter::Close() {
-  // Write the schema if not already written
-  // User is responsible for closing the OutputStream
-  return CheckStarted();
-}
-
-// ----------------------------------------------------------------------
-// StreamReader implementation
-
-static inline std::string message_type_name(Message::Type type) {
-  switch (type) {
-    case Message::SCHEMA:
-      return "schema";
-    case Message::RECORD_BATCH:
-      return "record batch";
-    case Message::DICTIONARY_BATCH:
-      return "dictionary";
-    default:
-      break;
-  }
-  return "unknown";
-}
-
-class StreamReader::StreamReaderImpl {
- public:
-  StreamReaderImpl() {}
-  ~StreamReaderImpl() {}
-
-  Status Open(const std::shared_ptr<io::InputStream>& stream) {
-    stream_ = stream;
-    return ReadSchema();
-  }
-
-  Status ReadNextMessage(Message::Type expected_type, 
std::shared_ptr<Message>* message) {
-    std::shared_ptr<Buffer> buffer;
-    RETURN_NOT_OK(stream_->Read(sizeof(int32_t), &buffer));
-
-    if (buffer->size() != sizeof(int32_t)) {
-      *message = nullptr;
-      return Status::OK();
-    }
-
-    int32_t message_length = *reinterpret_cast<const int32_t*>(buffer->data());
-
-    RETURN_NOT_OK(stream_->Read(message_length, &buffer));
-    if (buffer->size() != message_length) {
-      return Status::IOError("Unexpected end of stream trying to read 
message");
-    }
-
-    RETURN_NOT_OK(Message::Open(buffer, 0, message));
-
-    if ((*message)->type() != expected_type) {
-      std::stringstream ss;
-      ss << "Message not expected type: " << message_type_name(expected_type)
-         << ", was: " << (*message)->type();
-      return Status::IOError(ss.str());
-    }
-    return Status::OK();
-  }
-
-  Status ReadExact(int64_t size, std::shared_ptr<Buffer>* buffer) {
-    RETURN_NOT_OK(stream_->Read(size, buffer));
-
-    if ((*buffer)->size() < size) {
-      return Status::IOError("Unexpected EOS when reading buffer");
-    }
-    return Status::OK();
-  }
-
-  Status ReadNextDictionary() {
-    std::shared_ptr<Message> message;
-    RETURN_NOT_OK(ReadNextMessage(Message::DICTIONARY_BATCH, &message));
-
-    DictionaryBatchMetadata metadata(message);
-
-    std::shared_ptr<Buffer> batch_body;
-    RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body))
-    io::BufferReader reader(batch_body);
-
-    std::shared_ptr<Array> dictionary;
-    RETURN_NOT_OK(ReadDictionary(metadata, dictionary_types_, &reader, 
&dictionary));
-    return dictionary_memo_.AddDictionary(metadata.id(), dictionary);
-  }
-
-  Status ReadSchema() {
-    std::shared_ptr<Message> message;
-    RETURN_NOT_OK(ReadNextMessage(Message::SCHEMA, &message));
-
-    SchemaMetadata schema_meta(message);
-    RETURN_NOT_OK(schema_meta.GetDictionaryTypes(&dictionary_types_));
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream 
with
-    // those found in the schema
-    int num_dictionaries = static_cast<int>(dictionary_types_.size());
-    for (int i = 0; i < num_dictionaries; ++i) {
-      RETURN_NOT_OK(ReadNextDictionary());
-    }
-
-    return schema_meta.GetSchema(dictionary_memo_, &schema_);
-  }
-
-  Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
-    std::shared_ptr<Message> message;
-    RETURN_NOT_OK(ReadNextMessage(Message::RECORD_BATCH, &message));
-
-    if (message == nullptr) {
-      // End of stream
-      *batch = nullptr;
-      return Status::OK();
-    }
-
-    RecordBatchMetadata batch_metadata(message);
-
-    std::shared_ptr<Buffer> batch_body;
-    RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body));
-    io::BufferReader reader(batch_body);
-    return ReadRecordBatch(batch_metadata, schema_, &reader, batch);
-  }
-
-  std::shared_ptr<Schema> schema() const { return schema_; }
-
- private:
-  // dictionary_id -> type
-  DictionaryTypeMap dictionary_types_;
-
-  DictionaryMemo dictionary_memo_;
-
-  std::shared_ptr<io::InputStream> stream_;
-  std::shared_ptr<Schema> schema_;
-};
-
-StreamReader::StreamReader() {
-  impl_.reset(new StreamReaderImpl());
-}
-
-StreamReader::~StreamReader() {}
-
-Status StreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
-    std::shared_ptr<StreamReader>* reader) {
-  // Private ctor
-  *reader = std::shared_ptr<StreamReader>(new StreamReader());
-  return (*reader)->impl_->Open(stream);
-}
-
-std::shared_ptr<Schema> StreamReader::schema() const {
-  return impl_->schema();
-}
-
-Status StreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
-  return impl_->GetNextRecordBatch(batch);
-}
-
-}  // namespace ipc
-}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/stream.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream.h b/cpp/src/arrow/ipc/stream.h
deleted file mode 100644
index 1c3f65e..0000000
--- a/cpp/src/arrow/ipc/stream.h
+++ /dev/null
@@ -1,137 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Implement Arrow streaming binary format
-
-#ifndef ARROW_IPC_STREAM_H
-#define ARROW_IPC_STREAM_H
-
-#include <cstdint>
-#include <memory>
-#include <vector>
-
-#include "arrow/ipc/metadata.h"
-#include "arrow/util/visibility.h"
-
-namespace arrow {
-
-class Array;
-class Buffer;
-struct Field;
-class MemoryPool;
-class RecordBatch;
-class Schema;
-class Status;
-
-namespace io {
-
-class InputStream;
-class OutputStream;
-
-}  // namespace io
-
-namespace ipc {
-
-struct ARROW_EXPORT FileBlock {
-  FileBlock() {}
-  FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length)
-      : offset(offset), metadata_length(metadata_length), 
body_length(body_length) {}
-
-  int64_t offset;
-  int32_t metadata_length;
-  int64_t body_length;
-};
-
-class ARROW_EXPORT StreamWriter {
- public:
-  virtual ~StreamWriter() = default;
-
-  static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& 
schema,
-      std::shared_ptr<StreamWriter>* out);
-
-  virtual Status WriteRecordBatch(const RecordBatch& batch);
-
-  /// Perform any logic necessary to finish the stream. User is responsible for
-  /// closing the actual OutputStream
-  virtual Status Close();
-
-  // In some cases, writing may require memory allocation. We use the default
-  // memory pool, but provide the option to override
-  void set_memory_pool(MemoryPool* pool);
-
- protected:
-  StreamWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema);
-
-  virtual Status Start();
-
-  Status CheckStarted();
-  Status UpdatePosition();
-
-  Status WriteDictionaries();
-
-  Status WriteRecordBatch(const RecordBatch& batch, FileBlock* block);
-
-  // Adds padding bytes if necessary to ensure all memory blocks are written on
-  // 8-byte boundaries.
-  Status Align();
-
-  // Write data and update position
-  Status Write(const uint8_t* data, int64_t nbytes);
-
-  // Write and align
-  Status WriteAligned(const uint8_t* data, int64_t nbytes);
-
-  io::OutputStream* sink_;
-  std::shared_ptr<Schema> schema_;
-
-  // When writing out the schema, we keep track of all the dictionaries we
-  // encounter, as they must be written out first in the stream
-  std::shared_ptr<DictionaryMemo> dictionary_memo_;
-
-  MemoryPool* pool_;
-
-  int64_t position_;
-  bool started_;
-
-  std::vector<FileBlock> dictionaries_;
-  std::vector<FileBlock> record_batches_;
-};
-
-class ARROW_EXPORT StreamReader {
- public:
-  ~StreamReader();
-
-  // Open an stream.
-  static Status Open(const std::shared_ptr<io::InputStream>& stream,
-      std::shared_ptr<StreamReader>* reader);
-
-  std::shared_ptr<Schema> schema() const;
-
-  // Returned batch is nullptr when end of stream reached
-  Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch);
-
- private:
-  StreamReader();
-
-  class ARROW_NO_EXPORT StreamReaderImpl;
-  std::unique_ptr<StreamReaderImpl> impl_;
-};
-
-}  // namespace ipc
-}  // namespace arrow
-
-#endif  // ARROW_IPC_STREAM_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
new file mode 100644
index 0000000..975b0d1
--- /dev/null
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -0,0 +1,287 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/ipc/writer.h"
+
+#include <cstdint>
+#include <cstring>
+#include <sstream>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
+#include "arrow/ipc/adapter.h"
+#include "arrow/ipc/metadata-internal.h"
+#include "arrow/ipc/metadata.h"
+#include "arrow/ipc/util.h"
+#include "arrow/memory_pool.h"
+#include "arrow/schema.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+namespace ipc {
+
+// ----------------------------------------------------------------------
+// Stream writer implementation
+
+// Private implementation of StreamWriter; FileWriter::FileWriterImpl derives
+// from it. The schema (followed by any dictionaries) is written lazily, the
+// first time a record batch is written or when the writer is closed.
+class StreamWriter::StreamWriterImpl {
+ public:
+  StreamWriterImpl()
+      : sink_(nullptr),
+        dictionary_memo_(std::make_shared<DictionaryMemo>()),
+        pool_(default_memory_pool()),
+        position_(-1),
+        started_(false) {}
+
+  virtual ~StreamWriterImpl() = default;
+
+  // Remembers the sink and schema and records the sink's current position.
+  // The sink is held as a raw pointer.
+  Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema) {
+    sink_ = sink;
+    schema_ = schema;
+    return UpdatePosition();
+  }
+
+  // Writes the length-prefixed schema message followed by all dictionaries and
+  // marks the writer as started.
+  virtual Status Start() {
+    std::shared_ptr<Buffer> schema_fb;
+    RETURN_NOT_OK(WriteSchemaMessage(*schema_, dictionary_memo_.get(), &schema_fb));
+
+    // The length prefix is a 32-bit integer
+    int32_t flatbuffer_size = static_cast<int32_t>(schema_fb->size());
+    RETURN_NOT_OK(
+        Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t)));
+
+    // Write the flatbuffer
+    RETURN_NOT_OK(Write(schema_fb->data(), flatbuffer_size));
+
+    // If there are any dictionaries, write them as the next messages
+    RETURN_NOT_OK(WriteDictionaries());
+
+    started_ = true;
+    return Status::OK();
+  }
+
+  virtual Status Close() {
+    // Write the schema if not already written
+    // User is responsible for closing the OutputStream
+    return CheckStarted();
+  }
+
+  // Runs Start() unless it has already completed successfully.
+  Status CheckStarted() {
+    if (!started_) { return Start(); }
+    return Status::OK();
+  }
+
+  // Refreshes position_ from the sink.
+  Status UpdatePosition() { return sink_->Tell(&position_); }
+
+  // Writes every dictionary known to dictionary_memo_ and records the location
+  // of each one in dictionaries_.
+  Status WriteDictionaries() {
+    const DictionaryMap& id_to_dictionary = dictionary_memo_->id_to_dictionary();
+
+    dictionaries_.resize(id_to_dictionary.size());
+
+    // TODO(wesm): does sorting by id yield any benefit?
+    int dict_index = 0;
+    for (const auto& entry : id_to_dictionary) {
+      FileBlock* block = &dictionaries_[dict_index++];
+
+      block->offset = position_;
+
+      // Frame of reference in file format is 0, see ARROW-384
+      const int64_t buffer_start_offset = 0;
+      RETURN_NOT_OK(WriteDictionary(entry.first, entry.second, buffer_start_offset,
+          sink_, &block->metadata_length, &block->body_length, pool_));
+      RETURN_NOT_OK(UpdatePosition());
+      DCHECK(position_ % 8 == 0) << "WriteDictionary did not perform aligned writes";
+    }
+
+    return Status::OK();
+  }
+
+  // Writes one record batch (starting the stream first if necessary) and fills
+  // *block with the offset and lengths of what was written.
+  Status WriteRecordBatch(const RecordBatch& batch, FileBlock* block) {
+    RETURN_NOT_OK(CheckStarted());
+
+    block->offset = position_;
+
+    // Frame of reference in file format is 0, see ARROW-384
+    const int64_t buffer_start_offset = 0;
+    RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(batch, buffer_start_offset, sink_,
+        &block->metadata_length, &block->body_length, pool_));
+    RETURN_NOT_OK(UpdatePosition());
+
+    DCHECK(position_ % 8 == 0) << "WriteRecordBatch did not perform aligned writes";
+
+    return Status::OK();
+  }
+
+  // Writes one record batch and remembers where it went so that the block can
+  // be written in a footer later.
+  Status WriteRecordBatch(const RecordBatch& batch) {
+    // Only record the block once the write has succeeded; a failed write must
+    // not leave an empty placeholder behind in record_batches_
+    FileBlock block(0, 0, 0);
+    RETURN_NOT_OK(WriteRecordBatch(batch, &block));
+    record_batches_.push_back(block);
+    return Status::OK();
+  }
+
+  // Adds padding bytes if necessary to ensure all memory blocks are written on
+  // 8-byte boundaries.
+  Status Align() {
+    int64_t remainder = PaddedLength(position_) - position_;
+    if (remainder > 0) { return Write(kPaddingBytes, remainder); }
+    return Status::OK();
+  }
+
+  // Write data and update position
+  Status Write(const uint8_t* data, int64_t nbytes) {
+    RETURN_NOT_OK(sink_->Write(data, nbytes));
+    position_ += nbytes;
+    return Status::OK();
+  }
+
+  // Write and align
+  Status WriteAligned(const uint8_t* data, int64_t nbytes) {
+    RETURN_NOT_OK(Write(data, nbytes));
+    return Align();
+  }
+
+  // Replaces the memory pool handed to the record batch and dictionary writers.
+  void set_memory_pool(MemoryPool* pool) { pool_ = pool; }
+
+ protected:
+  // Null until Open is called
+  io::OutputStream* sink_;
+  std::shared_ptr<Schema> schema_;
+
+  // When writing out the schema, we keep track of all the dictionaries we
+  // encounter, as they must be written out first in the stream
+  std::shared_ptr<DictionaryMemo> dictionary_memo_;
+
+  MemoryPool* pool_;
+
+  // Current write position in the sink; -1 until Open is called
+  int64_t position_;
+  bool started_;
+
+  // Locations of the dictionaries and record batches written so far
+  std::vector<FileBlock> dictionaries_;
+  std::vector<FileBlock> record_batches_;
+};
+
+// Only allocates the hidden implementation object. The sink and schema are
+// attached afterwards by StreamWriter::Open.
+StreamWriter::StreamWriter() : impl_(new StreamWriterImpl()) {}
+
+// Forwards to the implementation, which writes the batch to the sink.
+Status StreamWriter::WriteRecordBatch(const RecordBatch& batch) {
+  StreamWriterImpl& impl = *impl_;
+  return impl.WriteRecordBatch(batch);
+}
+
+// Forwards the memory pool override to the implementation.
+void StreamWriter::set_memory_pool(MemoryPool* pool) { impl_->set_memory_pool(pool); }
+
+// Factory for StreamWriter. Stores a newly allocated writer in *out (assigned
+// even when a non-OK Status is returned) and reports the Status of opening its
+// implementation on the given sink and schema.
+Status StreamWriter::Open(io::OutputStream* sink,
+    const std::shared_ptr<Schema>& schema, std::shared_ptr<StreamWriter>* out) {
+  // ctor is private
+  std::shared_ptr<StreamWriter> result(new StreamWriter());
+  *out = result;
+  return result->impl_->Open(sink, schema);
+}
+
+// Forwards to the implementation's Close.
+Status StreamWriter::Close() { return impl_->Close(); }
+
+// ----------------------------------------------------------------------
+// File writer implementation
+
+// Converts each FileBlock into a flatbuf::Block struct, in order, and stores
+// the resulting vector of structs in the flatbuffer being built by fbb.
+static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>>
+FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) {
+  std::vector<flatbuf::Block> converted;
+
+  for (size_t i = 0; i < blocks.size(); ++i) {
+    const FileBlock& src = blocks[i];
+    converted.push_back(
+        flatbuf::Block(src.offset, src.metadata_length, src.body_length));
+  }
+
+  return fbb.CreateVectorOfStructs(converted);
+}
+
+// Serializes the file footer -- metadata version, schema, and the locations of
+// all dictionaries and record batches -- as a flatbuffer and writes it to out.
+Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
+    const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo,
+    io::OutputStream* out) {
+  FBB builder;
+
+  flatbuffers::Offset<flatbuf::Schema> schema_offset;
+  RETURN_NOT_OK(SchemaToFlatbuffer(builder, schema, dictionary_memo, &schema_offset));
+
+  // Keep these two calls as separate statements so the vectors are always
+  // created in this order
+  auto dictionaries_offset = FileBlocksToFlatbuffer(builder, dictionaries);
+  auto record_batches_offset = FileBlocksToFlatbuffer(builder, record_batches);
+
+  builder.Finish(flatbuf::CreateFooter(builder, kMetadataVersion, schema_offset,
+      dictionaries_offset, record_batches_offset));
+
+  const int32_t footer_size = builder.GetSize();
+  return out->Write(builder.GetBufferPointer(), footer_size);
+}
+
+// Implementation of FileWriter. Reuses the stream writer logic and wraps its
+// output between leading magic bytes and a trailing
+//   <footer flatbuffer> <int32 footer length> <magic bytes>
+class FileWriter::FileWriterImpl : public StreamWriter::StreamWriterImpl {
+ public:
+  using BASE = StreamWriter::StreamWriterImpl;
+
+  // Writes the (padded) magic bytes and then everything the stream writer
+  // writes on start.
+  Status Start() override {
+    RETURN_NOT_OK(WriteAligned(
+        reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes)));
+
+    // We write the schema at the start of the file (and the end). This also
+    // writes all the dictionaries at the beginning of the file
+    return BASE::Start();
+  }
+
+  // Finishes the file by writing the footer, its length and the closing magic
+  // bytes. User is responsible for closing the OutputStream.
+  Status Close() override {
+    // Make sure the leading magic bytes, schema and dictionaries have been
+    // written even when no record batch was ever written; otherwise the output
+    // would consist of a footer only
+    RETURN_NOT_OK(CheckStarted());
+
+    // Write metadata, measuring the footer from the sink's actual position
+    RETURN_NOT_OK(UpdatePosition());
+    const int64_t initial_position = position_;
+    RETURN_NOT_OK(WriteFileFooter(
+        *schema_, dictionaries_, record_batches_, dictionary_memo_.get(), sink_));
+    RETURN_NOT_OK(UpdatePosition());
+
+    // Write footer length, which is stored as a 32-bit integer
+    const int64_t footer_size = position_ - initial_position;
+    if (footer_size <= 0 || footer_size > INT32_MAX) {
+      return Status::Invalid("Invalid file footer");
+    }
+    const int32_t footer_length = static_cast<int32_t>(footer_size);
+
+    RETURN_NOT_OK(
+        Write(reinterpret_cast<const uint8_t*>(&footer_length), sizeof(int32_t)));
+
+    // Write magic bytes to end file
+    return Write(
+        reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes));
+  }
+};
+
+// Only allocates the hidden implementation object. The sink and schema are
+// attached afterwards by FileWriter::Open.
+FileWriter::FileWriter() {
+  impl_ = std::unique_ptr<FileWriterImpl>(new FileWriterImpl());
+}
+
+// Factory for FileWriter. Stores a newly allocated writer in *out (assigned
+// even when a non-OK Status is returned) and reports the Status of opening its
+// implementation on the given sink and schema.
+Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+    std::shared_ptr<FileWriter>* out) {
+  // ctor is private
+  std::shared_ptr<FileWriter> result(new FileWriter());
+  *out = result;
+  return result->impl_->Open(sink, schema);
+}
+
+// Forwards to the implementation, which writes the batch to the sink.
+Status FileWriter::WriteRecordBatch(const RecordBatch& batch) {
+  auto& impl = *impl_;
+  return impl.WriteRecordBatch(batch);
+}
+
+// Forwards to the implementation's Close.
+Status FileWriter::Close() { return impl_->Close(); }
+
+}  // namespace ipc
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/ipc/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
new file mode 100644
index 0000000..7aff71e
--- /dev/null
+++ b/cpp/src/arrow/ipc/writer.h
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Writers for the Arrow streaming and file binary formats
+
+#ifndef ARROW_IPC_STREAM_H
+#define ARROW_IPC_STREAM_H
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/ipc/metadata.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Array;
+class Buffer;
+struct Field;
+class MemoryPool;
+class RecordBatch;
+class Schema;
+class Status;
+
+namespace io {
+
+class OutputStream;
+
+}  // namespace io
+
+namespace ipc {
+
+/// Writes record batches to an io::OutputStream. The implementation lives in
+/// the forward-declared StreamWriterImpl, so its details stay out of this
+/// header.
+class ARROW_EXPORT StreamWriter {
+ public:
+  // NOTE(review): this destructor is defaulted inline while StreamWriterImpl is
+  // still an incomplete type in this header; destroying impl_ needs the complete
+  // type, so consider defining the destructor in the .cc file -- confirm.
+  virtual ~StreamWriter() = default;
+
+  /// Factory function that hands the new writer back through `out`. The
+  /// constructor is protected, so this is presumably the intended way to
+  /// create a StreamWriter.
+  /// NOTE(review): `sink` is a raw pointer; presumably it must outlive the
+  /// writer -- confirm against the implementation.
+  static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& 
schema,
+      std::shared_ptr<StreamWriter>* out);
+
+  /// Write one record batch. Virtual so that subclasses (FileWriter below) can
+  /// override it.
+  virtual Status WriteRecordBatch(const RecordBatch& batch);
+
+  /// Perform any logic necessary to finish the stream. User is responsible for
+  /// closing the actual OutputStream
+  virtual Status Close();
+
+  // In some cases, writing may require memory allocation. We use the default
+  // memory pool, but provide the option to override
+  void set_memory_pool(MemoryPool* pool);
+
+ protected:
+  StreamWriter();
+  // Private-implementation type; only forward-declared here and not exported.
+  class ARROW_NO_EXPORT StreamWriterImpl;
+  std::unique_ptr<StreamWriterImpl> impl_;
+};
+
+/// Writes the file footer to `out`. FileWriter's implementation calls this when
+/// closing a file, passing the schema, the dictionary and record batch blocks,
+/// and the dictionary memo.
+/// NOTE(review): declared without ARROW_EXPORT, unlike the classes in this
+/// header -- confirm whether it is meant to be usable outside the library.
+Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& 
dictionaries,
+    const std::vector<FileBlock>& record_batches, DictionaryMemo* 
dictionary_memo,
+    io::OutputStream* out);
+
+/// Writer for the Arrow file format. Its implementation writes magic bytes at
+/// the start and, on Close(), a footer, the footer length and magic bytes.
+class ARROW_EXPORT FileWriter : public StreamWriter {
+ public:
+  /// Factory function that hands the new writer back through `out`; the
+  /// constructor is private.
+  static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& 
schema,
+      std::shared_ptr<FileWriter>* out);
+
+  /// Both overrides forward to this class's own impl_ (see note below).
+  Status WriteRecordBatch(const RecordBatch& batch) override;
+  Status Close() override;
+
+ private:
+  FileWriter();
+  class ARROW_NO_EXPORT FileWriterImpl;
+  // NOTE(review): this member hides the protected StreamWriter::impl_, so a
+  // FileWriter carries two impl pointers. The inherited, non-virtual
+  // set_memory_pool() presumably acts on the base one and would then not affect
+  // the FileWriterImpl used for writing -- confirm.
+  std::unique_ptr<FileWriterImpl> impl_;
+};
+
+}  // namespace ipc
+}  // namespace arrow
+
+#endif  // ARROW_IPC_STREAM_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/CMakeLists.txt 
b/cpp/src/arrow/util/CMakeLists.txt
index 19b1e19..8d9afcc 100644
--- a/cpp/src/arrow/util/CMakeLists.txt
+++ b/cpp/src/arrow/util/CMakeLists.txt
@@ -68,26 +68,4 @@ if (ARROW_BUILD_BENCHMARKS)
   endif()
 endif()
 
-if (ARROW_IPC AND ARROW_BUILD_UTILITIES)
-  set(UTIL_LINK_LIBS
-    arrow_ipc_static
-    arrow_io_static
-    arrow_static
-    boost_filesystem_static
-    boost_system_static
-    dl)
-
-  if (NOT APPLE)
-    set(UTIL_LINK_LIBS
-      ${UTIL_LINK_LIBS}
-      boost_filesystem_static
-      boost_system_static)
-  endif()
-
-  add_executable(file-to-stream file-to-stream.cc)
-  target_link_libraries(file-to-stream ${UTIL_LINK_LIBS})
-  add_executable(stream-to-file stream-to-file.cc)
-  target_link_libraries(stream-to-file ${UTIL_LINK_LIBS})
-endif()
-
 ADD_ARROW_TEST(bit-util-test)

http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/util/file-to-stream.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/file-to-stream.cc 
b/cpp/src/arrow/util/file-to-stream.cc
deleted file mode 100644
index 7daf263..0000000
--- a/cpp/src/arrow/util/file-to-stream.cc
+++ /dev/null
@@ -1,60 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "arrow/io/file.h"
-#include "arrow/ipc/file.h"
-#include "arrow/ipc/stream.h"
-#include "arrow/status.h"
-#include <iostream>
-
-#include "arrow/util/io-util.h"
-
-namespace arrow {
-
-// Reads a file on the file system and prints to stdout the stream version of 
it.
-Status ConvertToStream(const char* path) {
-  std::shared_ptr<io::ReadableFile> in_file;
-  std::shared_ptr<ipc::FileReader> reader;
-
-  RETURN_NOT_OK(io::ReadableFile::Open(path, &in_file));
-  RETURN_NOT_OK(ipc::FileReader::Open(in_file, &reader));
-
-  io::StdoutStream sink;
-  std::shared_ptr<ipc::StreamWriter> writer;
-  RETURN_NOT_OK(ipc::StreamWriter::Open(&sink, reader->schema(), &writer));
-  for (int i = 0; i < reader->num_record_batches(); ++i) {
-    std::shared_ptr<RecordBatch> chunk;
-    RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk));
-    RETURN_NOT_OK(writer->WriteRecordBatch(*chunk));
-  }
-  return writer->Close();
-}
-
-}  // namespace arrow
-
-int main(int argc, char** argv) {
-  if (argc != 2) {
-    std::cerr << "Usage: file-to-stream <input arrow file>" << std::endl;
-    return 1;
-  }
-  arrow::Status status = arrow::ConvertToStream(argv[1]);
-  if (!status.ok()) {
-    std::cerr << "Could not convert to stream: " << status.ToString() << 
std::endl;
-    return 1;
-  }
-  return 0;
-}

http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/cpp/src/arrow/util/stream-to-file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/stream-to-file.cc 
b/cpp/src/arrow/util/stream-to-file.cc
deleted file mode 100644
index 393b07d..0000000
--- a/cpp/src/arrow/util/stream-to-file.cc
+++ /dev/null
@@ -1,58 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "arrow/ipc/stream.h"
-#include "arrow/io/file.h"
-#include "arrow/ipc/file.h"
-#include "arrow/status.h"
-#include <iostream>
-
-#include "arrow/util/io-util.h"
-
-namespace arrow {
-
-// Converts a stream from stdin to a file written to standard out.
-// A typical usage would be:
-// $ <program that produces streaming output> | stream-to-file > file.arrow
-Status ConvertToFile() {
-  std::shared_ptr<io::InputStream> input(new io::StdinStream);
-  std::shared_ptr<ipc::StreamReader> reader;
-  RETURN_NOT_OK(ipc::StreamReader::Open(input, &reader));
-
-  io::StdoutStream sink;
-  std::shared_ptr<ipc::FileWriter> writer;
-  RETURN_NOT_OK(ipc::FileWriter::Open(&sink, reader->schema(), &writer));
-
-  std::shared_ptr<RecordBatch> batch;
-  while (true) {
-    RETURN_NOT_OK(reader->GetNextRecordBatch(&batch));
-    if (batch == nullptr) break;
-    RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
-  }
-  return writer->Close();
-}
-
-}  // namespace arrow
-
-int main(int argc, char** argv) {
-  arrow::Status status = arrow::ConvertToFile();
-  if (!status.ok()) {
-    std::cerr << "Could not convert to file: " << status.ToString() << 
std::endl;
-    return 1;
-  }
-  return 0;
-}

http://git-wip-us.apache.org/repos/asf/arrow/blob/16c97592/python/pyarrow/includes/libarrow_ipc.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_ipc.pxd 
b/python/pyarrow/includes/libarrow_ipc.pxd
index afc7dbd..10c70a9 100644
--- a/python/pyarrow/includes/libarrow_ipc.pxd
+++ b/python/pyarrow/includes/libarrow_ipc.pxd
@@ -23,7 +23,7 @@ from pyarrow.includes.libarrow_io cimport (InputStream, 
OutputStream,
                                            ReadableFileInterface)
 
 
-cdef extern from "arrow/ipc/stream.h" namespace "arrow::ipc" nogil:
+cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
 
     cdef cppclass CStreamWriter " arrow::ipc::StreamWriter":
         @staticmethod
@@ -43,9 +43,6 @@ cdef extern from "arrow/ipc/stream.h" namespace "arrow::ipc" 
nogil:
 
         CStatus GetNextRecordBatch(shared_ptr[CRecordBatch]* batch)
 
-
-cdef extern from "arrow/ipc/file.h" namespace "arrow::ipc" nogil:
-
     cdef cppclass CFileWriter " arrow::ipc::FileWriter"(CStreamWriter):
         @staticmethod
         CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,

Reply via email to