ARROW-1104: Integrate in-memory object store into arrow

This supersedes https://github.com/apache/arrow/pull/467

This is ready for review. Next steps are
- Integration with the arrow CI
- Write docs on how to use the object store

There is one remaining compilation error (it doesn't find Python.h for one of 
the Travis configurations, if anybody has an idea on what is going on, let me 
know).

Author: Philipp Moritz <pcmor...@gmail.com>
Author: Robert Nishihara <robertnishih...@gmail.com>

Closes #742 from pcmoritz/plasma-store-2 and squashes the following commits:

c100a453 [Philipp Moritz] fixes
d67160c5 [Philipp Moritz] build dlmalloc with -O3
16d1f716 [Philipp Moritz] fix test hanging
0f321e16 [Philipp Moritz] try to fix tests
80f9df40 [Philipp Moritz] make format
4c474d71 [Philipp Moritz] run plasma_store from the right directory
85aa1710 [Philipp Moritz] fix mac tests
61d421b5 [Philipp Moritz] fix formatting
4497e337 [Philipp Moritz] fix tests
00f17f24 [Philipp Moritz] fix licenses
81437920 [Philipp Moritz] fix linting
5370ae06 [Philipp Moritz] fix plasma protocol
a137e783 [Philipp Moritz] more fixes
b36c6aaa [Philipp Moritz] fix fling.cc
214c426c [Philipp Moritz] fix eviction policy
e7badc48 [Philipp Moritz] fix python extension
6432d3fa [Philipp Moritz] fix formatting
b21f0814 [Philipp Moritz] fix remaining comments about client
27f9c9e8 [Philipp Moritz] fix formatting
7b08fd2a [Philipp Moritz] replace ObjectID pass by value with pass by const 
reference and fix const correctness
ca80e9a6 [Philipp Moritz] remove plain pointer in plasma client, part II
627b7c75 [Philipp Moritz] fix python extension name
30bd68b7 [Philipp Moritz] remove plain pointer in plasma client, part I
77d98227 [Philipp Moritz] put all the object code into a common library
0fdd4cd5 [Philipp Moritz] link libarrow.a and remove hardcoded optimization 
flags
8daea699 [Philipp Moritz] fix includes according to google styleguide
65ac7433 [Philipp Moritz] remove offending c++ flag from c flags
7003a4a4 [Philipp Moritz] fix valgrind test by setting working directory
217ff3d8 [Philipp Moritz] add valgrind heuristic
9c703c20 [Philipp Moritz] integrate client tests
9e5ae0e1 [Philipp Moritz] port serialization tests to gtest
0b8593db [Robert Nishihara] Port change from Ray. Change listen backlog size 
from 5 to 128.
b9a5a06e [Philipp Moritz] fix includes
ed680f97 [Philipp Moritz] reformat the code
f40f85bd [Philipp Moritz] add clang-format exceptions
d6e60d26 [Philipp Moritz] do not compile plasma on windows
f936adb7 [Philipp Moritz] build plasma python client only if python is available
e11b0e86 [Philipp Moritz] fix pthread
74ecb199 [Philipp Moritz] don't link against Python libraries
b1e0335a [Philipp Moritz] fix linting
7f7e7e78 [Philipp Moritz] more linting
79ea0ca7 [Philipp Moritz] fix clang-tidy
99420e8f [Philipp Moritz] add rat exceptions
6cee1e25 [Philipp Moritz] fix
c93034fb [Philipp Moritz] add Apache 2.0 headers
63729130 [Philipp Moritz] fix malloc?
99537c94 [Philipp Moritz] fix compiler warnings
cb3f3a38 [Philipp Moritz] compile C files with CMAKE_C_FLAGS
e649c2af [Philipp Moritz] fix compilation
04c2edb3 [Philipp Moritz] add missing file
51ab9630 [Philipp Moritz] fix compiler warnings
9ef7f412 [Philipp Moritz] make the plasma store compile
e9f9bb4a [Philipp Moritz] Initial commit of the plasma store. Contributors: 
Philipp Moritz, Robert Nishihara, Richard Shin, Stephanie Wang, Alexey Tumanov, 
Ion Stoica @ RISElab, UC Berkeley (2017) [from 
https://github.com/ray-project/ray/commit/b94b4a35e04d8d2c0af4420518a4e9a94c1c9b9f]


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/5e343098
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/5e343098
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/5e343098

Branch: refs/heads/master
Commit: 5e343098187cb822017f359748e28c53ece70e75
Parents: ef579ca
Author: Philipp Moritz <pcmor...@gmail.com>
Authored: Thu Jun 22 09:35:34 2017 -0400
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Thu Jun 22 09:35:34 2017 -0400

----------------------------------------------------------------------
 LICENSE.txt                                |   96 +
 ci/travis_before_script_cpp.sh             |    5 +-
 ci/travis_script_python.sh                 |    1 +
 cpp/CMakeLists.txt                         |   20 +-
 cpp/src/arrow/status.h                     |   23 +
 cpp/src/arrow/util/logging.h               |    8 +-
 cpp/src/plasma/CMakeLists.txt              |  112 +
 cpp/src/plasma/client.cc                   |  557 ++
 cpp/src/plasma/client.h                    |  343 ++
 cpp/src/plasma/common.cc                   |   83 +
 cpp/src/plasma/common.h                    |   63 +
 cpp/src/plasma/events.cc                   |   81 +
 cpp/src/plasma/events.h                    |   99 +
 cpp/src/plasma/eviction_policy.cc          |  107 +
 cpp/src/plasma/eviction_policy.h           |  134 +
 cpp/src/plasma/extension.cc                |  456 ++
 cpp/src/plasma/extension.h                 |   50 +
 cpp/src/plasma/fling.cc                    |   90 +
 cpp/src/plasma/fling.h                     |   52 +
 cpp/src/plasma/format/common.fbs           |   34 +
 cpp/src/plasma/format/plasma.fbs           |  291 ++
 cpp/src/plasma/io.cc                       |  212 +
 cpp/src/plasma/io.h                        |   55 +
 cpp/src/plasma/malloc.cc                   |  178 +
 cpp/src/plasma/malloc.h                    |   26 +
 cpp/src/plasma/plasma.cc                   |   64 +
 cpp/src/plasma/plasma.h                    |  191 +
 cpp/src/plasma/protocol.cc                 |  502 ++
 cpp/src/plasma/protocol.h                  |  170 +
 cpp/src/plasma/store.cc                    |  681 +++
 cpp/src/plasma/store.h                     |  169 +
 cpp/src/plasma/test/client_tests.cc        |  132 +
 cpp/src/plasma/test/run_tests.sh           |   61 +
 cpp/src/plasma/test/run_valgrind.sh        |   27 +
 cpp/src/plasma/test/serialization_tests.cc |  388 ++
 cpp/src/plasma/thirdparty/ae/ae.c          |  465 ++
 cpp/src/plasma/thirdparty/ae/ae.h          |  123 +
 cpp/src/plasma/thirdparty/ae/ae_epoll.c    |  135 +
 cpp/src/plasma/thirdparty/ae/ae_evport.c   |  320 ++
 cpp/src/plasma/thirdparty/ae/ae_kqueue.c   |  138 +
 cpp/src/plasma/thirdparty/ae/ae_select.c   |  106 +
 cpp/src/plasma/thirdparty/ae/config.h      |   54 +
 cpp/src/plasma/thirdparty/ae/zmalloc.h     |   45 +
 cpp/src/plasma/thirdparty/dlmalloc.c       | 6281 +++++++++++++++++++++++
 cpp/src/plasma/thirdparty/xxhash.cc        |  889 ++++
 cpp/src/plasma/thirdparty/xxhash.h         |  293 ++
 dev/release/run-rat.sh                     |   11 +
 47 files changed, 14411 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/LICENSE.txt
----------------------------------------------------------------------
diff --git a/LICENSE.txt b/LICENSE.txt
index d645695..7000733 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -200,3 +200,99 @@
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
+
+--------------------------------------------------------------------------------
+
+src/plasma/fling.cc and src/plasma/fling.h: Apache 2.0
+
+Copyright 2013 Sharvil Nanavati
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+--------------------------------------------------------------------------------
+
+src/plasma/thirdparty/ae: Modified / 3-Clause BSD
+
+Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice,
+   this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright
+   notice, this list of conditions and the following disclaimer in the
+   documentation and/or other materials provided with the distribution.
+ * Neither the name of Redis nor the names of its contributors may be used
+   to endorse or promote products derived from this software without
+   specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
+
+--------------------------------------------------------------------------------
+
+src/plasma/thirdparty/dlmalloc.c: CC0
+
+This is a version (aka dlmalloc) of malloc/free/realloc written by
+Doug Lea and released to the public domain, as explained at
+http://creativecommons.org/publicdomain/zero/1.0/ Send questions,
+comments, complaints, performance data, etc to d...@cs.oswego.edu
+
+--------------------------------------------------------------------------------
+
+src/plasma/thirdparty/xxhash: BSD 2-Clause License
+
+xxHash - Fast Hash algorithm
+Copyright (C) 2012-2016, Yann Collet
+
+BSD 2-Clause License (http://www.opensource.org/licenses/bsd-license.php)
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+* Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+* Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+You can contact the author at :
+- xxHash homepage: http://www.xxhash.com
+- xxHash source repository : https://github.com/Cyan4973/xxHash
+
+--------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/ci/travis_before_script_cpp.sh
----------------------------------------------------------------------
diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh
index 9908735..a38a0dc 100755
--- a/ci/travis_before_script_cpp.sh
+++ b/ci/travis_before_script_cpp.sh
@@ -26,12 +26,11 @@ source $TRAVIS_BUILD_DIR/ci/travis_env_common.sh
 if [ $only_library_mode == "no" ]; then
   # C++ toolchain
   export CPP_TOOLCHAIN=$TRAVIS_BUILD_DIR/cpp-toolchain
-  export FLATBUFFERS_HOME=$CPP_TOOLCHAIN
   export RAPIDJSON_HOME=$CPP_TOOLCHAIN
 
   # Set up C++ toolchain from conda-forge packages for faster builds
   source $TRAVIS_BUILD_DIR/ci/travis_install_conda.sh
-  conda create -y -q -p $CPP_TOOLCHAIN python=2.7 flatbuffers rapidjson
+  conda create -y -q -p $CPP_TOOLCHAIN python=2.7 rapidjson
 fi
 
 if [ $TRAVIS_OS_NAME == "osx" ]; then
@@ -73,7 +72,7 @@ else
           $ARROW_CPP_DIR
 fi
 
-make -j4
+make VERBOSE=1 -j4
 make install
 
 popd

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/ci/travis_script_python.sh
----------------------------------------------------------------------
diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh
index 6cc760f..b82653d 100755
--- a/ci/travis_script_python.sh
+++ b/ci/travis_script_python.sh
@@ -72,6 +72,7 @@ function build_arrow_libraries() {
 
   cmake -DARROW_BUILD_TESTS=off \
         -DARROW_PYTHON=on \
+        -DPLASMA_PYTHON=on \
         -DCMAKE_INSTALL_PREFIX=$2 \
         $CPP_DIR
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 962891a..0897e99 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -187,6 +187,8 @@ include(san-config)
 
 # For any C code, use the same flags.
 set(CMAKE_C_FLAGS "${CMAKE_CXX_FLAGS}")
+# Remove --std=c++11 to avoid errors from C compilers
+string(REPLACE "-std=c++11" "" CMAKE_C_FLAGS ${CMAKE_C_FLAGS})
 
 # Code coverage
 if ("${ARROW_GENERATE_COVERAGE}")
@@ -362,7 +364,7 @@ function(ADD_ARROW_TEST REL_TEST_NAME)
       APPEND_STRING PROPERTY
       COMPILE_FLAGS " -DARROW_VALGRIND")
     add_test(${TEST_NAME}
-      valgrind --tool=memcheck --leak-check=full --error-exitcode=1 
${TEST_PATH})
+      bash -c "cd ${EXECUTABLE_OUTPUT_PATH}; valgrind --tool=memcheck 
--leak-check=full --leak-check-heuristics=stdstring --error-exitcode=1 
${TEST_PATH}")
   elseif(MSVC)
     add_test(${TEST_NAME} ${TEST_PATH})
   else()
@@ -707,6 +709,7 @@ if (ARROW_IPC)
     ExternalProject_Add(flatbuffers_ep
       URL 
"https://github.com/google/flatbuffers/archive/v${FLATBUFFERS_VERSION}.tar.gz";
       CMAKE_ARGS
+      "-DCMAKE_CXX_FLAGS=-fPIC"
       "-DCMAKE_INSTALL_PREFIX:PATH=${FLATBUFFERS_PREFIX}"
       "-DFLATBUFFERS_BUILD_TESTS=OFF")
 
@@ -871,7 +874,12 @@ if (UNIX)
 
   FOREACH(item ${LINT_FILES})
     IF(NOT ((item MATCHES "_generated.h") OR
-            (item MATCHES "pyarrow_api.h")))
+            (item MATCHES "pyarrow_api.h") OR
+            (item MATCHES "xxhash.h") OR
+            (item MATCHES "xxhash.cc") OR
+            (item MATCHES "config.h") OR
+            (item MATCHES "zmalloc.h") OR
+            (item MATCHES "ae.h")))
       LIST(APPEND FILTERED_LINT_FILES ${item})
     ENDIF()
   ENDFOREACH(item ${LINT_FILES})
