ARROW-280: [C++] Refactor IPC / memory map IO to use common arrow_io 
interfaces. Create arrow_ipc leaf library

Several things here

* Clean up IO interface class structure to be able to indicate precise 
characteristics of an implementation
* Make the IPC reader/writer use more generic interfaces -- writing needs only
an output stream, and reading needs only a random access reader. This will
unblock ARROW-267.
* Create a separate arrow_ipc shared library

Author: Wes McKinney <wes.mckin...@twosigma.com>

Closes #138 from wesm/ARROW-280 and squashes the following commits:

6a59eb6 [Wes McKinney] * Restructure IO interfaces to accommodate more
configurations. * Refactor memory mapped IO interfaces to be in line with other
arrow::io classes. * Split arrow_ipc into a leaf library. * Refactor pyarrow
and arrow_parquet to suit. Move BufferReader to arrow_io. Pyarrow parquet
tests currently segfault.


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/559b8652
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/559b8652
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/559b8652

Branch: refs/heads/master
Commit: 559b865226ec0f5d78e87957c2ff0f7711bec9a8
Parents: 17e90e1
Author: Wes McKinney <wes.mckin...@twosigma.com>
Authored: Sun Sep 18 16:01:58 2016 -0400
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Sun Sep 18 16:01:58 2016 -0400

----------------------------------------------------------------------
 cpp/CMakeLists.txt                           |   6 -
 cpp/src/arrow/io/CMakeLists.txt              |  11 +-
 cpp/src/arrow/io/hdfs-io-test.cc             | 315 ----------------------
 cpp/src/arrow/io/hdfs.cc                     |  35 ++-
 cpp/src/arrow/io/hdfs.h                      |  29 +-
 cpp/src/arrow/io/interfaces.h                |  71 ++++-
 cpp/src/arrow/io/io-hdfs-test.cc             | 315 ++++++++++++++++++++++
 cpp/src/arrow/io/io-memory-test.cc           | 125 +++++++++
 cpp/src/arrow/io/libhdfs_shim.cc             |   3 +-
 cpp/src/arrow/io/memory.cc                   | 262 ++++++++++++++++++
 cpp/src/arrow/io/memory.h                    | 130 +++++++++
 cpp/src/arrow/io/test-common.h               |  63 +++++
 cpp/src/arrow/ipc/CMakeLists.txt             |  58 +++-
 cpp/src/arrow/ipc/adapter.cc                 |  61 +++--
 cpp/src/arrow/ipc/adapter.h                  |  39 +--
 cpp/src/arrow/ipc/ipc-adapter-test.cc        |  33 +--
 cpp/src/arrow/ipc/ipc-memory-test.cc         | 127 ---------
 cpp/src/arrow/ipc/memory.cc                  | 182 -------------
 cpp/src/arrow/ipc/memory.h                   | 150 -----------
 cpp/src/arrow/ipc/metadata-internal.cc       |   9 +-
 cpp/src/arrow/ipc/metadata-internal.h        |   2 +-
 cpp/src/arrow/ipc/metadata.h                 |  11 +-
 cpp/src/arrow/ipc/symbols.map                |  18 ++
 cpp/src/arrow/ipc/test-common.h              |  25 --
 cpp/src/arrow/ipc/util.h                     |  56 ++++
 cpp/src/arrow/parquet/CMakeLists.txt         |   1 +
 cpp/src/arrow/parquet/io.cc                  |   4 +-
 cpp/src/arrow/parquet/io.h                   |   4 +-
 cpp/src/arrow/parquet/parquet-io-test.cc     |  51 +---
 cpp/src/arrow/parquet/parquet-schema-test.cc |   3 +-
 cpp/src/arrow/parquet/reader.cc              |   8 +-
 cpp/src/arrow/parquet/reader.h               |   2 +-
 cpp/src/arrow/parquet/schema.cc              |   2 +-
 cpp/src/arrow/parquet/writer.cc              |   2 +-
 cpp/src/arrow/type.h                         |   4 +-
 cpp/src/arrow/util/memory-pool-test.cc       |   2 +-
 python/pyarrow/includes/libarrow_io.pxd      |  42 ++-
 python/pyarrow/includes/parquet.pxd          |  18 +-
 python/pyarrow/io.pxd                        |   7 +-
 python/pyarrow/io.pyx                        |  14 +-
 python/pyarrow/parquet.pyx                   |   6 +-
 41 files changed, 1288 insertions(+), 1018 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index a39a752..be95dab 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -626,12 +626,6 @@ set(ARROW_SRCS
   src/arrow/table.cc
   src/arrow/type.cc
 
-  # IPC / Shared memory library; to be turned into an optional component
-  src/arrow/ipc/adapter.cc
-  src/arrow/ipc/memory.cc
-  src/arrow/ipc/metadata.cc
-  src/arrow/ipc/metadata-internal.cc
-
   src/arrow/types/construct.cc
   src/arrow/types/decimal.cc
   src/arrow/types/json.cc

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
index b8c0e13..87e227e 100644
--- a/cpp/src/arrow/io/CMakeLists.txt
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -20,6 +20,7 @@
 
 set(ARROW_IO_LINK_LIBS
   arrow_shared
+  dl
 )
 
 if (ARROW_BOOST_USE_SHARED)
@@ -37,6 +38,7 @@ set(ARROW_IO_TEST_LINK_LIBS
   ${ARROW_IO_PRIVATE_LINK_LIBS})
 
 set(ARROW_IO_SRCS
+  memory.cc
 )
 
 if(ARROW_HDFS)
@@ -71,8 +73,8 @@ if(ARROW_HDFS)
     ${ARROW_HDFS_SRCS}
     ${ARROW_IO_SRCS})
 
-  ADD_ARROW_TEST(hdfs-io-test)
-  ARROW_TEST_LINK_LIBRARIES(hdfs-io-test
+  ADD_ARROW_TEST(io-hdfs-test)
+  ARROW_TEST_LINK_LIBRARIES(io-hdfs-test
     ${ARROW_IO_TEST_LINK_LIBS})
 endif()
 
@@ -101,10 +103,15 @@ if (APPLE)
     INSTALL_NAME_DIR "@rpath")
 endif()
 