@@ -899,7 +907,10 @@ if (${CLANG_FORMAT_FOUND})
     sed -e '/windows_compatibility.h/g' |
     sed -e '/pyarrow_api.h/g' |
     sed -e '/config.h/g' |   # python/config.h
-    sed -e '/platform.h/g'`  # python/platform.h
+    sed -e '/platform.h/g' |  # python/platform.h
+    sed -e '/ae.h/g' |
+    sed -e '/xxhash.cc/g' |
+    sed -e '/xxhash.h/g'`
     )
 
   # runs clang format and exits with a non-zero exit code if any files need to 
be reformatted
@@ -1002,6 +1013,9 @@ if(FLATBUFFERS_VENDORED)
   set(ARROW_DEPENDENCIES ${ARROW_DEPENDENCIES} flatbuffers_ep)
 endif()
 
+if(NOT WIN32)
+  add_subdirectory(src/plasma)
+endif()
 add_subdirectory(src/arrow)
 add_subdirectory(src/arrow/io)
 if (ARROW_IPC)

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/arrow/status.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/status.h b/cpp/src/arrow/status.h
index 1688b96..7e7f67c 100644
--- a/cpp/src/arrow/status.h
+++ b/cpp/src/arrow/status.h
@@ -83,6 +83,9 @@ enum class StatusCode : char {
   IOError = 5,
   UnknownError = 9,
   NotImplemented = 10,
+  PlasmaObjectExists = 20,
+  PlasmaObjectNonexistent = 21,
+  PlasmaStoreFull = 22
 };
 
 class ARROW_EXPORT Status {
@@ -129,6 +132,18 @@ class ARROW_EXPORT Status {
     return Status(StatusCode::IOError, msg, -1);
   }
 
+  static Status PlasmaObjectExists(const std::string& msg) {
+    return Status(StatusCode::PlasmaObjectExists, msg, -1);
+  }
+
+  static Status PlasmaObjectNonexistent(const std::string& msg) {
+    return Status(StatusCode::PlasmaObjectNonexistent, msg, -1);
+  }
+
+  static Status PlasmaStoreFull(const std::string& msg) {
+    return Status(StatusCode::PlasmaStoreFull, msg, -1);
+  }
+
   // Returns true iff the status indicates success.
   bool ok() const { return (state_ == NULL); }
 
@@ -139,6 +154,14 @@ class ARROW_EXPORT Status {
   bool IsTypeError() const { return code() == StatusCode::TypeError; }
   bool IsUnknownError() const { return code() == StatusCode::UnknownError; }
   bool IsNotImplemented() const { return code() == StatusCode::NotImplemented; 
}
+  // An object with this object ID already exists in the plasma store.
+  bool IsPlasmaObjectExists() const { return code() == 
StatusCode::PlasmaObjectExists; }
+  // An object was requested that doesn't exist in the plasma store.
+  bool IsPlasmaObjectNonexistent() const {
+    return code() == StatusCode::PlasmaObjectNonexistent;
+  }
+  // An object is too large to fit into the plasma store.
+  bool IsPlasmaStoreFull() const { return code() == 
StatusCode::PlasmaStoreFull; }
 
   // Return a string representation of this status suitable for printing.
   // Returns the string "OK" for success.

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/arrow/util/logging.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h
index 697d47c..49f1699 100644
--- a/cpp/src/arrow/util/logging.h
+++ b/cpp/src/arrow/util/logging.h
@@ -30,6 +30,7 @@ namespace arrow {
 
 // Log levels. LOG ignores them, so their values are abitrary.
 
+#define ARROW_DEBUG (-1)
 #define ARROW_INFO 0
 #define ARROW_WARNING 1
 #define ARROW_ERROR 2
@@ -38,10 +39,9 @@ namespace arrow {
 #define ARROW_LOG_INTERNAL(level) ::arrow::internal::CerrLog(level)
 #define ARROW_LOG(level) ARROW_LOG_INTERNAL(ARROW_##level)
 
-#define ARROW_CHECK(condition)                           \
-  (condition) ? 0                                        \
-              : ::arrow::internal::FatalLog(ARROW_FATAL) \
-                    << __FILE__ << __LINE__ << " Check failed: " #condition " "
+#define ARROW_CHECK(condition)                               \
+  (condition) ? 0 : ::arrow::internal::FatalLog(ARROW_FATAL) \
+                        << __FILE__ << __LINE__ << " Check failed: " 
#condition " "
 
 #ifdef NDEBUG
 #define ARROW_DFATAL ARROW_WARNING

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt
new file mode 100644
index 0000000..992c33e
--- /dev/null
+++ b/cpp/src/plasma/CMakeLists.txt
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cmake_minimum_required(VERSION 2.8)
+
+project(plasma)
+
+find_package(PythonLibsNew REQUIRED)
+find_package(Threads)
+
+option(PLASMA_PYTHON
+  "Build the Plasma Python extensions"
+  OFF)
+
+if(APPLE)
+  SET(CMAKE_SHARED_LIBRARY_SUFFIX ".so")
+endif(APPLE)
+
+include_directories(SYSTEM ${PYTHON_INCLUDE_DIRS})
+include_directories("${FLATBUFFERS_INCLUDE_DIR}" "${CMAKE_CURRENT_LIST_DIR}/" 
"${CMAKE_CURRENT_LIST_DIR}/thirdparty/" "${CMAKE_CURRENT_LIST_DIR}/../")
+
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_XOPEN_SOURCE=500 
-D_POSIX_C_SOURCE=200809L")
+
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-conversion")
+
+# Compile flatbuffers
+
+set(PLASMA_FBS_SRC "${CMAKE_CURRENT_LIST_DIR}/format/plasma.fbs" 
"${CMAKE_CURRENT_LIST_DIR}/format/common.fbs")
+set(OUTPUT_DIR ${CMAKE_CURRENT_LIST_DIR}/format/)
+
+set(PLASMA_FBS_OUTPUT_FILES
+  "${OUTPUT_DIR}/common_generated.h"
+  "${OUTPUT_DIR}/plasma_generated.h")
+
+add_custom_command(
+  OUTPUT ${PLASMA_FBS_OUTPUT_FILES}
+  # The --gen-object-api flag generates a C++ class MessageT for each
+  # flatbuffers message Message, which can be used to store deserialized
+  # messages in data structures. This is currently used for ObjectInfo for
+  # example.
+  COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${PLASMA_FBS_SRC} 
--gen-object-api
+  DEPENDS ${PLASMA_FBS_SRC}
+  COMMENT "Running flatc compiler on ${PLASMA_FBS_SRC}"
+  VERBATIM)
+
+add_custom_target(gen_plasma_fbs DEPENDS ${PLASMA_FBS_OUTPUT_FILES})
+
+add_dependencies(gen_plasma_fbs flatbuffers_ep)
+
+if(UNIX AND NOT APPLE)
+  link_libraries(rt)
+endif()
+
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC")
+
+set_source_files_properties(thirdparty/dlmalloc.c PROPERTIES COMPILE_FLAGS 
-Wno-all)
+set_source_files_properties(extension.cc PROPERTIES COMPILE_FLAGS 
-Wno-strict-aliasing)
+
+set(PLASMA_SRCS
+  client.cc
+  common.cc
+  eviction_policy.cc
+  events.cc
+  fling.cc
+  io.cc
+  malloc.cc
+  plasma.cc
+  protocol.cc
+  thirdparty/ae/ae.c
+  thirdparty/xxhash.cc)
+
+ADD_ARROW_LIB(plasma
+  SOURCES ${PLASMA_SRCS}
+  DEPENDENCIES gen_plasma_fbs
+  SHARED_LINK_LIBS ${FLATBUFFERS_STATIC_LIB} ${CMAKE_THREAD_LIBS_INIT} 
arrow_static
+  STATIC_LINK_LIBS ${FLATBUFFERS_STATIC_LIB} ${CMAKE_THREAD_LIBS_INIT} 
arrow_static)
+
+# The optimization flag -O3 is suggested by dlmalloc.c, which is #included in
+# malloc.cc; we set it here regardless of whether we do a debug or release 
build.
+set_source_files_properties(malloc.cc PROPERTIES COMPILE_FLAGS 
"-Wno-error=conversion -O3")
+
+add_executable(plasma_store store.cc)
+target_link_libraries(plasma_store plasma_static)
+
+ADD_ARROW_TEST(test/serialization_tests)
+ARROW_TEST_LINK_LIBRARIES(test/serialization_tests plasma_static)
+ADD_ARROW_TEST(test/client_tests)
+ARROW_TEST_LINK_LIBRARIES(test/client_tests plasma_static)
+
+if(PLASMA_PYTHON)
+  add_library(plasma_extension SHARED extension.cc)
+
+  if(APPLE)
+    target_link_libraries(plasma_extension plasma_static "-undefined 
dynamic_lookup")
+  else(APPLE)
+    target_link_libraries(plasma_extension plasma_static -Wl,--whole-archive 
${FLATBUFFERS_STATIC_LIB} -Wl,--no-whole-archive)
+  endif(APPLE)
+endif()

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/client.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
new file mode 100644
index 0000000..dcb78e7
--- /dev/null
+++ b/cpp/src/plasma/client.cc
@@ -0,0 +1,557 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// PLASMA CLIENT: Client library for using the plasma store and manager
+
+#include "plasma/client.h"
+
+#ifdef _WIN32
+#include <Win32_Interop/win32_types.h>
+#endif
+
+#include <assert.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <strings.h>
+#include <sys/ioctl.h>
+#include <sys/mman.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <thread>
+#include <vector>
+
+#include "plasma/common.h"
+#include "plasma/fling.h"
+#include "plasma/io.h"
+#include "plasma/plasma.h"
+#include "plasma/protocol.h"
+
+#define XXH_STATIC_LINKING_ONLY
+#include "thirdparty/xxhash.h"
+
+#define XXH64_DEFAULT_SEED 0
+
+// Number of threads used for memcopy and hash computations.
+constexpr int64_t kThreadPoolSize = 8;
+constexpr int64_t kBytesInMB = 1 << 20;
+static std::vector<std::thread> threadpool_(kThreadPoolSize);
+
+// If the file descriptor fd has been mmapped in this client process before,
+// return the pointer that was returned by mmap, otherwise mmap it and store 
the
+// pointer in a hash table.
+uint8_t* PlasmaClient::lookup_or_mmap(int fd, int store_fd_val, int64_t 
map_size) {
+  auto entry = mmap_table_.find(store_fd_val);
+  if (entry != mmap_table_.end()) {
+    close(fd);
+    return entry->second.pointer;
+  } else {
+    uint8_t* result = reinterpret_cast<uint8_t*>(
+        mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
+    // TODO(pcm): Don't fail here, instead return a Status.
+    if (result == MAP_FAILED) { ARROW_LOG(FATAL) << "mmap failed"; }
+    close(fd);
+    ClientMmapTableEntry& entry = mmap_table_[store_fd_val];
+    entry.pointer = result;
+    entry.length = map_size;
+    entry.count = 0;
+    return result;
+  }
+}
+
+// Get a pointer to a file that we know has been memory mapped in this client
+// process before.
+uint8_t* PlasmaClient::lookup_mmapped_file(int store_fd_val) {
+  auto entry = mmap_table_.find(store_fd_val);
+  ARROW_CHECK(entry != mmap_table_.end());
+  return entry->second.pointer;
+}
+
+void PlasmaClient::increment_object_count(
+    const ObjectID& object_id, PlasmaObject* object, bool is_sealed) {
+  // Increment the count of the object to track the fact that it is being used.
+  // The corresponding decrement should happen in PlasmaClient::Release.
+  auto elem = objects_in_use_.find(object_id);
+  ObjectInUseEntry* object_entry;
+  if (elem == objects_in_use_.end()) {
+    // Add this object ID to the hash table of object IDs in use. The
+    // corresponding call to free happens in PlasmaClient::Release.
+    objects_in_use_[object_id] =
+        std::unique_ptr<ObjectInUseEntry>(new ObjectInUseEntry());
+    objects_in_use_[object_id]->object = *object;
+    objects_in_use_[object_id]->count = 0;
+    objects_in_use_[object_id]->is_sealed = is_sealed;
+    object_entry = objects_in_use_[object_id].get();
+    // Increment the count of the number of objects in the memory-mapped file
+    // that are being used. The corresponding decrement should happen in
+    // PlasmaClient::Release.
+    auto entry = mmap_table_.find(object->handle.store_fd);
+    ARROW_CHECK(entry != mmap_table_.end());
+    ARROW_CHECK(entry->second.count >= 0);
+    // Update the in_use_object_bytes_.
+    in_use_object_bytes_ +=
+        (object_entry->object.data_size + object_entry->object.metadata_size);
+    entry->second.count += 1;
+  } else {
+    object_entry = elem->second.get();
+    ARROW_CHECK(object_entry->count > 0);
+  }
+  // Increment the count of the number of instances of this object that are
+  // being used by this client. The corresponding decrement should happen in
+  // PlasmaClient::Release.
+  object_entry->count += 1;
+}
+
+Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
+    uint8_t* metadata, int64_t metadata_size, uint8_t** data) {
+  ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with 
size "
+                   << data_size << " and metadata size " << metadata_size;
+  RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, data_size, 
metadata_size));
+  std::vector<uint8_t> buffer;
+  RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaCreateReply, 
&buffer));
+  ObjectID id;
+  PlasmaObject object;
+  RETURN_NOT_OK(ReadCreateReply(buffer.data(), &id, &object));
+  // If the CreateReply included an error, then the store will not send a file
+  // descriptor.
+  int fd = recv_fd(store_conn_);
+  ARROW_CHECK(fd >= 0) << "recv not successful";
+  ARROW_CHECK(object.data_size == data_size);
+  ARROW_CHECK(object.metadata_size == metadata_size);
+  // The metadata should come right after the data.
+  ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
+  *data = lookup_or_mmap(fd, object.handle.store_fd, object.handle.mmap_size) +
+          object.data_offset;
+  // If plasma_create is being called from a transfer, then we will not copy 
the
+  // metadata here. The metadata will be written along with the data streamed
+  // from the transfer.
+  if (metadata != NULL) {
+    // Copy the metadata to the buffer.
+    memcpy(*data + object.data_size, metadata, metadata_size);
+  }
+  // Increment the count of the number of instances of this object that this
+  // client is using. A call to PlasmaClient::Release is required to decrement
+  // this
+  // count. Cache the reference to the object.
+  increment_object_count(object_id, &object, false);
+  // We increment the count a second time (and the corresponding decrement will
+  // happen in a PlasmaClient::Release call in plasma_seal) so even if the
+  // buffer
+  // returned by PlasmaClient::Dreate goes out of scope, the object does not 
get
+  // released before the call to PlasmaClient::Seal happens.
+  increment_object_count(object_id, &object, false);
+  return Status::OK();
+}
+
+Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
+    int64_t timeout_ms, ObjectBuffer* object_buffers) {
+  // Fill out the info for the objects that are already in use locally.
+  bool all_present = true;
+  for (int i = 0; i < num_objects; ++i) {
+    auto object_entry = objects_in_use_.find(object_ids[i]);
+    if (object_entry == objects_in_use_.end()) {
+      // This object is not currently in use by this client, so we need to send
+      // a request to the store.
+      all_present = false;
+      // Make a note to ourselves that the object is not present.
+      object_buffers[i].data_size = -1;
+    } else {
+      // NOTE: If the object is still unsealed, we will deadlock, since we must
+      // have been the one who created it.
+      ARROW_CHECK(object_entry->second->is_sealed)
+          << "Plasma client called get on an unsealed object that it created";
+      PlasmaObject* object = &object_entry->second->object;
+      object_buffers[i].data = lookup_mmapped_file(object->handle.store_fd);
+      object_buffers[i].data = object_buffers[i].data + object->data_offset;
+      object_buffers[i].data_size = object->data_size;
+      object_buffers[i].metadata = object_buffers[i].data + object->data_size;
+      object_buffers[i].metadata_size = object->metadata_size;
+      // Increment the count of the number of instances of this object that 
this
+      // client is using. A call to PlasmaClient::Release is required to
+      // decrement this
+      // count. Cache the reference to the object.
+      increment_object_count(object_ids[i], object, true);
+    }
+  }
+
+  if (all_present) { return Status::OK(); }
+
+  // If we get here, then the objects aren't all currently in use by this
+  // client, so we need to send a request to the plasma store.
+  RETURN_NOT_OK(SendGetRequest(store_conn_, object_ids, num_objects, 
timeout_ms));
+  std::vector<uint8_t> buffer;
+  RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaGetReply, 
&buffer));
+  std::vector<ObjectID> received_object_ids(num_objects);
+  std::vector<PlasmaObject> object_data(num_objects);
+  PlasmaObject* object;
+  RETURN_NOT_OK(ReadGetReply(
+      buffer.data(), received_object_ids.data(), object_data.data(), 
num_objects));
+
+  for (int i = 0; i < num_objects; ++i) {
+    DCHECK(received_object_ids[i] == object_ids[i]);
+    object = &object_data[i];
+    if (object_buffers[i].data_size != -1) {
+      // If the object was already in use by the client, then the store should
+      // have returned it.
+      DCHECK_NE(object->data_size, -1);
+      // We won't use this file descriptor, but the store sent us one, so we
+      // need to receive it and then close it right away so we don't leak file
+      // descriptors.
+      int fd = recv_fd(store_conn_);
+      close(fd);
+      ARROW_CHECK(fd >= 0);
+      // We've already filled out the information for this object, so we can
+      // just continue.
+      continue;
+    }
+    // If we are here, the object was not currently in use, so we need to
+    // process the reply from the object store.
+    if (object->data_size != -1) {
+      // The object was retrieved. The user will be responsible for releasing
+      // this object.
+      int fd = recv_fd(store_conn_);
+      ARROW_CHECK(fd >= 0);
+      object_buffers[i].data =
+          lookup_or_mmap(fd, object->handle.store_fd, 
object->handle.mmap_size);
+      // Finish filling out the return values.
+      object_buffers[i].data = object_buffers[i].data + object->data_offset;
+      object_buffers[i].data_size = object->data_size;
+      object_buffers[i].metadata = object_buffers[i].data + object->data_size;
+      object_buffers[i].metadata_size = object->metadata_size;
+      // Increment the count of the number of instances of this object that 
this
+      // client is using. A call to PlasmaClient::Release is required to
+      // decrement this
+      // count. Cache the reference to the object.
+      increment_object_count(received_object_ids[i], object, true);
+    } else {
+      // The object was not retrieved. Make sure we already put a -1 here to
+      // indicate that the object was not retrieved. The caller is not
+      // responsible for releasing this object.
+      DCHECK_EQ(object_buffers[i].data_size, -1);
+      object_buffers[i].data_size = -1;
+    }
+  }
+  return Status::OK();
+}
+
+/// This is a helper method for implementing plasma_release. We maintain a
+/// buffer
+/// of release calls and only perform them once the buffer becomes full (as
+/// judged by the aggregate sizes of the objects). There may be multiple 
release
+/// calls for the same object ID in the buffer. In this case, the first release
+/// calls will not do anything. The client will only send a message to the 
store
+/// releasing the object when the client is truly done with the object.
+///
+/// @param conn The plasma connection.
+/// @param object_id The object ID to attempt to release.
+Status PlasmaClient::PerformRelease(const ObjectID& object_id) {
+  // Decrement the count of the number of instances of this object that are
+  // being used by this client. The corresponding increment should have 
happened
+  // in PlasmaClient::Get.
+  auto object_entry = objects_in_use_.find(object_id);
+  ARROW_CHECK(object_entry != objects_in_use_.end());
+  object_entry->second->count -= 1;
+  ARROW_CHECK(object_entry->second->count >= 0);
+  // Check if the client is no longer using this object.
+  if (object_entry->second->count == 0) {
+    // Decrement the count of the number of objects in this memory-mapped file
+    // that the client is using. The corresponding increment should have
+    // happened in plasma_get.
+    int fd = object_entry->second->object.handle.store_fd;
+    auto entry = mmap_table_.find(fd);
+    ARROW_CHECK(entry != mmap_table_.end());
+    entry->second.count -= 1;
+    ARROW_CHECK(entry->second.count >= 0);
+    // If none are being used then unmap the file.
+    if (entry->second.count == 0) {
+      munmap(entry->second.pointer, entry->second.length);
+      // Remove the corresponding entry from the hash table.
+      mmap_table_.erase(fd);
+    }
+    // Tell the store that the client no longer needs the object.
+    RETURN_NOT_OK(SendReleaseRequest(store_conn_, object_id));
+    // Update the in_use_object_bytes_.
+    in_use_object_bytes_ -= (object_entry->second->object.data_size +
+                             object_entry->second->object.metadata_size);
+    DCHECK_GE(in_use_object_bytes_, 0);
+    // Remove the entry from the hash table of objects currently in use.
+    objects_in_use_.erase(object_id);
+  }
+  return Status::OK();
+}
+
+Status PlasmaClient::Release(const ObjectID& object_id) {
+  // Add the new object to the release history.
+  release_history_.push_front(object_id);
+  // If there are too many bytes in use by the client or if there are too many
+  // pending release calls, and there are at least some pending release calls 
in
+  // the release_history list, then release some objects.
+  while ((in_use_object_bytes_ > std::min(kL3CacheSizeBytes, store_capacity_ / 
100) ||
+             release_history_.size() > config_.release_delay) &&
+         release_history_.size() > 0) {
+    // Perform a release for the object ID for the first pending release.
+    RETURN_NOT_OK(PerformRelease(release_history_.back()));
+    // Remove the last entry from the release history.
+    release_history_.pop_back();
+  }
+  return Status::OK();
+}
+
+// This method is used to query whether the plasma store contains an object.
+Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) {
+  // Check if we already have a reference to the object.
+  if (objects_in_use_.count(object_id) > 0) {
+    *has_object = 1;
+  } else {
+    // If we don't already have a reference to the object, check with the store
+    // to see if we have the object.
+    RETURN_NOT_OK(SendContainsRequest(store_conn_, object_id));
+    std::vector<uint8_t> buffer;
+    RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaContainsReply, 
&buffer));
+    ObjectID object_id2;
+    RETURN_NOT_OK(ReadContainsReply(buffer.data(), &object_id2, has_object));
+  }
+  return Status::OK();
+}
+
+static void ComputeBlockHash(const unsigned char* data, int64_t nbytes, 
uint64_t* hash) {
+  XXH64_state_t hash_state;
+  XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
+  XXH64_update(&hash_state, data, nbytes);
+  *hash = XXH64_digest(&hash_state);
+}
+
+static inline bool compute_object_hash_parallel(
+    XXH64_state_t* hash_state, const unsigned char* data, int64_t nbytes) {
+  // Note that this function will likely be faster if the address of data is
+  // aligned on a 64-byte boundary.
+  const int num_threads = kThreadPoolSize;
+  uint64_t threadhash[num_threads + 1];
+  const uint64_t data_address = reinterpret_cast<uint64_t>(data);
+  const uint64_t num_blocks = nbytes / BLOCK_SIZE;
+  const uint64_t chunk_size = (num_blocks / num_threads) * BLOCK_SIZE;
+  const uint64_t right_address = data_address + chunk_size * num_threads;
+  const uint64_t suffix = (data_address + nbytes) - right_address;
+  // Now the data layout is | k * num_threads * block_size | suffix | ==
+  // | num_threads * chunk_size | suffix |, where chunk_size = k * block_size.
+  // Each thread gets a "chunk" of k blocks, except the suffix thread.
+
+  for (int i = 0; i < num_threads; i++) {
+    threadpool_[i] = std::thread(ComputeBlockHash,
+        reinterpret_cast<uint8_t*>(data_address) + i * chunk_size, chunk_size,
+        &threadhash[i]);
+  }
+  ComputeBlockHash(
+      reinterpret_cast<uint8_t*>(right_address), suffix, 
&threadhash[num_threads]);
+
+  // Join the threads.
+  for (auto& t : threadpool_) {
+    if (t.joinable()) { t.join(); }
+  }
+
+  XXH64_update(hash_state, (unsigned char*)threadhash, sizeof(threadhash));
+  return true;
+}
+
+static uint64_t compute_object_hash(const ObjectBuffer& obj_buffer) {
+  XXH64_state_t hash_state;
+  XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
+  if (obj_buffer.data_size >= kBytesInMB) {
+    compute_object_hash_parallel(
+        &hash_state, (unsigned char*)obj_buffer.data, obj_buffer.data_size);
+  } else {
+    XXH64_update(&hash_state, (unsigned char*)obj_buffer.data, 
obj_buffer.data_size);
+  }
+  XXH64_update(
+      &hash_state, (unsigned char*)obj_buffer.metadata, 
obj_buffer.metadata_size);
+  return XXH64_digest(&hash_state);
+}
+
+bool plasma_compute_object_hash(
+    PlasmaClient* conn, ObjectID object_id, unsigned char* digest) {
+  // Get the plasma object data. We pass in a timeout of 0 to indicate that
+  // the operation should timeout immediately.
+  ObjectBuffer object_buffer;
+  ARROW_CHECK_OK(conn->Get(&object_id, 1, 0, &object_buffer));
+  // If the object was not retrieved, return false.
+  if (object_buffer.data_size == -1) { return false; }
+  // Compute the hash.
+  uint64_t hash = compute_object_hash(object_buffer);
+  memcpy(digest, &hash, sizeof(hash));
+  // Release the plasma object.
+  ARROW_CHECK_OK(conn->Release(object_id));
+  return true;
+}
+
+Status PlasmaClient::Seal(const ObjectID& object_id) {
+  // Make sure this client has a reference to the object before sending the
+  // request to Plasma.
+  auto object_entry = objects_in_use_.find(object_id);
+  ARROW_CHECK(object_entry != objects_in_use_.end())
+      << "Plasma client called seal an object without a reference to it";
+  ARROW_CHECK(!object_entry->second->is_sealed)
+      << "Plasma client called seal an already sealed object";
+  object_entry->second->is_sealed = true;
+  /// Send the seal request to Plasma.
+  static unsigned char digest[kDigestSize];
+  ARROW_CHECK(plasma_compute_object_hash(this, object_id, &digest[0]));
+  RETURN_NOT_OK(SendSealRequest(store_conn_, object_id, &digest[0]));
+  // We call PlasmaClient::Release to decrement the number of instances of this
+  // object
+  // that are currently being used by this client. The corresponding increment
+  // happened in plasma_create and was used to ensure that the object was not
+  // released before the call to PlasmaClient::Seal.
+  return Release(object_id);
+}
+
+Status PlasmaClient::Delete(const ObjectID& object_id) {
+  // TODO(rkn): In the future, we can use this method to give hints to the
+  // eviction policy about when an object will no longer be needed.
+  return Status::NotImplemented("PlasmaClient::Delete is not implemented.");
+}
+
+Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
+  // Send a request to the store to evict objects.
+  RETURN_NOT_OK(SendEvictRequest(store_conn_, num_bytes));
+  // Wait for a response with the number of bytes actually evicted.
+  std::vector<uint8_t> buffer;
+  int64_t type;
+  RETURN_NOT_OK(ReadMessage(store_conn_, &type, &buffer));
+  return ReadEvictReply(buffer.data(), num_bytes_evicted);
+}
+
+Status PlasmaClient::Subscribe(int* fd) {
+  int sock[2];
+  // Create a non-blocking socket pair. This will only be used to send
+  // notifications from the Plasma store to the client.
+  socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
+  // Make the socket non-blocking.
+  int flags = fcntl(sock[1], F_GETFL, 0);
+  ARROW_CHECK(fcntl(sock[1], F_SETFL, flags | O_NONBLOCK) == 0);
+  // Tell the Plasma store about the subscription.
+  RETURN_NOT_OK(SendSubscribeRequest(store_conn_));
+  // Send the file descriptor that the Plasma store should use to push
+  // notifications about sealed objects to this client.
+  ARROW_CHECK(send_fd(store_conn_, sock[1]) >= 0);
+  close(sock[1]);
+  // Return the file descriptor that the client should use to read 
notifications
+  // about sealed objects.
+  *fd = sock[0];
+  return Status::OK();
+}
+
+Status PlasmaClient::Connect(const std::string& store_socket_name,
+    const std::string& manager_socket_name, int release_delay) {
+  store_conn_ = connect_ipc_sock_retry(store_socket_name, -1, -1);
+  if (manager_socket_name != "") {
+    manager_conn_ = connect_ipc_sock_retry(manager_socket_name, -1, -1);
+  } else {
+    manager_conn_ = -1;
+  }
+  config_.release_delay = release_delay;
+  in_use_object_bytes_ = 0;
+  // Send a ConnectRequest to the store to get its memory capacity.
+  RETURN_NOT_OK(SendConnectRequest(store_conn_));
+  std::vector<uint8_t> buffer;
+  RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaConnectReply, 
&buffer));
+  RETURN_NOT_OK(ReadConnectReply(buffer.data(), &store_capacity_));
+  return Status::OK();
+}
+
+Status PlasmaClient::Disconnect() {
+  // NOTE: We purposefully do not finish sending release calls for objects in
+  // use, so that we don't duplicate PlasmaClient::Release calls (when handling
+  // a SIGTERM, for example).
+
+  // Close the connections to Plasma. The Plasma store will release the objects
+  // that were in use by us when handling the SIGPIPE.
+  close(store_conn_);
+  if (manager_conn_ >= 0) { close(manager_conn_); }
+  return Status::OK();
+}
+
+#define h_addr h_addr_list[0]
+
+Status PlasmaClient::Transfer(const char* address, int port, const ObjectID& 
object_id) {
+  return SendDataRequest(manager_conn_, object_id, address, port);
+}
+
+Status PlasmaClient::Fetch(int num_object_ids, const ObjectID* object_ids) {
+  ARROW_CHECK(manager_conn_ >= 0);
+  return SendFetchRequest(manager_conn_, object_ids, num_object_ids);
+}
+
+int PlasmaClient::get_manager_fd() {
+  return manager_conn_;
+}
+
+Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) {
+  ARROW_CHECK(manager_conn_ >= 0);
+
+  RETURN_NOT_OK(SendStatusRequest(manager_conn_, &object_id, 1));
+  std::vector<uint8_t> buffer;
+  RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaStatusReply, 
&buffer));
+  ObjectID id;
+  RETURN_NOT_OK(ReadStatusReply(buffer.data(), &id, object_status, 1));
+  ARROW_CHECK(object_id == id);
+  return Status::OK();
+}
+
+Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* 
object_requests,
+    int num_ready_objects, int64_t timeout_ms, int* num_objects_ready) {
+  ARROW_CHECK(manager_conn_ >= 0);
+  ARROW_CHECK(num_object_requests > 0);
+  ARROW_CHECK(num_ready_objects > 0);
+  ARROW_CHECK(num_ready_objects <= num_object_requests);
+
+  for (int i = 0; i < num_object_requests; ++i) {
+    ARROW_CHECK(object_requests[i].type == PLASMA_QUERY_LOCAL ||
+                object_requests[i].type == PLASMA_QUERY_ANYWHERE);
+  }
+
+  RETURN_NOT_OK(SendWaitRequest(manager_conn_, object_requests, 
num_object_requests,
+      num_ready_objects, timeout_ms));
+  std::vector<uint8_t> buffer;
+  RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaWaitReply, 
&buffer));
+  RETURN_NOT_OK(ReadWaitReply(buffer.data(), object_requests, 
&num_ready_objects));
+
+  *num_objects_ready = 0;
+  for (int i = 0; i < num_object_requests; ++i) {
+    int type = object_requests[i].type;
+    int status = object_requests[i].status;
+    switch (type) {
+      case PLASMA_QUERY_LOCAL:
+        if (status == ObjectStatus_Local) { *num_objects_ready += 1; }
+        break;
+      case PLASMA_QUERY_ANYWHERE:
+        if (status == ObjectStatus_Local || status == ObjectStatus_Remote) {
+          *num_objects_ready += 1;
+        } else {
+          ARROW_CHECK(status == ObjectStatus_Nonexistent);
+        }
+        break;
+      default:
+        ARROW_LOG(FATAL) << "This code should be unreachable.";
+    }
+  }
+  return Status::OK();
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/client.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
new file mode 100644
index 0000000..fb3a161
--- /dev/null
+++ b/cpp/src/plasma/client.h
@@ -0,0 +1,343 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_CLIENT_H
+#define PLASMA_CLIENT_H
+
+#include <stdbool.h>
+#include <time.h>
+
+#include <deque>
+#include <string>
+
+#include "plasma/plasma.h"
+
+using arrow::Status;
+
+#define PLASMA_DEFAULT_RELEASE_DELAY 64
+
+// Use 100MB as an overestimate of the L3 cache size.
+constexpr int64_t kL3CacheSizeBytes = 100000000;
+
+/// Object buffer data structure.
+struct ObjectBuffer {
+  /// The size in bytes of the data object.
+  int64_t data_size;
+  /// The address of the data object.
+  uint8_t* data;
+  /// The metadata size in bytes.
+  int64_t metadata_size;
+  /// The address of the metadata.
+  uint8_t* metadata;
+};
+
+/// Configuration options for the plasma client.
+struct PlasmaClientConfig {
+  /// Number of release calls we wait until the object is actually released.
+  /// This allows us to avoid invalidating the cpu cache on workers if objects
+  /// are reused accross tasks.
+  size_t release_delay;
+};
+
+struct ClientMmapTableEntry {
+  /// The result of mmap for this file descriptor.
+  uint8_t* pointer;
+  /// The length of the memory-mapped file.
+  size_t length;
+  /// The number of objects in this memory-mapped file that are currently being
+  /// used by the client. When this count reaches zeros, we unmap the file.
+  int count;
+};
+
+struct ObjectInUseEntry {
+  /// A count of the number of times this client has called 
PlasmaClient::Create
+  /// or
+  /// PlasmaClient::Get on this object ID minus the number of calls to
+  /// PlasmaClient::Release.
+  /// When this count reaches zero, we remove the entry from the ObjectsInUse
+  /// and decrement a count in the relevant ClientMmapTableEntry.
+  int count;
+  /// Cached information to read the object.
+  PlasmaObject object;
+  /// A flag representing whether the object has been sealed.
+  bool is_sealed;
+};
+
+class PlasmaClient {
+ public:
+  /// Connect to the local plasma store and plasma manager. Return
+  /// the resulting connection.
+  ///
+  /// @param store_socket_name The name of the UNIX domain socket to use to
+  ///        connect to the Plasma store.
+  /// @param manager_socket_name The name of the UNIX domain socket to use to
+  ///        connect to the local Plasma manager. If this is "", then this
+  ///        function will not connect to a manager.
+  /// @param release_delay Number of released objects that are kept around
+  ///        and not evicted to avoid too many munmaps.
+  /// @return The return status.
+  Status Connect(const std::string& store_socket_name,
+      const std::string& manager_socket_name, int release_delay);
+
+  /// Create an object in the Plasma Store. Any metadata for this object must 
be
+  /// be passed in when the object is created.
+  ///
+  /// @param object_id The ID to use for the newly created object.
+  /// @param data_size The size in bytes of the space to be allocated for this
+  /// object's
+  ///        data (this does not include space used for metadata).
+  /// @param metadata The object's metadata. If there is no metadata, this
+  /// pointer
+  ///        should be NULL.
+  /// @param metadata_size The size in bytes of the metadata. If there is no
+  ///        metadata, this should be 0.
+  /// @param data The address of the newly created object will be written here.
+  /// @return The return status.
+  Status Create(const ObjectID& object_id, int64_t data_size, uint8_t* 
metadata,
+      int64_t metadata_size, uint8_t** data);
+
+  /// Get some objects from the Plasma Store. This function will block until 
the
+  /// objects have all been created and sealed in the Plasma Store or the
+  /// timeout
+  /// expires. The caller is responsible for releasing any retrieved objects,
+  /// but
+  /// the caller should not release objects that were not retrieved.
+  ///
+  /// @param object_ids The IDs of the objects to get.
+  /// @param num_object_ids The number of object IDs to get.
+  /// @param timeout_ms The amount of time in milliseconds to wait before this
+  ///        request times out. If this value is -1, then no timeout is set.
+  /// @param object_buffers An array where the results will be stored. If the
+  /// data
+  ///        size field is -1, then the object was not retrieved.
+  /// @return The return status.
+  Status Get(const ObjectID* object_ids, int64_t num_objects, int64_t 
timeout_ms,
+      ObjectBuffer* object_buffers);
+
+  /// Tell Plasma that the client no longer needs the object. This should be
+  /// called
+  /// after Get when the client is done with the object. After this call,
+  /// the address returned by Get is no longer valid. This should be called
+  /// once for each call to Get (with the same object ID).
+  ///
+  /// @param object_id The ID of the object that is no longer needed.
+  /// @return The return status.
+  Status Release(const ObjectID& object_id);
+
+  /// Check if the object store contains a particular object and the object has
+  /// been sealed. The result will be stored in has_object.
+  ///
+  /// @todo: We may want to indicate if the object has been created but not
+  /// sealed.
+  ///
+  /// @param object_id The ID of the object whose presence we are checking.
+  /// @param has_object The function will write true at this address if
+  ///        the object is present and false if it is not present.
+  /// @return The return status.
+  Status Contains(const ObjectID& object_id, bool* has_object);
+
+  /// Seal an object in the object store. The object will be immutable after
+  /// this
+  /// call.
+  ///
+  /// @param object_id The ID of the object to seal.
+  /// @return The return status.
+  Status Seal(const ObjectID& object_id);
+
+  /// Delete an object from the object store. This currently assumes that the
+  /// object is present and has been sealed.
+  ///
+  /// @todo We may want to allow the deletion of objects that are not present 
or
+  ///       haven't been sealed.
+  ///
+  /// @param object_id The ID of the object to delete.
+  /// @return The return status.
+  Status Delete(const ObjectID& object_id);
+
+  /// Delete objects until we have freed up num_bytes bytes or there are no 
more
+  /// released objects that can be deleted.
+  ///
+  /// @param num_bytes The number of bytes to try to free up.
+  /// @param num_bytes_evicted Out parameter for total number of bytes of space
+  /// retrieved.
+  /// @return The return status.
+  Status Evict(int64_t num_bytes, int64_t& num_bytes_evicted);
+
+  /// Subscribe to notifications when objects are sealed in the object store.
+  /// Whenever an object is sealed, a message will be written to the client
+  /// socket
+  /// that is returned by this method.
+  ///
+  /// @param fd Out parameter for the file descriptor the client should use to
+  /// read notifications
+  ///         from the object store about sealed objects.
+  /// @return The return status.
+  Status Subscribe(int* fd);
+
+  /// Disconnect from the local plasma instance, including the local store and
+  /// manager.
+  ///
+  /// @return The return status.
+  Status Disconnect();
+
+  /// Attempt to initiate the transfer of some objects from remote Plasma
+  /// Stores.
+  /// This method does not guarantee that the fetched objects will arrive
+  /// locally.
+  ///
+  /// For an object that is available in the local Plasma Store, this method
+  /// will
+  /// not do anything. For an object that is not available locally, it will
+  /// check
+  /// if the object are already being fetched. If so, it will not do anything.
+  /// If
+  /// not, it will query the object table for a list of Plasma Managers that
+  /// have
+  /// the object. The object table will return a non-empty list, and this 
Plasma
+  /// Manager will attempt to initiate transfers from one of those Plasma
+  /// Managers.
+  ///
+  /// This function is non-blocking.
+  ///
+  /// This method is idempotent in the sense that it is ok to call it multiple
+  /// times.
+  ///
+  /// @param num_object_ids The number of object IDs fetch is being called on.
+  /// @param object_ids The IDs of the objects that fetch is being called on.
+  /// @return The return status.
+  Status Fetch(int num_object_ids, const ObjectID* object_ids);
+
+  /// Wait for (1) a specified number of objects to be available (sealed) in 
the
+  /// local Plasma Store or in a remote Plasma Store, or (2) for a timeout to
+  /// expire. This is a blocking call.
+  ///
+  /// @param num_object_requests Size of the object_requests array.
+  /// @param object_requests Object event array. Each element contains a 
request
+  ///        for a particular object_id. The type of request is specified in 
the
+  ///        "type" field.
+  ///        - A PLASMA_QUERY_LOCAL request is satisfied when object_id becomes
+  ///          available in the local Plasma Store. In this case, this function
+  ///          sets the "status" field to ObjectStatus_Local. Note, if the
+  ///          status
+  ///          is not ObjectStatus_Local, it will be ObjectStatus_Nonexistent,
+  ///          but it may exist elsewhere in the system.
+  ///        - A PLASMA_QUERY_ANYWHERE request is satisfied when object_id
+  ///        becomes
+  ///          available either at the local Plasma Store or on a remote Plasma
+  ///          Store. In this case, the functions sets the "status" field to
+  ///          ObjectStatus_Local or ObjectStatus_Remote.
+  /// @param num_ready_objects The number of requests in object_requests array
+  /// that
+  ///        must be satisfied before the function returns, unless it timeouts.
+  ///        The num_ready_objects should be no larger than 
num_object_requests.
+  /// @param timeout_ms Timeout value in milliseconds. If this timeout expires
+  ///        before min_num_ready_objects of requests are satisfied, the
+  ///        function
+  ///        returns.
+  /// @param num_objects_ready Out parameter for number of satisfied requests 
in
+  ///        the object_requests list. If the returned number is less than
+  ///        min_num_ready_objects this means that timeout expired.
+  /// @return The return status.
+  Status Wait(int64_t num_object_requests, ObjectRequest* object_requests,
+      int num_ready_objects, int64_t timeout_ms, int* num_objects_ready);
+
+  /// Transfer local object to a different plasma manager.
+  ///
+  /// @param conn The object containing the connection state.
+  /// @param addr IP address of the plasma manager we are transfering to.
+  /// @param port Port of the plasma manager we are transfering to.
+  /// @object_id ObjectID of the object we are transfering.
+  /// @return The return status.
+  Status Transfer(const char* addr, int port, const ObjectID& object_id);
+
+  /// Return the status of a given object. This method may query the object
+  /// table.
+  ///
+  /// @param conn The object containing the connection state.
+  /// @param object_id The ID of the object whose status we query.
+  /// @param object_status Out parameter for object status. Can take the
+  ///         following values.
+  ///         - PLASMA_CLIENT_LOCAL, if object is stored in the local Plasma
+  ///         Store.
+  ///           has been already scheduled by the Plasma Manager.
+  ///         - PLASMA_CLIENT_TRANSFER, if the object is either currently being
+  ///           transferred or just scheduled.
+  ///         - PLASMA_CLIENT_REMOTE, if the object is stored at a remote
+  ///           Plasma Store.
+  ///         - PLASMA_CLIENT_DOES_NOT_EXIST, if the object doesn’t exist in 
the
+  ///           system.
+  /// @return The return status.
+  Status Info(const ObjectID& object_id, int* object_status);
+
+  /// Get the file descriptor for the socket connection to the plasma manager.
+  ///
+  /// @param conn The plasma connection.
+  /// @return The file descriptor for the manager connection. If there is no
+  ///         connection to the manager, this is -1.
+  int get_manager_fd();
+
+ private:
+  Status PerformRelease(const ObjectID& object_id);
+
+  uint8_t* lookup_or_mmap(int fd, int store_fd_val, int64_t map_size);
+
+  uint8_t* lookup_mmapped_file(int store_fd_val);
+
+  void increment_object_count(
+      const ObjectID& object_id, PlasmaObject* object, bool is_sealed);
+
+  /// File descriptor of the Unix domain socket that connects to the store.
+  int store_conn_;
+  /// File descriptor of the Unix domain socket that connects to the manager.
+  int manager_conn_;
+  /// Table of dlmalloc buffer files that have been memory mapped so far. This
+  /// is a hash table mapping a file descriptor to a struct containing the
+  /// address of the corresponding memory-mapped file.
+  std::unordered_map<int, ClientMmapTableEntry> mmap_table_;
+  /// A hash table of the object IDs that are currently being used by this
+  /// client.
+  std::unordered_map<ObjectID, std::unique_ptr<ObjectInUseEntry>, 
UniqueIDHasher>
+      objects_in_use_;
+  /// Object IDs of the last few release calls. This is a deque and
+  /// is used to delay releasing objects to see if they can be reused by
+  /// subsequent tasks so we do not unneccessarily invalidate cpu caches.
+  /// TODO(pcm): replace this with a proper lru cache using the size of the L3
+  /// cache.
+  std::deque<ObjectID> release_history_;
+  /// The number of bytes in the combined objects that are held in the release
+  /// history doubly-linked list. If this is too large then the client starts
+  /// releasing objects.
+  int64_t in_use_object_bytes_;
+  /// Configuration options for the plasma client.
+  PlasmaClientConfig config_;
+  /// The amount of memory available to the Plasma store. The client needs this
+  /// information to make sure that it does not delay in releasing so much
+  /// memory that the store is unable to evict enough objects to free up space.
+  int64_t store_capacity_;
+};
+
+/// Compute the hash of an object in the object store.
+///
+/// @param conn The object containing the connection state.
+/// @param object_id The ID of the object we want to hash.
+/// @param digest A pointer at which to return the hash digest of the object.
+///        The pointer must have at least DIGEST_SIZE bytes allocated.
+/// @return A boolean representing whether the hash operation succeeded.
+bool plasma_compute_object_hash(
+    PlasmaClient* conn, ObjectID object_id, unsigned char* digest);
+
+#endif  // PLASMA_CLIENT_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/common.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc
new file mode 100644
index 0000000..a09a963
--- /dev/null
+++ b/cpp/src/plasma/common.cc
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "plasma/common.h"
+
+#include <random>
+
+#include "format/plasma_generated.h"
+
+using arrow::Status;
+
+UniqueID UniqueID::from_random() {
+  UniqueID id;
+  uint8_t* data = id.mutable_data();
+  std::random_device engine;
+  for (int i = 0; i < kUniqueIDSize; i++) {
+    data[i] = static_cast<uint8_t>(engine());
+  }
+  return id;
+}
+
+UniqueID UniqueID::from_binary(const std::string& binary) {
+  UniqueID id;
+  std::memcpy(&id, binary.data(), sizeof(id));
+  return id;
+}
+
+const uint8_t* UniqueID::data() const {
+  return id_;
+}
+
+uint8_t* UniqueID::mutable_data() {
+  return id_;
+}
+
+std::string UniqueID::binary() const {
+  return std::string(reinterpret_cast<const char*>(id_), kUniqueIDSize);
+}
+
+std::string UniqueID::hex() const {
+  constexpr char hex[] = "0123456789abcdef";
+  std::string result;
+  for (int i = 0; i < kUniqueIDSize; i++) {
+    unsigned int val = id_[i];
+    result.push_back(hex[val >> 4]);
+    result.push_back(hex[val & 0xf]);
+  }
+  return result;
+}
+
+bool UniqueID::operator==(const UniqueID& rhs) const {
+  return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0;
+}
+
+Status plasma_error_status(int plasma_error) {
+  switch (plasma_error) {
+    case PlasmaError_OK:
+      return Status::OK();
+    case PlasmaError_ObjectExists:
+      return Status::PlasmaObjectExists("object already exists in the plasma 
store");
+    case PlasmaError_ObjectNonexistent:
+      return Status::PlasmaObjectNonexistent("object does not exist in the 
plasma store");
+    case PlasmaError_OutOfMemory:
+      return Status::PlasmaStoreFull("object does not fit in the plasma 
store");
+    default:
+      ARROW_LOG(FATAL) << "unknown plasma error code " << plasma_error;
+  }
+  return Status::OK();
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/common.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h
new file mode 100644
index 0000000..85dc74b
--- /dev/null
+++ b/cpp/src/plasma/common.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_COMMON_H
+#define PLASMA_COMMON_H
+
+#include <cstring>
+#include <string>
+// TODO(pcm): Convert getopt and sscanf in the store to use more idiomatic C++
+// and get rid of the next three lines:
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+constexpr int64_t kUniqueIDSize = 20;
+
+class UniqueID {
+ public:
+  static UniqueID from_random();
+  static UniqueID from_binary(const std::string& binary);
+  bool operator==(const UniqueID& rhs) const;
+  const uint8_t* data() const;
+  uint8_t* mutable_data();
+  std::string binary() const;
+  std::string hex() const;
+
+ private:
+  uint8_t id_[kUniqueIDSize];
+};
+
+static_assert(std::is_pod<UniqueID>::value, "UniqueID must be plain old data");
+
+struct UniqueIDHasher {
+  // ObjectID hashing function.
+  size_t operator()(const UniqueID& id) const {
+    size_t result;
+    std::memcpy(&result, id.data(), sizeof(size_t));
+    return result;
+  }
+};
+
+typedef UniqueID ObjectID;
+
+arrow::Status plasma_error_status(int plasma_error);
+
+#endif  // PLASMA_COMMON_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/events.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/events.cc b/cpp/src/plasma/events.cc
new file mode 100644
index 0000000..a9f7356
--- /dev/null
+++ b/cpp/src/plasma/events.cc
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "plasma/events.h"
+
+#include <errno.h>
+
+void EventLoop::file_event_callback(
+    aeEventLoop* loop, int fd, void* context, int events) {
+  FileCallback* callback = reinterpret_cast<FileCallback*>(context);
+  (*callback)(events);
+}
+
+int EventLoop::timer_event_callback(aeEventLoop* loop, TimerID timer_id, void* 
context) {
+  TimerCallback* callback = reinterpret_cast<TimerCallback*>(context);
+  return (*callback)(timer_id);
+}
+
+constexpr int kInitialEventLoopSize = 1024;
+
+EventLoop::EventLoop() {
+  loop_ = aeCreateEventLoop(kInitialEventLoopSize);
+}
+
+bool EventLoop::add_file_event(int fd, int events, const FileCallback& 
callback) {
+  if (file_callbacks_.find(fd) != file_callbacks_.end()) { return false; }
+  auto data = std::unique_ptr<FileCallback>(new FileCallback(callback));
+  void* context = reinterpret_cast<void*>(data.get());
+  // Try to add the file descriptor.
+  int err = aeCreateFileEvent(loop_, fd, events, 
EventLoop::file_event_callback, context);
+  // If it cannot be added, increase the size of the event loop.
+  if (err == AE_ERR && errno == ERANGE) {
+    err = aeResizeSetSize(loop_, 3 * aeGetSetSize(loop_) / 2);
+    if (err != AE_OK) { return false; }
+    err = aeCreateFileEvent(loop_, fd, events, EventLoop::file_event_callback, 
context);
+  }
+  // In any case, test if there were errors.
+  if (err == AE_OK) {
+    file_callbacks_.emplace(fd, std::move(data));
+    return true;
+  }
+  return false;
+}
+
+void EventLoop::remove_file_event(int fd) {
+  aeDeleteFileEvent(loop_, fd, AE_READABLE | AE_WRITABLE);
+  file_callbacks_.erase(fd);
+}
+
+void EventLoop::run() {
+  aeMain(loop_);
+}
+
+int64_t EventLoop::add_timer(int64_t timeout, const TimerCallback& callback) {
+  auto data = std::unique_ptr<TimerCallback>(new TimerCallback(callback));
+  void* context = reinterpret_cast<void*>(data.get());
+  int64_t timer_id =
+      aeCreateTimeEvent(loop_, timeout, EventLoop::timer_event_callback, 
context, NULL);
+  timer_callbacks_.emplace(timer_id, std::move(data));
+  return timer_id;
+}
+
+int EventLoop::remove_timer(int64_t timer_id) {
+  int err = aeDeleteTimeEvent(loop_, timer_id);
+  timer_callbacks_.erase(timer_id);
+  return err;
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/events.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/events.h b/cpp/src/plasma/events.h
new file mode 100644
index 0000000..bd93d6b
--- /dev/null
+++ b/cpp/src/plasma/events.h
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_EVENTS
+#define PLASMA_EVENTS
+
+#include <functional>
+#include <memory>
+#include <unordered_map>
+
+extern "C" {
+#include "ae/ae.h"
+}
+
+/// Constant specifying that the timer is done and it will be removed.
+constexpr int kEventLoopTimerDone = AE_NOMORE;
+
+/// Read event on the file descriptor.
+constexpr int kEventLoopRead = AE_READABLE;
+
+/// Write event on the file descriptor.
+constexpr int kEventLoopWrite = AE_WRITABLE;
+
+typedef long long TimerID;  // NOLINT
+
+class EventLoop {
+ public:
+  // Signature of the handler that will be called when there is a new event
+  // on the file descriptor that this handler has been registered for.
+  //
+  // The arguments are the event flags (read or write).
+  using FileCallback = std::function<void(int)>;
+
+  // This handler will be called when a timer times out. The timer id is
+  // passed as an argument. The return is the number of milliseconds the timer
+  // shall be reset to or kEventLoopTimerDone if the timer shall not be
+  // triggered again.
+  using TimerCallback = std::function<int(int64_t)>;
+
+  EventLoop();
+
+  /// Add a new file event handler to the event loop.
+  ///
+  /// @param fd The file descriptor we are listening to.
+  /// @param events The flags for events we are listening to (read or write).
+  /// @param callback The callback that will be called when the event happens.
+  /// @return Returns true if the event handler was added successfully.
+  bool add_file_event(int fd, int events, const FileCallback& callback);
+
+  /// Remove a file event handler from the event loop.
+  ///
+  /// @param fd The file descriptor of the event handler.
+  /// @return Void.
+  void remove_file_event(int fd);
+
+  /// Register a handler that will be called after a time slice of
+  ///  "timeout" milliseconds.
+  ///
+  ///  @param timeout The timeout in milliseconds.
+  ///  @param callback The callback for the timeout.
+  ///  @return The ID of the newly created timer.
+  int64_t add_timer(int64_t timeout, const TimerCallback& callback);
+
+  /// Remove a timer handler from the event loop.
+  ///
+  /// @param timer_id The ID of the timer that is to be removed.
+  /// @return The ae.c error code. TODO(pcm): needs to be standardized
+  int remove_timer(int64_t timer_id);
+
+  /// Run the event loop.
+  ///
+  /// @return Void.
+  void run();
+
+ private:
+  static void file_event_callback(aeEventLoop* loop, int fd, void* context, 
int events);
+
+  static int timer_event_callback(aeEventLoop* loop, TimerID timer_id, void* 
context);
+
+  aeEventLoop* loop_;
+  std::unordered_map<int, std::unique_ptr<FileCallback>> file_callbacks_;
+  std::unordered_map<int64_t, std::unique_ptr<TimerCallback>> timer_callbacks_;
+};
+
+#endif  // PLASMA_EVENTS

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/eviction_policy.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/eviction_policy.cc 
b/cpp/src/plasma/eviction_policy.cc
new file mode 100644
index 0000000..4ae6384
--- /dev/null
+++ b/cpp/src/plasma/eviction_policy.cc
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "plasma/eviction_policy.h"
+
+#include <algorithm>
+
+void LRUCache::add(const ObjectID& key, int64_t size) {
+  auto it = item_map_.find(key);
+  ARROW_CHECK(it == item_map_.end());
+  /* Note that it is important to use a list so the iterators stay valid. */
+  item_list_.emplace_front(key, size);
+  item_map_.emplace(key, item_list_.begin());
+}
+
+void LRUCache::remove(const ObjectID& key) {
+  auto it = item_map_.find(key);
+  ARROW_CHECK(it != item_map_.end());
+  item_list_.erase(it->second);
+  item_map_.erase(it);
+}
+
+int64_t LRUCache::choose_objects_to_evict(
+    int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict) {
+  int64_t bytes_evicted = 0;
+  auto it = item_list_.end();
+  while (bytes_evicted < num_bytes_required && it != item_list_.begin()) {
+    it--;
+    objects_to_evict->push_back(it->first);
+    bytes_evicted += it->second;
+  }
+  return bytes_evicted;
+}
+
+EvictionPolicy::EvictionPolicy(PlasmaStoreInfo* store_info)
+    : memory_used_(0), store_info_(store_info) {}
+
+int64_t EvictionPolicy::choose_objects_to_evict(
+    int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict) {
+  int64_t bytes_evicted =
+      cache_.choose_objects_to_evict(num_bytes_required, objects_to_evict);
+  /* Update the LRU cache. */
+  for (auto& object_id : *objects_to_evict) {
+    cache_.remove(object_id);
+  }
+  /* Update the number of bytes used. */
+  memory_used_ -= bytes_evicted;
+  return bytes_evicted;
+}
+
+void EvictionPolicy::object_created(const ObjectID& object_id) {
+  auto entry = store_info_->objects[object_id].get();
+  cache_.add(object_id, entry->info.data_size + entry->info.metadata_size);
+}
+
+bool EvictionPolicy::require_space(
+    int64_t size, std::vector<ObjectID>* objects_to_evict) {
+  /* Check if there is enough space to create the object. */
+  int64_t required_space = memory_used_ + size - store_info_->memory_capacity;
+  int64_t num_bytes_evicted;
+  if (required_space > 0) {
+    /* Try to free up at least as much space as we need right now but ideally
+     * up to 20% of the total capacity. */
+    int64_t space_to_free = std::max(size, store_info_->memory_capacity / 5);
+    ARROW_LOG(DEBUG) << "not enough space to create this object, so evicting 
objects";
+    /* Choose some objects to evict, and update the return pointers. */
+    num_bytes_evicted = choose_objects_to_evict(space_to_free, 
objects_to_evict);
+    ARROW_LOG(INFO) << "There is not enough space to create this object, so 
evicting "
+                    << objects_to_evict->size() << " objects to free up "
+                    << num_bytes_evicted << " bytes.";
+  } else {
+    num_bytes_evicted = 0;
+  }
+  if (num_bytes_evicted >= required_space) {
+    /* We only increment the space used if there is enough space to create the
+     * object. */
+    memory_used_ += size;
+  }
+  return num_bytes_evicted >= required_space;
+}
+
+void EvictionPolicy::begin_object_access(
+    const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict) {
+  /* If the object is in the LRU cache, remove it. */
+  cache_.remove(object_id);
+}
+
+void EvictionPolicy::end_object_access(
+    const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict) {
+  auto entry = store_info_->objects[object_id].get();
+  /* Add the object to the LRU cache.*/
+  cache_.add(object_id, entry->info.data_size + entry->info.metadata_size);
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/eviction_policy.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h
new file mode 100644
index 0000000..3815fc6
--- /dev/null
+++ b/cpp/src/plasma/eviction_policy.h
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_EVICTION_POLICY_H
+#define PLASMA_EVICTION_POLICY_H
+
+#include <list>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "plasma/common.h"
+#include "plasma/plasma.h"
+
+// ==== The eviction policy ====
+//
+// This file contains declaration for all functions and data structures that
+// need to be provided if you want to implement a new eviction algorithm for 
the
+// Plasma store.
+
+class LRUCache {
+ public:
+  LRUCache() {}
+
+  void add(const ObjectID& key, int64_t size);
+
+  void remove(const ObjectID& key);
+
+  int64_t choose_objects_to_evict(
+      int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict);
+
+ private:
+  /// A doubly-linked list containing the items in the cache and
+  /// their sizes in LRU order.
+  typedef std::list<std::pair<ObjectID, int64_t>> ItemList;
+  ItemList item_list_;
+  /// A hash table mapping the object ID of an object in the cache to its
+  /// location in the doubly linked list item_list_.
+  std::unordered_map<ObjectID, ItemList::iterator, UniqueIDHasher> item_map_;
+};
+
+/// The eviction policy.
+class EvictionPolicy {
+ public:
+  /// Construct an eviction policy.
+  ///
+  /// @param store_info Information about the Plasma store that is exposed
+  ///        to the eviction policy.
+  explicit EvictionPolicy(PlasmaStoreInfo* store_info);
+
+  /// This method will be called whenever an object is first created in order 
to
+  /// add it to the LRU cache. This is done so that the first time, the Plasma
+  /// store calls begin_object_access, we can remove the object from the LRU
+  /// cache.
+  ///
+  /// @param object_id The object ID of the object that was created.
+  /// @return Void.
+  void object_created(const ObjectID& object_id);
+
+  /// This method will be called when the Plasma store needs more space, 
perhaps
+  /// to create a new object. If the required amount of space cannot be freed 
up,
+  /// then a fatal error will be thrown. When this method is called, the 
eviction
+  /// policy will assume that the objects chosen to be evicted will in fact be
+  /// evicted from the Plasma store by the caller.
+  ///
+  /// @param size The size in bytes of the new object, including both data and
+  ///        metadata.
+  /// @param objects_to_evict The object IDs that were chosen for eviction will
+  ///        be stored into this vector.
+  /// @return True if enough space can be freed and false otherwise.
+  bool require_space(int64_t size, std::vector<ObjectID>* objects_to_evict);
+
+  /// This method will be called whenever an unused object in the Plasma store
+  /// starts to be used. When this method is called, the eviction policy will
+  /// assume that the objects chosen to be evicted will in fact be evicted from
+  /// the Plasma store by the caller.
+  ///
+  /// @param object_id The ID of the object that is now being used.
+  /// @param objects_to_evict The object IDs that were chosen for eviction will
+  ///        be stored into this vector.
+  /// @return Void.
+  void begin_object_access(
+      const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict);
+
+  /// This method will be called whenever an object in the Plasma store that 
was
+  /// being used is no longer being used. When this method is called, the
+  /// eviction policy will assume that the objects chosen to be evicted will in
+  /// fact be evicted from the Plasma store by the caller.
+  ///
+  /// @param object_id The ID of the object that is no longer being used.
+  /// @param objects_to_evict The object IDs that were chosen for eviction will
+  ///        be stored into this vector.
+  /// @return Void.
+  void end_object_access(
+      const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict);
+
+  /// Choose some objects to evict from the Plasma store. When this method is
+  /// called, the eviction policy will assume that the objects chosen to be
+  /// evicted will in fact be evicted from the Plasma store by the caller.
+  ///
+  /// @note This method is not part of the API. It is exposed in the header 
file
+  /// only for testing.
+  ///
+  /// @param num_bytes_required The number of bytes of space to try to free up.
+  /// @param objects_to_evict The object IDs that were chosen for eviction will
+  ///        be stored into this vector.
+  /// @return The total number of bytes of space chosen to be evicted.
+  int64_t choose_objects_to_evict(
+      int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict);
+
+ private:
+  /// The amount of memory (in bytes) currently being used.
+  int64_t memory_used_;
+  /// Pointer to the plasma store info.
+  PlasmaStoreInfo* store_info_;
+  /// Datastructure for the LRU cache.
+  LRUCache cache_;
+};
+
+#endif  // PLASMA_EVICTION_POLICY_H

Reply via email to