+ADD_ARROW_TEST(io-memory-test)
+ARROW_TEST_LINK_LIBRARIES(io-memory-test
+  ${ARROW_IO_TEST_LINK_LIBS})
+
 # Headers: top level
 install(FILES
   hdfs.h
   interfaces.h
+  memory.h
   DESTINATION include/arrow/io)
 
 install(TARGETS arrow_io

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/hdfs-io-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs-io-test.cc b/cpp/src/arrow/io/hdfs-io-test.cc
deleted file mode 100644
index e48a281..0000000
--- a/cpp/src/arrow/io/hdfs-io-test.cc
+++ /dev/null
@@ -1,315 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <cstdlib>
-#include <iostream>
-#include <sstream>
-#include <string>
-
-#include "gtest/gtest.h"
-
-#include <boost/filesystem.hpp>  // NOLINT
-
-#include "arrow/io/hdfs.h"
-#include "arrow/test-util.h"
-#include "arrow/util/status.h"
-
-namespace arrow {
-namespace io {
-
-std::vector<uint8_t> RandomData(int64_t size) {
-  std::vector<uint8_t> buffer(size);
-  test::random_bytes(size, 0, buffer.data());
-  return buffer;
-}
-
-class TestHdfsClient : public ::testing::Test {
- public:
-  Status MakeScratchDir() {
-    if (client_->Exists(scratch_dir_)) {
-      RETURN_NOT_OK((client_->Delete(scratch_dir_, true)));
-    }
-    return client_->CreateDirectory(scratch_dir_);
-  }
-
-  Status WriteDummyFile(const std::string& path, const uint8_t* buffer, 
int64_t size,
-      bool append = false, int buffer_size = 0, int replication = 0,
-      int default_block_size = 0) {
-    std::shared_ptr<HdfsWriteableFile> file;
-    RETURN_NOT_OK(client_->OpenWriteable(
-        path, append, buffer_size, replication, default_block_size, &file));
-
-    RETURN_NOT_OK(file->Write(buffer, size));
-    RETURN_NOT_OK(file->Close());
-
-    return Status::OK();
-  }
-
-  std::string ScratchPath(const std::string& name) {
-    std::stringstream ss;
-    ss << scratch_dir_ << "/" << name;
-    return ss.str();
-  }
-
-  std::string HdfsAbsPath(const std::string& relpath) {
-    std::stringstream ss;
-    ss << "hdfs://" << conf_.host << ":" << conf_.port << relpath;
-    return ss.str();
-  }
-
- protected:
-  // Set up shared state between unit tests
-  static void SetUpTestCase() {
-    if (!ConnectLibHdfs().ok()) {
-      std::cout << "Loading libhdfs failed, skipping tests gracefully" << 
std::endl;
-      return;
-    }
-
-    loaded_libhdfs_ = true;
-
-    const char* host = std::getenv("ARROW_HDFS_TEST_HOST");
-    const char* port = std::getenv("ARROW_HDFS_TEST_PORT");
-    const char* user = std::getenv("ARROW_HDFS_TEST_USER");
-
-    ASSERT_TRUE(user) << "Set ARROW_HDFS_TEST_USER";
-
-    conf_.host = host == nullptr ? "localhost" : host;
-    conf_.user = user;
-    conf_.port = port == nullptr ? 20500 : atoi(port);
-
-    ASSERT_OK(HdfsClient::Connect(&conf_, &client_));
-  }
-
-  static void TearDownTestCase() {
-    if (client_) {
-      EXPECT_OK(client_->Delete(scratch_dir_, true));
-      EXPECT_OK(client_->Disconnect());
-    }
-  }
-
-  static bool loaded_libhdfs_;
-
-  // Resources shared amongst unit tests
-  static HdfsConnectionConfig conf_;
-  static std::string scratch_dir_;
-  static std::shared_ptr<HdfsClient> client_;
-};
-
-bool TestHdfsClient::loaded_libhdfs_ = false;
-HdfsConnectionConfig TestHdfsClient::conf_ = HdfsConnectionConfig();
-
-std::string TestHdfsClient::scratch_dir_ =
-    boost::filesystem::unique_path("/tmp/arrow-hdfs/scratch-%%%%").native();
-
-std::shared_ptr<HdfsClient> TestHdfsClient::client_ = nullptr;
-
-#define SKIP_IF_NO_LIBHDFS()                          \
-  if (!loaded_libhdfs_) {                             \
-    std::cout << "No libhdfs, skipping" << std::endl; \
-    return;                                           \
-  }
-
-TEST_F(TestHdfsClient, ConnectsAgain) {
-  SKIP_IF_NO_LIBHDFS();
-
-  std::shared_ptr<HdfsClient> client;
-  ASSERT_OK(HdfsClient::Connect(&conf_, &client));
-  ASSERT_OK(client->Disconnect());
-}
-
-TEST_F(TestHdfsClient, CreateDirectory) {
-  SKIP_IF_NO_LIBHDFS();
-
-  std::string path = ScratchPath("create-directory");
-
-  if (client_->Exists(path)) { ASSERT_OK(client_->Delete(path, true)); }
-
-  ASSERT_OK(client_->CreateDirectory(path));
-  ASSERT_TRUE(client_->Exists(path));
-  EXPECT_OK(client_->Delete(path, true));
-  ASSERT_FALSE(client_->Exists(path));
-}
-
-TEST_F(TestHdfsClient, GetCapacityUsed) {
-  SKIP_IF_NO_LIBHDFS();
-
-  // Who knows what is actually in your DFS cluster, but expect it to have
-  // positive used bytes and capacity
-  int64_t nbytes = 0;
-  ASSERT_OK(client_->GetCapacity(&nbytes));
-  ASSERT_LT(0, nbytes);
-
-  ASSERT_OK(client_->GetUsed(&nbytes));
-  ASSERT_LT(0, nbytes);
-}
-
-TEST_F(TestHdfsClient, GetPathInfo) {
-  SKIP_IF_NO_LIBHDFS();
-
-  HdfsPathInfo info;
-
-  ASSERT_OK(MakeScratchDir());
-
-  // Directory info
-  ASSERT_OK(client_->GetPathInfo(scratch_dir_, &info));
-  ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
-  ASSERT_EQ(HdfsAbsPath(scratch_dir_), info.name);
-  ASSERT_EQ(conf_.user, info.owner);
-
-  // TODO(wesm): test group, other attrs
-
-  auto path = ScratchPath("test-file");
-
-  const int size = 100;
-
-  std::vector<uint8_t> buffer = RandomData(size);
-
-  ASSERT_OK(WriteDummyFile(path, buffer.data(), size));
-  ASSERT_OK(client_->GetPathInfo(path, &info));
-
-  ASSERT_EQ(ObjectType::FILE, info.kind);
-  ASSERT_EQ(HdfsAbsPath(path), info.name);
-  ASSERT_EQ(conf_.user, info.owner);
-  ASSERT_EQ(size, info.size);
-}
-
-TEST_F(TestHdfsClient, AppendToFile) {
-  SKIP_IF_NO_LIBHDFS();
-
-  ASSERT_OK(MakeScratchDir());
-
-  auto path = ScratchPath("test-file");
-  const int size = 100;
-
-  std::vector<uint8_t> buffer = RandomData(size);
-  ASSERT_OK(WriteDummyFile(path, buffer.data(), size));
-
-  // now append
-  ASSERT_OK(WriteDummyFile(path, buffer.data(), size, true));
-
-  HdfsPathInfo info;
-  ASSERT_OK(client_->GetPathInfo(path, &info));
-  ASSERT_EQ(size * 2, info.size);
-}
-
-TEST_F(TestHdfsClient, ListDirectory) {
-  SKIP_IF_NO_LIBHDFS();
-
-  const int size = 100;
-  std::vector<uint8_t> data = RandomData(size);
-
-  auto p1 = ScratchPath("test-file-1");
-  auto p2 = ScratchPath("test-file-2");
-  auto d1 = ScratchPath("test-dir-1");
-
-  ASSERT_OK(MakeScratchDir());
-  ASSERT_OK(WriteDummyFile(p1, data.data(), size));
-  ASSERT_OK(WriteDummyFile(p2, data.data(), size / 2));
-  ASSERT_OK(client_->CreateDirectory(d1));
-
-  std::vector<HdfsPathInfo> listing;
-  ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing));
-
-  // Do it again, appends!
-  ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing));
-
-  ASSERT_EQ(6, static_cast<int>(listing.size()));
-
-  // Argh, well, shouldn't expect the listing to be in any particular order
-  for (size_t i = 0; i < listing.size(); ++i) {
-    const HdfsPathInfo& info = listing[i];
-    if (info.name == HdfsAbsPath(p1)) {
-      ASSERT_EQ(ObjectType::FILE, info.kind);
-      ASSERT_EQ(size, info.size);
-    } else if (info.name == HdfsAbsPath(p2)) {
-      ASSERT_EQ(ObjectType::FILE, info.kind);
-      ASSERT_EQ(size / 2, info.size);
-    } else if (info.name == HdfsAbsPath(d1)) {
-      ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
-    } else {
-      FAIL() << "Unexpected path: " << info.name;
-    }
-  }
-}
-
-TEST_F(TestHdfsClient, ReadableMethods) {
-  SKIP_IF_NO_LIBHDFS();
-
-  ASSERT_OK(MakeScratchDir());
-
-  auto path = ScratchPath("test-file");
-  const int size = 100;
-
-  std::vector<uint8_t> data = RandomData(size);
-  ASSERT_OK(WriteDummyFile(path, data.data(), size));
-
-  std::shared_ptr<HdfsReadableFile> file;
-  ASSERT_OK(client_->OpenReadable(path, &file));
-
-  // Test GetSize -- move this into its own unit test if ever needed
-  int64_t file_size;
-  ASSERT_OK(file->GetSize(&file_size));
-  ASSERT_EQ(size, file_size);
-
-  uint8_t buffer[50];
-  int64_t bytes_read = 0;
-
-  ASSERT_OK(file->Read(50, &bytes_read, buffer));
-  ASSERT_EQ(0, std::memcmp(buffer, data.data(), 50));
-  ASSERT_EQ(50, bytes_read);
-
-  ASSERT_OK(file->Read(50, &bytes_read, buffer));
-  ASSERT_EQ(0, std::memcmp(buffer, data.data() + 50, 50));
-  ASSERT_EQ(50, bytes_read);
-
-  // EOF
-  ASSERT_OK(file->Read(1, &bytes_read, buffer));
-  ASSERT_EQ(0, bytes_read);
-
-  // ReadAt to EOF
-  ASSERT_OK(file->ReadAt(60, 100, &bytes_read, buffer));
-  ASSERT_EQ(40, bytes_read);
-  ASSERT_EQ(0, std::memcmp(buffer, data.data() + 60, bytes_read));
-
-  // Seek, Tell
-  ASSERT_OK(file->Seek(60));
-
-  int64_t position;
-  ASSERT_OK(file->Tell(&position));
-  ASSERT_EQ(60, position);
-}
-
-TEST_F(TestHdfsClient, RenameFile) {
-  SKIP_IF_NO_LIBHDFS();
-
-  ASSERT_OK(MakeScratchDir());
-
-  auto src_path = ScratchPath("src-file");
-  auto dst_path = ScratchPath("dst-file");
-  const int size = 100;
-
-  std::vector<uint8_t> data = RandomData(size);
-  ASSERT_OK(WriteDummyFile(src_path, data.data(), size));
-
-  ASSERT_OK(client_->Rename(src_path, dst_path));
-
-  ASSERT_FALSE(client_->Exists(src_path));
-  ASSERT_TRUE(client_->Exists(dst_path));
-}
-
-}  // namespace io
-}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/hdfs.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc
index 800c3ed..a6b4b2f 100644
--- a/cpp/src/arrow/io/hdfs.cc
+++ b/cpp/src/arrow/io/hdfs.cc
@@ -142,6 +142,15 @@ Status HdfsReadableFile::ReadAt(
   return impl_->ReadAt(position, nbytes, bytes_read, buffer);
 }
 
+Status HdfsReadableFile::ReadAt(
+    int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
+  return Status::NotImplemented("Not yet implemented");
+}
+
+bool HdfsReadableFile::supports_zero_copy() const {
+  return false;
+}
+
 Status HdfsReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* 
buffer) {
   return impl_->Read(nbytes, bytes_read, buffer);
 }
@@ -162,9 +171,9 @@ Status HdfsReadableFile::Tell(int64_t* position) {
 // File writing
 
 // Private implementation for writeable-only files
-class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl {
+class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl {
  public:
-  HdfsWriteableFileImpl() {}
+  HdfsOutputStreamImpl() {}
 
   Status Close() {
     if (is_open_) {
@@ -185,29 +194,29 @@ class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl {
   }
 };
 
-HdfsWriteableFile::HdfsWriteableFile() {
-  impl_.reset(new HdfsWriteableFileImpl());
+HdfsOutputStream::HdfsOutputStream() {
+  impl_.reset(new HdfsOutputStreamImpl());
 }
 
-HdfsWriteableFile::~HdfsWriteableFile() {
+HdfsOutputStream::~HdfsOutputStream() {
   impl_->Close();
 }
 
-Status HdfsWriteableFile::Close() {
+Status HdfsOutputStream::Close() {
   return impl_->Close();
 }
 
-Status HdfsWriteableFile::Write(
+Status HdfsOutputStream::Write(
     const uint8_t* buffer, int64_t nbytes, int64_t* bytes_read) {
   return impl_->Write(buffer, nbytes, bytes_read);
 }
 
-Status HdfsWriteableFile::Write(const uint8_t* buffer, int64_t nbytes) {
+Status HdfsOutputStream::Write(const uint8_t* buffer, int64_t nbytes) {
   int64_t bytes_written_dummy = 0;
   return Write(buffer, nbytes, &bytes_written_dummy);
 }
 
-Status HdfsWriteableFile::Tell(int64_t* position) {
+Status HdfsOutputStream::Tell(int64_t* position) {
   return impl_->Tell(position);
 }
 
@@ -347,7 +356,7 @@ class HdfsClient::HdfsClientImpl {
 
   Status OpenWriteable(const std::string& path, bool append, int32_t 
buffer_size,
       int16_t replication, int64_t default_block_size,
-      std::shared_ptr<HdfsWriteableFile>* file) {
+      std::shared_ptr<HdfsOutputStream>* file) {
     int flags = O_WRONLY;
     if (append) flags |= O_APPEND;
 
@@ -362,7 +371,7 @@ class HdfsClient::HdfsClientImpl {
     }
 
     // std::make_shared does not work with private ctors
-    *file = std::shared_ptr<HdfsWriteableFile>(new HdfsWriteableFile());
+    *file = std::shared_ptr<HdfsOutputStream>(new HdfsOutputStream());
     (*file)->impl_->set_members(path, fs_, handle);
 
     return Status::OK();
@@ -440,13 +449,13 @@ Status HdfsClient::OpenReadable(
 
 Status HdfsClient::OpenWriteable(const std::string& path, bool append,
     int32_t buffer_size, int16_t replication, int64_t default_block_size,
-    std::shared_ptr<HdfsWriteableFile>* file) {
+    std::shared_ptr<HdfsOutputStream>* file) {
   return impl_->OpenWriteable(
       path, append, buffer_size, replication, default_block_size, file);
 }
 
 Status HdfsClient::OpenWriteable(
-    const std::string& path, bool append, std::shared_ptr<HdfsWriteableFile>* 
file) {
+    const std::string& path, bool append, std::shared_ptr<HdfsOutputStream>* 
file) {
   return OpenWriteable(path, append, 0, 0, 0, file);
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/hdfs.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h
index b6449fc..39720cc 100644
--- a/cpp/src/arrow/io/hdfs.h
+++ b/cpp/src/arrow/io/hdfs.h
@@ -29,13 +29,14 @@
 
 namespace arrow {
 
+class Buffer;
 class Status;
 
 namespace io {
 
 class HdfsClient;
 class HdfsReadableFile;
-class HdfsWriteableFile;
+class HdfsOutputStream;
 
 struct HdfsPathInfo {
   ObjectType::type kind;
@@ -139,14 +140,14 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient {
   // @param default_block_size, 0 for default
   Status OpenWriteable(const std::string& path, bool append, int32_t 
buffer_size,
       int16_t replication, int64_t default_block_size,
-      std::shared_ptr<HdfsWriteableFile>* file);
+      std::shared_ptr<HdfsOutputStream>* file);
 
   Status OpenWriteable(
-      const std::string& path, bool append, 
std::shared_ptr<HdfsWriteableFile>* file);
+      const std::string& path, bool append, std::shared_ptr<HdfsOutputStream>* 
file);
 
  private:
   friend class HdfsReadableFile;
-  friend class HdfsWriteableFile;
+  friend class HdfsOutputStream;
 
   class ARROW_NO_EXPORT HdfsClientImpl;
   std::unique_ptr<HdfsClientImpl> impl_;
@@ -155,7 +156,7 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient {
   DISALLOW_COPY_AND_ASSIGN(HdfsClient);
 };
 
-class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
+class ARROW_EXPORT HdfsReadableFile : public ReadableFileInterface {
  public:
   ~HdfsReadableFile();
 
@@ -166,6 +167,10 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
   Status ReadAt(
       int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) 
override;
 
+  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* 
out) override;
+
+  bool supports_zero_copy() const override;
+
   Status Seek(int64_t position) override;
   Status Tell(int64_t* position) override;
 
@@ -183,9 +188,11 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
   DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile);
 };
 
-class ARROW_EXPORT HdfsWriteableFile : public WriteableFile {
+// Named HdfsOutputStream (not a writeable file) because it does not support
+// seeking, unlike WriteableFileInterface
+class ARROW_EXPORT HdfsOutputStream : public OutputStream {
  public:
-  ~HdfsWriteableFile();
+  ~HdfsOutputStream();
 
   Status Close() override;
 
@@ -196,14 +203,14 @@ class ARROW_EXPORT HdfsWriteableFile : public WriteableFile {
   Status Tell(int64_t* position) override;
 
  private:
-  class ARROW_NO_EXPORT HdfsWriteableFileImpl;
-  std::unique_ptr<HdfsWriteableFileImpl> impl_;
+  class ARROW_NO_EXPORT HdfsOutputStreamImpl;
+  std::unique_ptr<HdfsOutputStreamImpl> impl_;
 
   friend class HdfsClient::HdfsClientImpl;
 
-  HdfsWriteableFile();
+  HdfsOutputStream();
 
-  DISALLOW_COPY_AND_ASSIGN(HdfsWriteableFile);
+  DISALLOW_COPY_AND_ASSIGN(HdfsOutputStream);
 };
 
 Status ARROW_EXPORT ConnectLibHdfs();

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/interfaces.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index c212852..fa34b43 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -21,8 +21,11 @@
 #include <cstdint>
 #include <memory>
 
+#include "arrow/util/macros.h"
+
 namespace arrow {
 
+class Buffer;
 class Status;
 
 namespace io {
@@ -40,30 +43,85 @@ class FileSystemClient {
   virtual ~FileSystemClient() {}
 };
 
-class FileBase {
+// Common base for all files and streams: Close/Tell plus the open mode.
+class FileInterface {
  public:
+  virtual ~FileInterface() {}
   virtual Status Close() = 0;
   virtual Status Tell(int64_t* position) = 0;
+
+  FileMode::type mode() const { return mode_; }
+
+ protected:
+  FileInterface() {}
+  FileMode::type mode_;
+
+  void set_mode(FileMode::type mode) { mode_ = mode; }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(FileInterface);
 };
 
-class ReadableFile : public FileBase {
+class Seekable {
  public:
-  virtual Status ReadAt(
-      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0;
+  virtual ~Seekable() {}
+  virtual Status Seek(int64_t position) = 0;
+};
 
-  virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0;
+class Writeable {
+ public:
+  virtual ~Writeable() {}
+  virtual Status Write(const uint8_t* data, int64_t nbytes) = 0;
+};
 
-  virtual Status GetSize(int64_t* size) = 0;
+class Readable {
+ public:
+  virtual ~Readable() {}
+  virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) = 0;
+};
+
+// FileInterface is inherited virtually so that ReadWriteFileInterface, which is
+// both an input and an output, has a single mode_ that its constructor sets.
+class OutputStream : virtual public FileInterface, public Writeable {
+ protected:
+  OutputStream() { set_mode(FileMode::WRITE); }
 };
 
-class RandomAccessFile : public ReadableFile {
+class InputStream : virtual public FileInterface, public Readable {
+ protected:
+  InputStream() { set_mode(FileMode::READ); }
+};
+
+class ReadableFileInterface : public InputStream, public Seekable {
  public:
-  virtual Status Seek(int64_t position) = 0;
+  virtual Status ReadAt(
+      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) = 0;
+
+  virtual Status GetSize(int64_t* size) = 0;
+
+  // Buffer-returning variant of ReadAt; does not copy if not necessary (see
+  // supports_zero_copy()).
+  virtual Status ReadAt(
+      int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) = 0;
+
+  virtual bool supports_zero_copy() const = 0;
+
+ protected:
+  ReadableFileInterface() { set_mode(FileMode::READ); }
 };
 
-class WriteableFile : public FileBase {
+class WriteableFileInterface : public OutputStream, public Seekable {
  public:
-  virtual Status Write(const uint8_t* buffer, int64_t nbytes) = 0;
+  virtual Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) = 0;
+
+ protected:
+  WriteableFileInterface() { set_mode(FileMode::WRITE); }
+};
+
+class ReadWriteFileInterface : public ReadableFileInterface,
+                               public WriteableFileInterface {
+ protected:
+  ReadWriteFileInterface() { set_mode(FileMode::READWRITE); }
 };
 
 }  // namespace io

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/io-hdfs-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-hdfs-test.cc b/cpp/src/arrow/io/io-hdfs-test.cc
new file mode 100644
index 0000000..7901932
--- /dev/null
+++ b/cpp/src/arrow/io/io-hdfs-test.cc
@@ -0,0 +1,315 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdlib>
+#include <iostream>
+#include <sstream>
+#include <string>
+
+#include "gtest/gtest.h"
+
+#include <boost/filesystem.hpp>  // NOLINT
+
+#include "arrow/io/hdfs.h"
+#include "arrow/test-util.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace io {
+
+std::vector<uint8_t> RandomData(int64_t size) {
+  std::vector<uint8_t> buffer(size);
+  test::random_bytes(size, 0, buffer.data());
+  return buffer;
+}
+
+class TestHdfsClient : public ::testing::Test {
+ public:
+  Status MakeScratchDir() {
+    if (client_->Exists(scratch_dir_)) {
+      RETURN_NOT_OK((client_->Delete(scratch_dir_, true)));
+    }
+    return client_->CreateDirectory(scratch_dir_);
+  }
+
+  Status WriteDummyFile(const std::string& path, const uint8_t* buffer, 
int64_t size,
+      bool append = false, int buffer_size = 0, int replication = 0,
+      int default_block_size = 0) {
+    std::shared_ptr<HdfsOutputStream> file;
+    RETURN_NOT_OK(client_->OpenWriteable(
+        path, append, buffer_size, replication, default_block_size, &file));
+
+    RETURN_NOT_OK(file->Write(buffer, size));
+    RETURN_NOT_OK(file->Close());
+
+    return Status::OK();
+  }
+
+  std::string ScratchPath(const std::string& name) {
+    std::stringstream ss;
+    ss << scratch_dir_ << "/" << name;
+    return ss.str();
+  }
+
+  std::string HdfsAbsPath(const std::string& relpath) {
+    std::stringstream ss;
+    ss << "hdfs://" << conf_.host << ":" << conf_.port << relpath;
+    return ss.str();
+  }
+
+ protected:
+  // Set up shared state between unit tests
+  static void SetUpTestCase() {
+    if (!ConnectLibHdfs().ok()) {
+      std::cout << "Loading libhdfs failed, skipping tests gracefully" << 
std::endl;
+      return;
+    }
+
+    loaded_libhdfs_ = true;
+
+    const char* host = std::getenv("ARROW_HDFS_TEST_HOST");
+    const char* port = std::getenv("ARROW_HDFS_TEST_PORT");
+    const char* user = std::getenv("ARROW_HDFS_TEST_USER");
+
+    ASSERT_TRUE(user) << "Set ARROW_HDFS_TEST_USER";
+
+    conf_.host = host == nullptr ? "localhost" : host;
+    conf_.user = user;
+    conf_.port = port == nullptr ? 20500 : atoi(port);
+
+    ASSERT_OK(HdfsClient::Connect(&conf_, &client_));
+  }
+
+  static void TearDownTestCase() {
+    if (client_) {
+      EXPECT_OK(client_->Delete(scratch_dir_, true));
+      EXPECT_OK(client_->Disconnect());
+    }
+  }
+
+  static bool loaded_libhdfs_;
+
+  // Resources shared amongst unit tests
+  static HdfsConnectionConfig conf_;
+  static std::string scratch_dir_;
+  static std::shared_ptr<HdfsClient> client_;
+};
+
+bool TestHdfsClient::loaded_libhdfs_ = false;
+HdfsConnectionConfig TestHdfsClient::conf_ = HdfsConnectionConfig();
+
+std::string TestHdfsClient::scratch_dir_ =
+    boost::filesystem::unique_path("/tmp/arrow-hdfs/scratch-%%%%").native();
+
+std::shared_ptr<HdfsClient> TestHdfsClient::client_ = nullptr;
+
+#define SKIP_IF_NO_LIBHDFS()                          \
+  if (!loaded_libhdfs_) {                             \
+    std::cout << "No libhdfs, skipping" << std::endl; \
+    return;                                           \
+  }
+
+TEST_F(TestHdfsClient, ConnectsAgain) {
+  SKIP_IF_NO_LIBHDFS();
+
+  std::shared_ptr<HdfsClient> client;
+  ASSERT_OK(HdfsClient::Connect(&conf_, &client));
+  ASSERT_OK(client->Disconnect());
+}
+
+TEST_F(TestHdfsClient, CreateDirectory) {
+  SKIP_IF_NO_LIBHDFS();
+
+  std::string path = ScratchPath("create-directory");
+
+  if (client_->Exists(path)) { ASSERT_OK(client_->Delete(path, true)); }
+
+  ASSERT_OK(client_->CreateDirectory(path));
+  ASSERT_TRUE(client_->Exists(path));
+  EXPECT_OK(client_->Delete(path, true));
+  ASSERT_FALSE(client_->Exists(path));
+}
+
+TEST_F(TestHdfsClient, GetCapacityUsed) {
+  SKIP_IF_NO_LIBHDFS();
+
+  // Who knows what is actually in your DFS cluster, but expect it to have
+  // positive used bytes and capacity
+  int64_t nbytes = 0;
+  ASSERT_OK(client_->GetCapacity(&nbytes));
+  ASSERT_LT(0, nbytes);
+
+  ASSERT_OK(client_->GetUsed(&nbytes));
+  ASSERT_LT(0, nbytes);
+}
+
+TEST_F(TestHdfsClient, GetPathInfo) {
+  SKIP_IF_NO_LIBHDFS();
+
+  HdfsPathInfo info;
+
+  ASSERT_OK(MakeScratchDir());
+
+  // Directory info
+  ASSERT_OK(client_->GetPathInfo(scratch_dir_, &info));
+  ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
+  ASSERT_EQ(HdfsAbsPath(scratch_dir_), info.name);
+  ASSERT_EQ(conf_.user, info.owner);
+
+  // TODO(wesm): test group, other attrs
+
+  auto path = ScratchPath("test-file");
+
+  const int size = 100;
+
+  std::vector<uint8_t> buffer = RandomData(size);
+
+  ASSERT_OK(WriteDummyFile(path, buffer.data(), size));
+  ASSERT_OK(client_->GetPathInfo(path, &info));
+
+  ASSERT_EQ(ObjectType::FILE, info.kind);
+  ASSERT_EQ(HdfsAbsPath(path), info.name);
+  ASSERT_EQ(conf_.user, info.owner);
+  ASSERT_EQ(size, info.size);
+}
+
+TEST_F(TestHdfsClient, AppendToFile) {
+  SKIP_IF_NO_LIBHDFS();
+
+  ASSERT_OK(MakeScratchDir());
+
+  auto path = ScratchPath("test-file");
+  const int size = 100;
+
+  std::vector<uint8_t> buffer = RandomData(size);
+  ASSERT_OK(WriteDummyFile(path, buffer.data(), size));
+
+  // now append
+  ASSERT_OK(WriteDummyFile(path, buffer.data(), size, true));
+
+  HdfsPathInfo info;
+  ASSERT_OK(client_->GetPathInfo(path, &info));
+  ASSERT_EQ(size * 2, info.size);
+}
+
+TEST_F(TestHdfsClient, ListDirectory) {
+  SKIP_IF_NO_LIBHDFS();
+
+  const int size = 100;
+  std::vector<uint8_t> data = RandomData(size);
+
+  auto p1 = ScratchPath("test-file-1");
+  auto p2 = ScratchPath("test-file-2");
+  auto d1 = ScratchPath("test-dir-1");
+
+  ASSERT_OK(MakeScratchDir());
+  ASSERT_OK(WriteDummyFile(p1, data.data(), size));
+  ASSERT_OK(WriteDummyFile(p2, data.data(), size / 2));
+  ASSERT_OK(client_->CreateDirectory(d1));
+
+  std::vector<HdfsPathInfo> listing;
+  ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing));
+
+  // Do it again, appends!
+  ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing));
+
+  ASSERT_EQ(6, static_cast<int>(listing.size()));
+
+  // Argh, well, shouldn't expect the listing to be in any particular order
+  for (size_t i = 0; i < listing.size(); ++i) {
+    const HdfsPathInfo& info = listing[i];
+    if (info.name == HdfsAbsPath(p1)) {
+      ASSERT_EQ(ObjectType::FILE, info.kind);
+      ASSERT_EQ(size, info.size);
+    } else if (info.name == HdfsAbsPath(p2)) {
+      ASSERT_EQ(ObjectType::FILE, info.kind);
+      ASSERT_EQ(size / 2, info.size);
+    } else if (info.name == HdfsAbsPath(d1)) {
+      ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
+    } else {
+      FAIL() << "Unexpected path: " << info.name;
+    }
+  }
+}
+
+TEST_F(TestHdfsClient, ReadableMethods) {
+  SKIP_IF_NO_LIBHDFS();
+
+  ASSERT_OK(MakeScratchDir());
+
+  auto path = ScratchPath("test-file");
+  const int size = 100;
+
+  std::vector<uint8_t> data = RandomData(size);
+  ASSERT_OK(WriteDummyFile(path, data.data(), size));
+
+  std::shared_ptr<HdfsReadableFile> file;
+  ASSERT_OK(client_->OpenReadable(path, &file));
+
+  // Test GetSize -- move this into its own unit test if ever needed
+  int64_t file_size;
+  ASSERT_OK(file->GetSize(&file_size));
+  ASSERT_EQ(size, file_size);
+
+  uint8_t buffer[50];
+  int64_t bytes_read = 0;
+
+  ASSERT_OK(file->Read(50, &bytes_read, buffer));
+  ASSERT_EQ(0, std::memcmp(buffer, data.data(), 50));
+  ASSERT_EQ(50, bytes_read);
+
+  ASSERT_OK(file->Read(50, &bytes_read, buffer));
+  ASSERT_EQ(0, std::memcmp(buffer, data.data() + 50, 50));
+  ASSERT_EQ(50, bytes_read);
+
+  // EOF
+  ASSERT_OK(file->Read(1, &bytes_read, buffer));
+  ASSERT_EQ(0, bytes_read);
+
+  // ReadAt to EOF
+  ASSERT_OK(file->ReadAt(60, 100, &bytes_read, buffer));
+  ASSERT_EQ(40, bytes_read);
+  ASSERT_EQ(0, std::memcmp(buffer, data.data() + 60, bytes_read));
+
+  // Seek, Tell
+  ASSERT_OK(file->Seek(60));
+
+  int64_t position;
+  ASSERT_OK(file->Tell(&position));
+  ASSERT_EQ(60, position);
+}
+
+TEST_F(TestHdfsClient, RenameFile) {
+  SKIP_IF_NO_LIBHDFS();
+
+  ASSERT_OK(MakeScratchDir());
+
+  auto src_path = ScratchPath("src-file");
+  auto dst_path = ScratchPath("dst-file");
+  const int size = 100;
+
+  std::vector<uint8_t> data = RandomData(size);
+  ASSERT_OK(WriteDummyFile(src_path, data.data(), size));
+
+  ASSERT_OK(client_->Rename(src_path, dst_path));
+
+  ASSERT_FALSE(client_->Exists(src_path));
+  ASSERT_TRUE(client_->Exists(dst_path));
+}
+
+}  // namespace io
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/io-memory-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-memory-test.cc b/cpp/src/arrow/io/io-memory-test.cc
new file mode 100644
index 0000000..6de35da
--- /dev/null
+++ b/cpp/src/arrow/io/io-memory-test.cc
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "arrow/io/memory.h"
+#include "arrow/io/test-common.h"
+
+namespace arrow {
+namespace io {
+
+// gtest fixture for MemoryMappedFile; files created via CreateFile are
+// removed by MemoryMapFixture::TearDown after each test
+class TestMemoryMappedFile : public ::testing::Test, public MemoryMapFixture {
+ public:
+  void TearDown() override { MemoryMapFixture::TearDown(); }
+};
+
+TEST_F(TestMemoryMappedFile, InvalidUsages) {
+  // Placeholder: no invalid-usage cases are exercised yet
+}
+
+TEST_F(TestMemoryMappedFile, WriteRead) {
+  const int64_t buffer_size = 1024;
+  std::vector<uint8_t> buffer(buffer_size);
+
+  // Derive the fill length from buffer_size so the two cannot drift apart
+  test::random_bytes(buffer_size, 0, buffer.data());
+
+  const int reps = 5;
+
+  std::string path = "ipc-write-read-test";
+  CreateFile(path, reps * buffer_size);
+
+  std::shared_ptr<MemoryMappedFile> result;
+  ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &result));
+
+  // Each iteration appends one chunk, then reads that same chunk back through
+  // the zero-copy ReadAt (which leaves the cursor just past the chunk)
+  int64_t position = 0;
+  std::shared_ptr<Buffer> out_buffer;
+  for (int i = 0; i < reps; ++i) {
+    ASSERT_OK(result->Write(buffer.data(), buffer_size));
+    ASSERT_OK(result->ReadAt(position, buffer_size, &out_buffer));
+
+    ASSERT_EQ(buffer_size, out_buffer->size());
+    ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
+
+    position += buffer_size;
+  }
+}
+
+TEST_F(TestMemoryMappedFile, ReadOnly) {
+  const int64_t buffer_size = 1024;
+  std::vector<uint8_t> buffer(buffer_size);
+
+  test::random_bytes(buffer_size, 0, buffer.data());
+
+  const int reps = 5;
+
+  std::string path = "ipc-read-only-test";
+  CreateFile(path, reps * buffer_size);
+
+  // Populate the file through a writable mapping
+  std::shared_ptr<MemoryMappedFile> rwmmap;
+  ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &rwmmap));
+  for (int i = 0; i < reps; ++i) {
+    ASSERT_OK(rwmmap->Write(buffer.data(), buffer_size));
+  }
+  ASSERT_OK(rwmmap->Close());
+
+  // Reopen read-only and verify every chunk round-tripped
+  std::shared_ptr<MemoryMappedFile> rommap;
+  ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap));
+
+  int64_t position = 0;
+  std::shared_ptr<Buffer> out_buffer;
+  for (int i = 0; i < reps; ++i) {
+    ASSERT_OK(rommap->ReadAt(position, buffer_size, &out_buffer));
+
+    ASSERT_EQ(buffer_size, out_buffer->size());
+    ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
+    position += buffer_size;
+  }
+  ASSERT_OK(rommap->Close());
+}
+
+TEST_F(TestMemoryMappedFile, InvalidMode) {
+  const int64_t buffer_size = 1024;
+  std::vector<uint8_t> buffer(buffer_size);
+
+  test::random_bytes(buffer_size, 0, buffer.data());
+
+  std::string path = "ipc-invalid-mode-test";
+  CreateFile(path, buffer_size);
+
+  // A map opened with FileMode::READ must refuse writes
+  std::shared_ptr<MemoryMappedFile> rommap;
+  ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap));
+
+  ASSERT_RAISES(IOError, rommap->Write(buffer.data(), buffer_size));
+}
+
+TEST_F(TestMemoryMappedFile, InvalidFile) {
+  // Opening a path that does not exist must fail with an IOError
+  const std::string missing_path = "invalid-file-name-asfd";
+
+  std::shared_ptr<MemoryMappedFile> file;
+  ASSERT_RAISES(IOError, MemoryMappedFile::Open(missing_path, FileMode::READ, &file));
+}
+
+}  // namespace io
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/libhdfs_shim.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/libhdfs_shim.cc b/cpp/src/arrow/io/libhdfs_shim.cc
index 003570d..0b805ab 100644
--- a/cpp/src/arrow/io/libhdfs_shim.cc
+++ b/cpp/src/arrow/io/libhdfs_shim.cc
@@ -51,8 +51,7 @@ extern "C" {
 #include <type_traits>
 #include <vector>
 
-#include <boost/filesystem.hpp>        // NOLINT
-#include <boost/algorithm/string.hpp>  // NOLINT
+#include <boost/filesystem.hpp>  // NOLINT
 
 #include "arrow/util/status.h"
 #include "arrow/util/visibility.h"

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
new file mode 100644
index 0000000..1dd6c3a
--- /dev/null
+++ b/cpp/src/arrow/io/memory.cc
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/io/memory.h"
+
+#include <sys/mman.h>  // For memory-mapping
+
+#include <algorithm>
+#include <cerrno>
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <sstream>
+#include <string>
+
+#include "arrow/io/interfaces.h"
+
+#include "arrow/util/buffer.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace io {
+
+// Implement MemoryMappedFile
+
+class MemoryMappedFile::MemoryMappedFileImpl {
+ public:
+  MemoryMappedFileImpl()
+      : file_(nullptr),
+        position_(0),
+        size_(0),
+        is_open_(false),
+        is_writable_(false),
+        data_(nullptr) {}
+
+  // Release whatever Open acquired. Open cleans up after itself on failure,
+  // so checking each handle individually is sufficient here.
+  ~MemoryMappedFileImpl() {
+    if (data_ != nullptr) { munmap(data_, size_); }
+    CloseFile();
+  }
+
+  // Open `path` and map its entire contents with MAP_SHARED. READWRITE maps
+  // the file readable and writable; any other mode maps it read-only.
+  // On any failure nothing is left open or mapped.
+  Status Open(const std::string& path, FileMode::type mode) {
+    if (is_open_) { return Status::IOError("A file is already open"); }
+
+    int prot_flags = PROT_READ;
+    bool writable = false;
+
+    if (mode == FileMode::READWRITE) {
+      file_ = fopen(path.c_str(), "r+b");
+      prot_flags |= PROT_WRITE;
+      writable = true;
+    } else {
+      file_ = fopen(path.c_str(), "rb");
+    }
+    if (file_ == nullptr) {
+      const int open_errno = errno;
+      std::stringstream ss;
+      ss << "Unable to open file, errno: " << open_errno;
+      return Status::IOError(ss.str());
+    }
+
+    // Determine the file size by seeking to the end
+    if (fseek(file_, 0L, SEEK_END) != 0 || ferror(file_)) {
+      CloseFile();
+      return Status::IOError("Unable to seek to end of file");
+    }
+    const int64_t file_size = ftell(file_);
+    if (file_size < 0) {
+      CloseFile();
+      return Status::IOError("Unable to determine file size");
+    }
+    fseek(file_, 0L, SEEK_SET);
+
+    void* result = mmap(nullptr, file_size, prot_flags, MAP_SHARED, fileno(file_), 0);
+    if (result == MAP_FAILED) {
+      // Capture errno before fclose can overwrite it
+      const int mmap_errno = errno;
+      CloseFile();
+      std::stringstream ss;
+      ss << "Memory mapping file failed, errno: " << mmap_errno;
+      return Status::IOError(ss.str());
+    }
+
+    // Commit state only once every step has succeeded
+    data_ = reinterpret_cast<uint8_t*>(result);
+    size_ = file_size;
+    position_ = 0;
+    is_writable_ = writable;
+    is_open_ = true;
+
+    return Status::OK();
+  }
+
+  int64_t size() const { return size_; }
+
+  // Move the cursor; valid positions are [0, size)
+  Status Seek(int64_t position) {
+    if (position < 0 || position >= size_) {
+      return Status::Invalid("position is out of bounds");
+    }
+    position_ = position;
+    return Status::OK();
+  }
+
+  int64_t position() const { return position_; }
+
+  // Advance the cursor, saturating at the end of the mapping
+  void advance(int64_t nbytes) { position_ = std::min(size_, position_ + nbytes); }
+
+  uint8_t* data() { return data_; }
+
+  // Pointer to the byte at the current cursor
+  uint8_t* head() { return data_ + position_; }
+
+  bool writable() const { return is_writable_; }
+
+  bool opened() const { return is_open_; }
+
+ private:
+  // Close the stdio handle (if any); the mapping is handled separately
+  void CloseFile() {
+    if (file_ != nullptr) {
+      fclose(file_);
+      file_ = nullptr;
+    }
+  }
+
+  FILE* file_;
+  int64_t position_;
+  int64_t size_;
+  bool is_open_;
+  bool is_writable_;
+
+  // The memory map
+  uint8_t* data_;
+};
+
+MemoryMappedFile::MemoryMappedFile(FileMode::type mode) {
+  // Record the open mode on the readable-interface base
+  this->ReadableFileInterface::set_mode(mode);
+}
+
+// Factory: construct the file, open and map `path`, and publish it through
+// `out` only on success
+Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode,
+    std::shared_ptr<MemoryMappedFile>* out) {
+  std::shared_ptr<MemoryMappedFile> mapped(new MemoryMappedFile(mode));
+  mapped->impl_.reset(new MemoryMappedFileImpl());
+
+  RETURN_NOT_OK(mapped->impl_->Open(path, mode));
+
+  *out = mapped;
+  return Status::OK();
+}
+
+Status MemoryMappedFile::GetSize(int64_t* size) {
+  // The mapped length is captured once, when the file is opened
+  const int64_t mapped_size = impl_->size();
+  *size = mapped_size;
+  return Status::OK();
+}
+
+Status MemoryMappedFile::Tell(int64_t* position) {
+  // Report the shared read/write cursor
+  const int64_t cursor = impl_->position();
+  *position = cursor;
+  return Status::OK();
+}
+
+Status MemoryMappedFile::Seek(int64_t position) {
+  // Bounds checking is delegated to the implementation
+  Status status = impl_->Seek(position);
+  return status;
+}
+
+Status MemoryMappedFile::Close() {
+  // Intentionally a no-op: the mapping and file handle are released by the
+  // implementation object's destructor
+  return Status::OK();
+}
+
+Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
+  // Copy at most the bytes between the cursor and the end of the map
+  const int64_t remaining = impl_->size() - impl_->position();
+  const int64_t count = std::min(nbytes, remaining);
+  std::memcpy(out, impl_->head(), count);
+  *bytes_read = count;
+  impl_->advance(count);
+  return Status::OK();
+}
+
+Status MemoryMappedFile::ReadAt(
+    int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
+  // Reposition (with bounds checking), then delegate to the copying Read
+  Status seek_status = impl_->Seek(position);
+  if (!seek_status.ok()) { return seek_status; }
+  return Read(nbytes, bytes_read, out);
+}
+
+Status MemoryMappedFile::ReadAt(
+    int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
+  // Zero-copy: the returned Buffer points directly into the mapping
+  RETURN_NOT_OK(impl_->Seek(position));
+  const int64_t length = std::min(nbytes, impl_->size() - impl_->position());
+  *out = std::make_shared<Buffer>(impl_->head(), length);
+  impl_->advance(length);
+  return Status::OK();
+}
+
+// Buffer reads hand out pointers directly into the mapping
+bool MemoryMappedFile::supports_zero_copy() const { return true; }
+
+// Write `nbytes` from `data` starting at absolute offset `position`.
+// Fails with IOError when the map is not open/writable, and with Invalid when
+// the write would run past the end of the fixed-size mapping (matching Write).
+Status MemoryMappedFile::WriteAt(
+    int64_t position, const uint8_t* data, int64_t nbytes) {
+  if (!impl_->opened() || !impl_->writable()) {
+    return Status::IOError("Unable to write");
+  }
+  // Bounds-check before seeking so a rejected write leaves the cursor unchanged;
+  // without this check memcpy would write beyond the mapped region
+  if (position + nbytes > impl_->size()) {
+    return Status::Invalid("Cannot write past end of memory map");
+  }
+
+  RETURN_NOT_OK(impl_->Seek(position));
+  return WriteInternal(data, nbytes);
+}
+
+Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) {
+  const bool can_write = impl_->opened() && impl_->writable();
+  if (!can_write) { return Status::IOError("Unable to write"); }
+
+  // The mapping has a fixed size; a write that would overrun it is rejected
+  const int64_t end = impl_->position() + nbytes;
+  if (end > impl_->size()) {
+    return Status::Invalid("Cannot write past end of memory map");
+  }
+
+  return WriteInternal(data, nbytes);
+}
+
+Status MemoryMappedFile::WriteInternal(const uint8_t* data, int64_t nbytes) {
+  // Performs no checks: callers are expected to validate writability and bounds
+  uint8_t* dst = impl_->head();
+  std::memcpy(dst, data, nbytes);
+  impl_->advance(nbytes);
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// In-memory buffer reader
+
+Status BufferReader::Close() {
+  // Nothing to release: the reader does not own the wrapped memory
+  return Status::OK();
+}
+
+Status BufferReader::Tell(int64_t* position) {
+  const int64_t cursor = position_;
+  *position = cursor;
+  return Status::OK();
+}
+
+Status BufferReader::ReadAt(
+    int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
+  // Reposition (with bounds checking), then perform a copying Read
+  Status seek_status = Seek(position);
+  if (!seek_status.ok()) { return seek_status; }
+  return Read(nbytes, bytes_read, buffer);
+}
+
+// Zero-copy read of up to `nbytes` starting at absolute offset `position`.
+// The returned Buffer aliases the wrapped memory. The cursor is left just past
+// the returned slice. Returns IOError if `position` is out of bounds.
+Status BufferReader::ReadAt(
+    int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
+  // Validate and move to `position` first so that the slice length is clamped
+  // against the requested offset rather than the previous cursor
+  RETURN_NOT_OK(Seek(position));
+  const int64_t size = std::min(nbytes, buffer_size_ - position_);
+  *out = std::make_shared<Buffer>(buffer_ + position_, size);
+  // Advance by what was actually returned, not by what was requested
+  position_ += size;
+  return Status::OK();
+}
+
+// Buffer reads alias the wrapped memory directly
+bool BufferReader::supports_zero_copy() const { return true; }
+
+// Copy up to `nbytes` from the cursor into `buffer` and advance the cursor.
+// Sets `*bytes_read` to the number of bytes copied (0 at end of data).
+Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
+  // Clamp BEFORE copying so memcpy never reads past the end of the wrapped
+  // memory; the max() guards against a negative request or a cursor past end
+  const int64_t remaining = buffer_size_ - position_;
+  const int64_t to_copy = std::max<int64_t>(0, std::min(nbytes, remaining));
+  if (to_copy > 0) { std::memcpy(buffer, buffer_ + position_, to_copy); }
+  *bytes_read = to_copy;
+  position_ += to_copy;
+  return Status::OK();
+}
+
+Status BufferReader::GetSize(int64_t* size) {
+  // The size is fixed at construction
+  const int64_t total = buffer_size_;
+  *size = total;
+  return Status::OK();
+}
+
+Status BufferReader::Seek(int64_t position) {
+  // Valid positions are [0, buffer_size_)
+  const bool in_bounds = position >= 0 && position < buffer_size_;
+  if (!in_bounds) { return Status::IOError("position out of bounds"); }
+
+  position_ = position;
+  return Status::OK();
+}
+
+}  // namespace io
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
new file mode 100644
index 0000000..6fe47c3
--- /dev/null
+++ b/cpp/src/arrow/io/memory.h
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Public API for different memory sharing / IO mechanisms
+
+#ifndef ARROW_IO_MEMORY_H
+#define ARROW_IO_MEMORY_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "arrow/io/interfaces.h"
+
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Buffer;
+class MutableBuffer;
+class Status;
+
+namespace io {
+
+// An output stream that writes to a MutableBuffer, such as one obtained from a
+// memory map
+//
+// TODO(wesm): Implement this class
+class ARROW_EXPORT BufferOutputStream : public OutputStream {
+ public:
+  // Members are zero-initialized so the object never holds indeterminate
+  // values; capacity_ is expected to be set from the buffer once implemented
+  // (MutableBuffer is only forward-declared in this header)
+  explicit BufferOutputStream(const std::shared_ptr<MutableBuffer>& buffer)
+      : buffer_(buffer), capacity_(0), position_(0) {}
+
+  // Implement the OutputStream interface
+  Status Close() override;
+  Status Tell(int64_t* position) override;
+  Status Write(const uint8_t* data, int64_t length) override;
+
+  // Returns the number of bytes remaining in the buffer
+  int64_t bytes_remaining() const;
+
+ private:
+  std::shared_ptr<MutableBuffer> buffer_;
+  int64_t capacity_;
+  int64_t position_;
+};
+
+// A file whose entire contents are memory-mapped. Supports copying reads,
+// zero-copy Buffer reads, and in-place writes within the mapped size.
+class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
+ public:
+  // Open and map `path`. FileMode::READWRITE maps it writable; other modes
+  // map it read-only. `out` is only assigned on success.
+  static Status Open(const std::string& path, FileMode::type mode,
+      std::shared_ptr<MemoryMappedFile>* out);
+
+  Status Close() override;
+
+  Status Tell(int64_t* position) override;
+
+  Status Seek(int64_t position) override;
+
+  // Copying read into caller-provided memory (required by ReadableFileInterface)
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) override;
+
+  Status ReadAt(
+      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) override;
+
+  // Read into a Buffer; zero copy if possible
+  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
+  bool supports_zero_copy() const override;
+
+  Status Write(const uint8_t* data, int64_t nbytes) override;
+
+  Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override;
+
+  // @return: the size in bytes of the memory source
+  Status GetSize(int64_t* size) override;
+
+ private:
+  // Use the static Open factory instead
+  explicit MemoryMappedFile(FileMode::type mode);
+
+  // Unchecked copy at the cursor; shared by Write and WriteAt
+  Status WriteInternal(const uint8_t* data, int64_t nbytes);
+
+  // Implementation details are hidden behind a pimpl
+  class MemoryMappedFileImpl;
+  std::unique_ptr<MemoryMappedFileImpl> impl_;
+};
+
+// Random-access reader over a contiguous region of memory. The reader does
+// not own the memory: Buffers returned by the zero-copy ReadAt alias it, so
+// the region must outlive the reader and any Buffers obtained from it.
+class ARROW_EXPORT BufferReader : public ReadableFileInterface {
+ public:
+  // buffer_size is 64-bit like every other size/offset in this interface;
+  // existing callers passing an int convert implicitly
+  BufferReader(const uint8_t* buffer, int64_t buffer_size)
+      : buffer_(buffer), buffer_size_(buffer_size), position_(0) {}
+
+  Status Close() override;
+  Status Tell(int64_t* position) override;
+
+  Status ReadAt(
+      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
+  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
+  Status GetSize(int64_t* size) override;
+  Status Seek(int64_t position) override;
+
+  bool supports_zero_copy() const override;
+
+ private:
+  const uint8_t* buffer_;
+  int64_t buffer_size_;
+  int64_t position_;
+};
+
+}  // namespace io
+}  // namespace arrow
+
+#endif  // ARROW_IO_MEMORY_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/test-common.h b/cpp/src/arrow/io/test-common.h
new file mode 100644
index 0000000..1954d47
--- /dev/null
+++ b/cpp/src/arrow/io/test-common.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_IO_TEST_COMMON_H
+#define ARROW_IO_TEST_COMMON_H
+
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/io/memory.h"
+#include "arrow/test-util.h"
+#include "arrow/util/buffer.h"
+#include "arrow/util/memory-pool.h"
+
+namespace arrow {
+namespace io {
+
+// Test mixin that creates scratch files for memory-map tests and deletes them
+// in TearDown
+class MemoryMapFixture {
+ public:
+  // Remove every file registered by CreateFile
+  void TearDown() {
+    for (const auto& path : tmp_files_) {
+      std::remove(path.c_str());
+    }
+    tmp_files_.clear();
+  }
+
+  // Create (or truncate) `path` and size it to `size` bytes; the file is
+  // registered for removal in TearDown. If the file cannot be created this is
+  // a no-op (previously it dereferenced a null FILE*); a later Open of the
+  // path will then typically report the failure.
+  void CreateFile(const std::string& path, int64_t size) {
+    FILE* file = fopen(path.c_str(), "w");
+    if (file == nullptr) { return; }
+    tmp_files_.push_back(path);
+    ftruncate(fileno(file), size);
+    fclose(file);
+  }
+
+  // Create a `size`-byte file at `path` and open it as a read-write memory map
+  Status InitMemoryMap(
+      int64_t size, const std::string& path, std::shared_ptr<MemoryMappedFile>* mmap) {
+    CreateFile(path, size);
+    return MemoryMappedFile::Open(path, FileMode::READWRITE, mmap);
+  }
+
+ private:
+  std::vector<std::string> tmp_files_;
+};
+
+}  // namespace io
+}  // namespace arrow
+
+#endif  // ARROW_IO_TEST_COMMON_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index 8263416..e5553a6 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -19,16 +19,50 @@
 # arrow_ipc
 #######################################
 
-# Headers: top level
-install(FILES
-  adapter.h
-  metadata.h
-  memory.h
-  DESTINATION include/arrow/ipc)
+set(ARROW_IPC_LINK_LIBS
+  arrow_io
+  arrow_shared
+)
+
+set(ARROW_IPC_PRIVATE_LINK_LIBS
+  )
+
+set(ARROW_IPC_TEST_LINK_LIBS
+  arrow_ipc
+  ${ARROW_IPC_PRIVATE_LINK_LIBS})
+
+set(ARROW_IPC_SRCS
+  adapter.cc
+  metadata.cc
+  metadata-internal.cc
+)
+
+# TODO(wesm): SHARED and STATIC targets
+add_library(arrow_ipc SHARED
+  ${ARROW_IPC_SRCS}
+)
+target_link_libraries(arrow_ipc
+  LINK_PUBLIC ${ARROW_IPC_LINK_LIBS}
+  LINK_PRIVATE ${ARROW_IPC_PRIVATE_LINK_LIBS})
+
+if(NOT APPLE)
+  # Localize thirdparty symbols using a linker version script. This hides them
+  # from the client application. The OS X linker does not support the
+  # version-script option.
+  set(ARROW_IPC_LINK_FLAGS 
"-Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/symbols.map")
+endif()
+
+SET_TARGET_PROPERTIES(arrow_ipc PROPERTIES
+  LINKER_LANGUAGE CXX
+  LINK_FLAGS "${ARROW_IPC_LINK_FLAGS}")
 
 ADD_ARROW_TEST(ipc-adapter-test)
-ADD_ARROW_TEST(ipc-memory-test)
+ARROW_TEST_LINK_LIBRARIES(ipc-adapter-test
+  ${ARROW_IPC_TEST_LINK_LIBS})
+
 ADD_ARROW_TEST(ipc-metadata-test)
+ARROW_TEST_LINK_LIBRARIES(ipc-metadata-test
+  ${ARROW_IPC_TEST_LINK_LIBS})
 
 # make clean will delete the generated file
 set_source_files_properties(Metadata_generated.h PROPERTIES GENERATED TRUE)
@@ -49,3 +83,13 @@ add_custom_command(
 
 add_custom_target(metadata_fbs DEPENDS ${FBS_OUTPUT_FILES})
 add_dependencies(arrow_objlib metadata_fbs)
+
+# Headers: top level
+install(FILES
+  adapter.h
+  metadata.h
+  DESTINATION include/arrow/ipc)
+
+install(TARGETS arrow_ipc
+  LIBRARY DESTINATION lib
+  ARCHIVE DESTINATION lib)

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index 40d372b..0e101c8 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -24,9 +24,11 @@
 
 #include "arrow/array.h"
 #include "arrow/ipc/Message_generated.h"
-#include "arrow/ipc/memory.h"
 #include "arrow/ipc/metadata-internal.h"
 #include "arrow/ipc/metadata.h"
+#include "arrow/ipc/util.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
 #include "arrow/schema.h"
 #include "arrow/table.h"
 #include "arrow/type.h"
@@ -144,10 +146,15 @@ class RowBatchWriter {
     return Status::OK();
   }
 
-  Status Write(MemorySource* dst, int64_t position, int64_t* 
data_header_offset) {
+  Status Write(io::OutputStream* dst, int64_t* data_header_offset) {
     // Write out all the buffers contiguously and compute the total size of the
     // memory payload
     int64_t offset = 0;
+
+    // Get the starting position
+    int64_t position;
+    RETURN_NOT_OK(dst->Tell(&position));
+
     for (size_t i = 0; i < buffers_.size(); ++i) {
       const Buffer* buffer = buffers_[i].get();
       int64_t size = 0;
@@ -171,7 +178,7 @@ class RowBatchWriter {
       buffer_meta_.push_back(flatbuf::Buffer(0, position + offset, size));
 
       if (size > 0) {
-        RETURN_NOT_OK(dst->Write(position + offset, buffer->data(), size));
+        RETURN_NOT_OK(dst->Write(buffer->data(), size));
         offset += size;
       }
     }
@@ -180,7 +187,7 @@ class RowBatchWriter {
     // memory, the data header can be converted to a flatbuffer and written out
     //
     // Note: The memory written here is prefixed by the size of the flatbuffer
-    // itself as an int32_t. On reading from a MemorySource, you will have to
+    // itself as an int32_t. On reading from a input, you will have to
     // determine the data header size then request a buffer such that you can
     // construct the flatbuffer data accessor object (see arrow::ipc::Message)
     std::shared_ptr<Buffer> data_header;
@@ -188,8 +195,7 @@ class RowBatchWriter {
         batch_->num_rows(), offset, field_nodes_, buffer_meta_, &data_header));
 
     // Write the data header at the end
-    RETURN_NOT_OK(
-        dst->Write(position + offset, data_header->data(), 
data_header->size()));
+    RETURN_NOT_OK(dst->Write(data_header->data(), data_header->size()));
 
     *data_header_offset = position + offset;
     return Status::OK();
@@ -199,9 +205,9 @@ class RowBatchWriter {
   Status GetTotalSize(int64_t* size) {
     // emulates the behavior of Write without actually writing
     int64_t data_header_offset;
-    MockMemorySource source(0);
-    RETURN_NOT_OK(Write(&source, 0, &data_header_offset));
-    *size = source.GetExtentBytesWritten();
+    MockOutputStream dst;
+    RETURN_NOT_OK(Write(&dst, &data_header_offset));
+    *size = dst.GetExtentBytesWritten();
     return Status::OK();
   }
 
@@ -214,12 +220,12 @@ class RowBatchWriter {
   int max_recursion_depth_;
 };
 
-Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, int64_t 
position,
-    int64_t* header_offset, int max_recursion_depth) {
+Status WriteRowBatch(io::OutputStream* dst, const RowBatch* batch, int64_t* 
header_offset,
+    int max_recursion_depth) {
   DCHECK_GT(max_recursion_depth, 0);
   RowBatchWriter serializer(batch, max_recursion_depth);
   RETURN_NOT_OK(serializer.AssemblePayload());
-  return serializer.Write(dst, position, header_offset);
+  return serializer.Write(dst, header_offset);
 }
 
 Status GetRowBatchSize(const RowBatch* batch, int64_t* size) {
@@ -234,11 +240,11 @@ Status GetRowBatchSize(const RowBatch* batch, int64_t* 
size) {
 
 static constexpr int64_t INIT_METADATA_SIZE = 4096;
 
-class RowBatchReader::Impl {
+class RowBatchReader::RowBatchReaderImpl {
  public:
-  Impl(MemorySource* source, const std::shared_ptr<RecordBatchMessage>& 
metadata,
-      int max_recursion_depth)
-      : source_(source), metadata_(metadata), 
max_recursion_depth_(max_recursion_depth) {
+  RowBatchReaderImpl(io::ReadableFileInterface* file,
+      const std::shared_ptr<RecordBatchMessage>& metadata, int 
max_recursion_depth)
+      : file_(file), metadata_(metadata), 
max_recursion_depth_(max_recursion_depth) {
     num_buffers_ = metadata->num_buffers();
     num_flattened_fields_ = metadata->num_fields();
   }
@@ -339,10 +345,11 @@ class RowBatchReader::Impl {
   Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) {
     BufferMetadata metadata = metadata_->buffer(buffer_index);
     RETURN_NOT_OK(CheckMultipleOf64(metadata.length));
-    return source_->ReadAt(metadata.offset, metadata.length, out);
+    return file_->ReadAt(metadata.offset, metadata.length, out);
   }
 
-  MemorySource* source_;
+ private:
+  io::ReadableFileInterface* file_;
   std::shared_ptr<RecordBatchMessage> metadata_;
 
   int field_index_;
@@ -352,22 +359,22 @@ class RowBatchReader::Impl {
   int num_flattened_fields_;
 };
 
-Status RowBatchReader::Open(
-    MemorySource* source, int64_t position, std::shared_ptr<RowBatchReader>* 
out) {
-  return Open(source, position, kMaxIpcRecursionDepth, out);
+Status RowBatchReader::Open(io::ReadableFileInterface* file, int64_t position,
+    std::shared_ptr<RowBatchReader>* out) {
+  return Open(file, position, kMaxIpcRecursionDepth, out);
 }
 
-Status RowBatchReader::Open(MemorySource* source, int64_t position,
+Status RowBatchReader::Open(io::ReadableFileInterface* file, int64_t position,
     int max_recursion_depth, std::shared_ptr<RowBatchReader>* out) {
   std::shared_ptr<Buffer> metadata;
-  RETURN_NOT_OK(source->ReadAt(position, INIT_METADATA_SIZE, &metadata));
+  RETURN_NOT_OK(file->ReadAt(position, INIT_METADATA_SIZE, &metadata));
 
   int32_t metadata_size = *reinterpret_cast<const int32_t*>(metadata->data());
 
-  // We may not need to call source->ReadAt again
+  // We may not need to call ReadAt again
   if (metadata_size > static_cast<int>(INIT_METADATA_SIZE - sizeof(int32_t))) {
     // We don't have enough data, read the indicated metadata size.
-    RETURN_NOT_OK(source->ReadAt(position + sizeof(int32_t), metadata_size, 
&metadata));
+    RETURN_NOT_OK(file->ReadAt(position + sizeof(int32_t), metadata_size, 
&metadata));
   }
 
   // TODO(wesm): buffer slicing here would be better in case ReadAt returns
@@ -383,14 +390,14 @@ Status RowBatchReader::Open(MemorySource* source, int64_t 
position,
   std::shared_ptr<RecordBatchMessage> batch_meta = message->GetRecordBatch();
 
   std::shared_ptr<RowBatchReader> result(new RowBatchReader());
-  result->impl_.reset(new Impl(source, batch_meta, max_recursion_depth));
+  result->impl_.reset(new RowBatchReaderImpl(file, batch_meta, 
max_recursion_depth));
   *out = result;
 
   return Status::OK();
 }
 
 // Here the explicit destructor is required for compilers to be aware of
-// the complete information of RowBatchReader::Impl class
+// the complete information of RowBatchReader::RowBatchReaderImpl class
 RowBatchReader::~RowBatchReader() {}
 
 Status RowBatchReader::GetRowBatch(

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index 6231af6..215b46f 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -33,9 +33,15 @@ class RowBatch;
 class Schema;
 class Status;
 
+namespace io {
+
+class ReadableFileInterface;
+class OutputStream;
+
+}  // namespace io
+
 namespace ipc {
 
-class MemorySource;
 class RecordBatchMessage;
 
 // ----------------------------------------------------------------------
@@ -43,22 +49,21 @@ class RecordBatchMessage;
 // We have trouble decoding flatbuffers if the size i > 70, so 64 is a nice 
round number
 // TODO(emkornfield) investigate this more
 constexpr int kMaxIpcRecursionDepth = 64;
-// Write the RowBatch (collection of equal-length Arrow arrays) to the memory
-// source at the indicated position
+
+// Write the RowBatch (collection of equal-length Arrow arrays) to the output
+// stream
 //
-// First, each of the memory buffers are written out end-to-end in starting at
-// the indicated position.
+// First, each of the memory buffers are written out end-to-end
 //
 // Then, this function writes the batch metadata as a flatbuffer (see
 // format/Message.fbs -- the RecordBatch message type) like so:
 //
 // <int32: metadata size> <uint8*: metadata>
 //
-// Finally, the memory offset to the start of the metadata / data header is
-// returned in an out-variable
-ARROW_EXPORT Status WriteRowBatch(MemorySource* dst, const RowBatch* batch,
-    int64_t position, int64_t* header_offset,
-    int max_recursion_depth = kMaxIpcRecursionDepth);
+// Finally, the absolute offset (relative to the start of the output stream) to
+// the start of the metadata / data header is returned in an out-variable
+ARROW_EXPORT Status WriteRowBatch(io::OutputStream* dst, const RowBatch* batch,
+    int64_t* header_offset, int max_recursion_depth = kMaxIpcRecursionDepth);
 
 // int64_t GetRowBatchMetadata(const RowBatch* batch);
 
@@ -68,16 +73,16 @@ ARROW_EXPORT Status WriteRowBatch(MemorySource* dst, const 
RowBatch* batch,
 ARROW_EXPORT Status GetRowBatchSize(const RowBatch* batch, int64_t* size);
 
 // ----------------------------------------------------------------------
-// "Read" path; does not copy data if the MemorySource does not
+// "Read" path; does not copy data if the input supports zero copy reads
 
 class ARROW_EXPORT RowBatchReader {
  public:
-  static Status Open(
-      MemorySource* source, int64_t position, std::shared_ptr<RowBatchReader>* 
out);
-
-  static Status Open(MemorySource* source, int64_t position, int 
max_recursion_depth,
+  static Status Open(io::ReadableFileInterface* file, int64_t position,
       std::shared_ptr<RowBatchReader>* out);
 
+  static Status Open(io::ReadableFileInterface* file, int64_t position,
+      int max_recursion_depth, std::shared_ptr<RowBatchReader>* out);
+
   virtual ~RowBatchReader();
 
   // Reassemble the row batch. A Schema is required to be able to construct the
@@ -86,8 +91,8 @@ class ARROW_EXPORT RowBatchReader {
       const std::shared_ptr<Schema>& schema, std::shared_ptr<RowBatch>* out);
 
  private:
-  class Impl;
-  std::unique_ptr<Impl> impl_;
+  class RowBatchReaderImpl;
+  std::unique_ptr<RowBatchReaderImpl> impl_;
 };
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc 
b/cpp/src/arrow/ipc/ipc-adapter-test.cc
index 6740e0f..ca4d015 100644
--- a/cpp/src/arrow/ipc/ipc-adapter-test.cc
+++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc
@@ -24,9 +24,11 @@
 
 #include "gtest/gtest.h"
 
+#include "arrow/io/memory.h"
+#include "arrow/io/test-common.h"
 #include "arrow/ipc/adapter.h"
-#include "arrow/ipc/memory.h"
 #include "arrow/ipc/test-common.h"
+#include "arrow/ipc/util.h"
 
 #include "arrow/test-util.h"
 #include "arrow/types/list.h"
@@ -49,17 +51,18 @@ const auto LIST_LIST_INT32 = std::make_shared<ListType>(LIST_INT32);
 typedef Status MakeRowBatch(std::shared_ptr<RowBatch>* out);
 
 class TestWriteRowBatch : public ::testing::TestWithParam<MakeRowBatch*>,
-                          public MemoryMapFixture {
+                          public io::MemoryMapFixture {
  public:
   void SetUp() { pool_ = default_memory_pool(); }
-  void TearDown() { MemoryMapFixture::TearDown(); }
+  void TearDown() { io::MemoryMapFixture::TearDown(); }
 
   Status RoundTripHelper(const RowBatch& batch, int memory_map_size,
       std::shared_ptr<RowBatch>* batch_result) {
     std::string path = "test-write-row-batch";
-    MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
+    io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
     int64_t header_location;
-    RETURN_NOT_OK(WriteRowBatch(mmap_.get(), &batch, 0, &header_location));
+
+    RETURN_NOT_OK(WriteRowBatch(mmap_.get(), &batch, &header_location));
 
     std::shared_ptr<RowBatchReader> reader;
     RETURN_NOT_OK(RowBatchReader::Open(mmap_.get(), header_location, &reader));
@@ -69,7 +72,7 @@ class TestWriteRowBatch : public ::testing::TestWithParam<MakeRowBatch*>,
   }
 
  protected:
-  std::shared_ptr<MemoryMappedSource> mmap_;
+  std::shared_ptr<io::MemoryMappedFile> mmap_;
   MemoryPool* pool_;
 };
 
@@ -276,12 +279,12 @@ INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRowBatch,
                             &MakeStringTypesRowBatch, &MakeStruct));
 
 void TestGetRowBatchSize(std::shared_ptr<RowBatch> batch) {
-  MockMemorySource mock_source(1 << 16);
+  ipc::MockOutputStream mock;
   int64_t mock_header_location = -1;
   int64_t size = -1;
-  ASSERT_OK(WriteRowBatch(&mock_source, batch.get(), 0, &mock_header_location));
+  ASSERT_OK(WriteRowBatch(&mock, batch.get(), &mock_header_location));
   ASSERT_OK(GetRowBatchSize(batch.get(), &size));
-  ASSERT_EQ(mock_source.GetExtentBytesWritten(), size);
+  ASSERT_EQ(mock.GetExtentBytesWritten(), size);
 }
 
 TEST_F(TestWriteRowBatch, IntegerGetRowBatchSize) {
@@ -303,10 +306,10 @@ TEST_F(TestWriteRowBatch, IntegerGetRowBatchSize) {
   TestGetRowBatchSize(batch);
 }
 
-class RecursionLimits : public ::testing::Test, public MemoryMapFixture {
+class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
  public:
   void SetUp() { pool_ = default_memory_pool(); }
-  void TearDown() { MemoryMapFixture::TearDown(); }
+  void TearDown() { io::MemoryMapFixture::TearDown(); }
 
   Status WriteToMmap(int recursion_level, bool override_level,
       int64_t* header_out = nullptr, std::shared_ptr<Schema>* schema_out = nullptr) {
@@ -329,19 +332,19 @@ class RecursionLimits : public ::testing::Test, public MemoryMapFixture {
 
     std::string path = "test-write-past-max-recursion";
     const int memory_map_size = 1 << 16;
-    MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
+    io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
     int64_t header_location;
     int64_t* header_out_param = header_out == nullptr ? &header_location : header_out;
     if (override_level) {
       return WriteRowBatch(
-          mmap_.get(), batch.get(), 0, header_out_param, recursion_level + 1);
+          mmap_.get(), batch.get(), header_out_param, recursion_level + 1);
     } else {
-      return WriteRowBatch(mmap_.get(), batch.get(), 0, header_out_param);
+      return WriteRowBatch(mmap_.get(), batch.get(), header_out_param);
     }
   }
 
  protected:
-  std::shared_ptr<MemoryMappedSource> mmap_;
+  std::shared_ptr<io::MemoryMappedFile> mmap_;
   MemoryPool* pool_;
 };
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/ipc-memory-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-memory-test.cc b/cpp/src/arrow/ipc/ipc-memory-test.cc
deleted file mode 100644
index a2dbd35..0000000
--- a/cpp/src/arrow/ipc/ipc-memory-test.cc
+++ /dev/null
@@ -1,127 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <cstdint>
-#include <cstdio>
-#include <cstring>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "gtest/gtest.h"
-
-#include "arrow/ipc/memory.h"
-#include "arrow/ipc/test-common.h"
-
-namespace arrow {
-namespace ipc {
-
-class TestMemoryMappedSource : public ::testing::Test, public MemoryMapFixture {
- public:
-  void TearDown() { MemoryMapFixture::TearDown(); }
-};
-
-TEST_F(TestMemoryMappedSource, InvalidUsages) {}
-
-TEST_F(TestMemoryMappedSource, WriteRead) {
-  const int64_t buffer_size = 1024;
-  std::vector<uint8_t> buffer(buffer_size);
-
-  test::random_bytes(1024, 0, buffer.data());
-
-  const int reps = 5;
-
-  std::string path = "ipc-write-read-test";
-  CreateFile(path, reps * buffer_size);
-
-  std::shared_ptr<MemoryMappedSource> result;
-  ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_WRITE, &result));
-
-  int64_t position = 0;
-
-  std::shared_ptr<Buffer> out_buffer;
-  for (int i = 0; i < reps; ++i) {
-    ASSERT_OK(result->Write(position, buffer.data(), buffer_size));
-    ASSERT_OK(result->ReadAt(position, buffer_size, &out_buffer));
-
-    ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
-
-    position += buffer_size;
-  }
-}
-
-TEST_F(TestMemoryMappedSource, ReadOnly) {
-  const int64_t buffer_size = 1024;
-  std::vector<uint8_t> buffer(buffer_size);
-
-  test::random_bytes(1024, 0, buffer.data());
-
-  const int reps = 5;
-
-  std::string path = "ipc-read-only-test";
-  CreateFile(path, reps * buffer_size);
-
-  std::shared_ptr<MemoryMappedSource> rwmmap;
-  ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_WRITE, &rwmmap));
-
-  int64_t position = 0;
-  for (int i = 0; i < reps; ++i) {
-    ASSERT_OK(rwmmap->Write(position, buffer.data(), buffer_size));
-
-    position += buffer_size;
-  }
-  rwmmap->Close();
-
-  std::shared_ptr<MemoryMappedSource> rommap;
-  ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_ONLY, &rommap));
-
-  position = 0;
-  std::shared_ptr<Buffer> out_buffer;
-  for (int i = 0; i < reps; ++i) {
-    ASSERT_OK(rommap->ReadAt(position, buffer_size, &out_buffer));
-
-    ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
-    position += buffer_size;
-  }
-  rommap->Close();
-}
-
-TEST_F(TestMemoryMappedSource, InvalidMode) {
-  const int64_t buffer_size = 1024;
-  std::vector<uint8_t> buffer(buffer_size);
-
-  test::random_bytes(1024, 0, buffer.data());
-
-  std::string path = "ipc-invalid-mode-test";
-  CreateFile(path, buffer_size);
-
-  std::shared_ptr<MemoryMappedSource> rommap;
-  ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_ONLY, &rommap));
-
-  ASSERT_RAISES(IOError, rommap->Write(0, buffer.data(), buffer_size));
-}
-
-TEST_F(TestMemoryMappedSource, InvalidFile) {
-  std::string non_existent_path = "invalid-file-name-asfd";
-
-  std::shared_ptr<MemoryMappedSource> result;
-  ASSERT_RAISES(IOError,
-      MemoryMappedSource::Open(non_existent_path, MemorySource::READ_ONLY, &result));
-}
-
-}  // namespace ipc
-}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/memory.cc b/cpp/src/arrow/ipc/memory.cc
deleted file mode 100644
index a6c56d6..0000000
--- a/cpp/src/arrow/ipc/memory.cc
+++ /dev/null
@@ -1,182 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "arrow/ipc/memory.h"
-
-#include <sys/mman.h>  // For memory-mapping
-
-#include <algorithm>
-#include <cerrno>
-#include <cstdint>
-#include <cstdio>
-#include <cstring>
-#include <sstream>
-#include <string>
-
-#include "arrow/util/buffer.h"
-#include "arrow/util/status.h"
-
-namespace arrow {
-namespace ipc {
-
-MemorySource::MemorySource(AccessMode access_mode) : access_mode_(access_mode) {}
-
-MemorySource::~MemorySource() {}
-
-// Implement MemoryMappedSource
-
-class MemoryMappedSource::Impl {
- public:
-  Impl() : file_(nullptr), is_open_(false), is_writable_(false), data_(nullptr) {}
-
-  ~Impl() {
-    if (is_open_) {
-      munmap(data_, size_);
-      fclose(file_);
-    }
-  }
-
-  Status Open(const std::string& path, MemorySource::AccessMode mode) {
-    if (is_open_) { return Status::IOError("A file is already open"); }
-
-    int prot_flags = PROT_READ;
-
-    if (mode == MemorySource::READ_WRITE) {
-      file_ = fopen(path.c_str(), "r+b");
-      prot_flags |= PROT_WRITE;
-      is_writable_ = true;
-    } else {
-      file_ = fopen(path.c_str(), "rb");
-    }
-    if (file_ == nullptr) {
-      std::stringstream ss;
-      ss << "Unable to open file, errno: " << errno;
-      return Status::IOError(ss.str());
-    }
-
-    fseek(file_, 0L, SEEK_END);
-    if (ferror(file_)) { return Status::IOError("Unable to seek to end of file"); }
-    size_ = ftell(file_);
-
-    fseek(file_, 0L, SEEK_SET);
-    is_open_ = true;
-
-    void* result = mmap(nullptr, size_, prot_flags, MAP_SHARED, fileno(file_), 0);
-    if (result == MAP_FAILED) {
-      std::stringstream ss;
-      ss << "Memory mapping file failed, errno: " << errno;
-      return Status::IOError(ss.str());
-    }
-    data_ = reinterpret_cast<uint8_t*>(result);
-
-    return Status::OK();
-  }
-
-  int64_t size() const { return size_; }
-
-  uint8_t* data() { return data_; }
-
-  bool writable() { return is_writable_; }
-
-  bool opened() { return is_open_; }
-
- private:
-  FILE* file_;
-  int64_t size_;
-  bool is_open_;
-  bool is_writable_;
-
-  // The memory map
-  uint8_t* data_;
-};
-
-MemoryMappedSource::MemoryMappedSource(AccessMode access_mode)
-    : MemorySource(access_mode) {}
-
-Status MemoryMappedSource::Open(const std::string& path, AccessMode access_mode,
-    std::shared_ptr<MemoryMappedSource>* out) {
-  std::shared_ptr<MemoryMappedSource> result(new MemoryMappedSource(access_mode));
-
-  result->impl_.reset(new Impl());
-  RETURN_NOT_OK(result->impl_->Open(path, access_mode));
-
-  *out = result;
-  return Status::OK();
-}
-
-int64_t MemoryMappedSource::Size() const {
-  return impl_->size();
-}
-
-Status MemoryMappedSource::Close() {
-  // munmap handled in ::Impl dtor
-  return Status::OK();
-}
-
-Status MemoryMappedSource::ReadAt(
-    int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
-  if (position < 0 || position >= impl_->size()) {
-    return Status::Invalid("position is out of bounds");
-  }
-
-  nbytes = std::min(nbytes, impl_->size() - position);
-  *out = std::make_shared<Buffer>(impl_->data() + position, nbytes);
-  return Status::OK();
-}
-
-Status MemoryMappedSource::Write(int64_t position, const uint8_t* data, int64_t nbytes) {
-  if (!impl_->opened() || !impl_->writable()) {
-    return Status::IOError("Unable to write");
-  }
-  if (position < 0 || position >= impl_->size()) {
-    return Status::Invalid("position is out of bounds");
-  }
-
-  // TODO(wesm): verify we are not writing past the end of the buffer
-  uint8_t* dst = impl_->data() + position;
-  memcpy(dst, data, nbytes);
-
-  return Status::OK();
-}
-
-MockMemorySource::MockMemorySource(int64_t size)
-    : size_(size), extent_bytes_written_(0) {}
-
-Status MockMemorySource::Close() {
-  return Status::OK();
-}
-
-Status MockMemorySource::ReadAt(
-    int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
-  return Status::OK();
-}
-
-Status MockMemorySource::Write(int64_t position, const uint8_t* data, int64_t nbytes) {
-  extent_bytes_written_ = std::max(extent_bytes_written_, position + nbytes);
-  return Status::OK();
-}
-
-int64_t MockMemorySource::Size() const {
-  return size_;
-}
-
-int64_t MockMemorySource::GetExtentBytesWritten() const {
-  return extent_bytes_written_;
-}
-
-}  // namespace ipc
-}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/memory.h b/cpp/src/arrow/ipc/memory.h
deleted file mode 100644
index 377401d..0000000
--- a/cpp/src/arrow/ipc/memory.h
+++ /dev/null
@@ -1,150 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Public API for different interprocess memory sharing mechanisms
-
-#ifndef ARROW_IPC_MEMORY_H
-#define ARROW_IPC_MEMORY_H
-
-#include <cstdint>
-#include <memory>
-#include <string>
-
-#include "arrow/util/macros.h"
-#include "arrow/util/visibility.h"
-
-namespace arrow {
-
-class Buffer;
-class MutableBuffer;
-class Status;
-
-namespace ipc {
-
-// Abstract output stream
-class OutputStream {
- public:
-  virtual ~OutputStream() {}
-  // Close the output stream
-  virtual Status Close() = 0;
-
-  // The current position in the output stream
-  virtual int64_t Tell() const = 0;
-
-  // Write bytes to the stream
-  virtual Status Write(const uint8_t* data, int64_t length) = 0;
-};
-
-// An output stream that writes to a MutableBuffer, such as one obtained from a
-// memory map
-class BufferOutputStream : public OutputStream {
- public:
-  explicit BufferOutputStream(const std::shared_ptr<MutableBuffer>& buffer)
-      : buffer_(buffer) {}
-
-  // Implement the OutputStream interface
-  Status Close() override;
-  int64_t Tell() const override;
-  Status Write(const uint8_t* data, int64_t length) override;
-
-  // Returns the number of bytes remaining in the buffer
-  int64_t bytes_remaining() const;
-
- private:
-  std::shared_ptr<MutableBuffer> buffer_;
-  int64_t capacity_;
-  int64_t position_;
-};
-
-class ARROW_EXPORT MemorySource {
- public:
-  // Indicates the access permissions of the memory source
-  enum AccessMode { READ_ONLY, READ_WRITE };
-
-  virtual ~MemorySource();
-
-  // Retrieve a buffer of memory from the source of the indicates size and at
-  // the indicated location
-  // @returns: arrow::Status indicating success / failure. The buffer is set
-  // into the *out argument
-  virtual Status ReadAt(
-      int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) = 0;
-
-  virtual Status Close() = 0;
-
-  virtual Status Write(int64_t position, const uint8_t* data, int64_t nbytes) = 0;
-
-  // @return: the size in bytes of the memory source
-  virtual int64_t Size() const = 0;
-
- protected:
-  explicit MemorySource(AccessMode access_mode = AccessMode::READ_WRITE);
-
-  AccessMode access_mode_;
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(MemorySource);
-};
-
-// A memory source that uses memory-mapped files for memory interactions
-class ARROW_EXPORT MemoryMappedSource : public MemorySource {
- public:
-  static Status Open(const std::string& path, AccessMode access_mode,
-      std::shared_ptr<MemoryMappedSource>* out);
-
-  Status Close() override;
-
-  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
-
-  Status Write(int64_t position, const uint8_t* data, int64_t nbytes) override;
-
-  // @return: the size in bytes of the memory source
-  int64_t Size() const override;
-
- private:
-  explicit MemoryMappedSource(AccessMode access_mode);
-  // Hide the internal details of this class for now
-  class Impl;
-  std::unique_ptr<Impl> impl_;
-};
-
-// A MemorySource that tracks the size of allocations from a memory source
-class MockMemorySource : public MemorySource {
- public:
-  explicit MockMemorySource(int64_t size);
-
-  Status Close() override;
-
-  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
-
-  Status Write(int64_t position, const uint8_t* data, int64_t nbytes) override;
-
-  int64_t Size() const override;
-
-  // @return: the smallest number of bytes containing the modified region of the
-  // MockMemorySource
-  int64_t GetExtentBytesWritten() const;
-
- private:
-  int64_t size_;
-  int64_t extent_bytes_written_;
-};
-
-}  // namespace ipc
-}  // namespace arrow
-
-#endif  // ARROW_IPC_MEMORY_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/metadata-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
index 8cc902c..05e9c7a 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -220,9 +220,8 @@ static Status FieldToFlatbuffer(
   auto fb_children = fbb.CreateVector(children);
 
   // TODO: produce the list of VectorTypes
-  *offset = flatbuf::CreateField(
-      fbb, fb_name, field->nullable, type_enum, type_data, field->dictionary,
-      fb_children);
+  *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_data,
+      field->dictionary, fb_children);
 
   return Status::OK();
 }
@@ -295,8 +294,8 @@ Status WriteDataHeader(int32_t length, int64_t body_length,
 }
 
 Status MessageBuilder::Finish() {
-  auto message = flatbuf::CreateMessage(fbb_, kMetadataVersion,
-      header_type_, header_, body_length_);
+  auto message =
+      flatbuf::CreateMessage(fbb_, kMetadataVersion, header_type_, header_, body_length_);
   fbb_.Finish(message);
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/metadata-internal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h
index db9a83f..d38df84 100644
--- a/cpp/src/arrow/ipc/metadata-internal.h
+++ b/cpp/src/arrow/ipc/metadata-internal.h
@@ -38,7 +38,7 @@ class Status;
 namespace ipc {
 
 static constexpr flatbuf::MetadataVersion kMetadataVersion =
-  flatbuf::MetadataVersion_V1_SNAPSHOT;
+    flatbuf::MetadataVersion_V1_SNAPSHOT;
 
 Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr<Field>* out);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index 838a4a6..d5ec533 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -23,6 +23,8 @@
 #include <cstdint>
 #include <memory>
 
+#include "arrow/util/visibility.h"
+
 namespace arrow {
 
 class Buffer;
@@ -36,6 +38,7 @@ namespace ipc {
 // Message read/write APIs
 
 // Serialize arrow::Schema as a Flatbuffer
+ARROW_EXPORT
 Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out);
 
 //----------------------------------------------------------------------
@@ -47,7 +50,7 @@ Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out);
 class Message;
 
 // Container for serialized Schema metadata contained in an IPC message
-class SchemaMessage {
+class ARROW_EXPORT SchemaMessage {
  public:
   // Accepts an opaque flatbuffer pointer
   SchemaMessage(const std::shared_ptr<Message>& message, const void* schema);
@@ -82,7 +85,7 @@ struct BufferMetadata {
 };
 
 // Container for serialized record batch metadata contained in an IPC message
-class RecordBatchMessage {
+class ARROW_EXPORT RecordBatchMessage {
  public:
   // Accepts an opaque flatbuffer pointer
   RecordBatchMessage(const std::shared_ptr<Message>& message, const void* batch_meta);
@@ -102,13 +105,13 @@ class RecordBatchMessage {
   std::unique_ptr<Impl> impl_;
 };
 
-class DictionaryBatchMessage {
+class ARROW_EXPORT DictionaryBatchMessage {
  public:
   int64_t id() const;
   std::unique_ptr<RecordBatchMessage> data() const;
 };
 
-class Message : public std::enable_shared_from_this<Message> {
+class ARROW_EXPORT Message : public std::enable_shared_from_this<Message> {
  public:
   enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH };
 

Reply via email to