diff --git a/cpp/src/plasma/ b/cpp/src/plasma/
new file mode 100644
index 0000000..5d61e33
--- /dev/null
+++ b/cpp/src/plasma/
@@ -0,0 +1,456 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "plasma/extension.h"
+#include <algorithm>
+#include <vector>
+#include "plasma/client.h"
+#include "plasma/common.h"
+#include "plasma/io.h"
+#include "plasma/protocol.h"
+PyObject* PlasmaOutOfMemoryError;
+PyObject* PlasmaObjectExistsError;
+PyObject* PyPlasma_connect(PyObject* self, PyObject* args) {
+  const char* store_socket_name;
+  const char* manager_socket_name;
+  int release_delay;
+  if (!PyArg_ParseTuple(
+          args, "ssi", &store_socket_name, &manager_socket_name, 
&release_delay)) {
+    return NULL;
+  }
+  PlasmaClient* client = new PlasmaClient();
+  ARROW_CHECK_OK(client->Connect(store_socket_name, manager_socket_name, 
+  return PyCapsule_New(client, "plasma", NULL);
+PyObject* PyPlasma_disconnect(PyObject* self, PyObject* args) {
+  PyObject* client_capsule;
+  if (!PyArg_ParseTuple(args, "O", &client_capsule)) { return NULL; }
+  PlasmaClient* client;
+  ARROW_CHECK(PyObjectToPlasmaClient(client_capsule, &client));
+  ARROW_CHECK_OK(client->Disconnect());
+  /* We use the context of the connection capsule to indicate if the connection
+   * is still active (if the context is NULL) or if it is closed (if the 
+   * is (void*) 0x1). This is neccessary because the primary pointer of the
+   * capsule cannot be NULL. */
+  PyCapsule_SetContext(client_capsule, reinterpret_cast<void*>(0x1));
+PyObject* PyPlasma_create(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  ObjectID object_id;
+  Py_ssize_t size;
+  PyObject* metadata;
+  if (!PyArg_ParseTuple(args, "O&O&nO", PyObjectToPlasmaClient, &client,
+          PyStringToUniqueID, &object_id, &size, &metadata)) {
+    return NULL;
+  }
+  if (!PyByteArray_Check(metadata)) {
+    PyErr_SetString(PyExc_TypeError, "metadata must be a bytearray");
+    return NULL;
+  }
+  uint8_t* data;
+  Status s = client->Create(object_id, size,
+      reinterpret_cast<uint8_t*>(PyByteArray_AsString(metadata)),
+      PyByteArray_Size(metadata), &data);
+  if (s.IsPlasmaObjectExists()) {
+    PyErr_SetString(PlasmaObjectExistsError,
+        "An object with this ID already exists in the plasma "
+        "store.");
+    return NULL;
+  }
+  if (s.IsPlasmaStoreFull()) {
+    PyErr_SetString(PlasmaOutOfMemoryError,
+        "The plasma store ran out of memory and could not create "
+        "this object.");
+    return NULL;
+  }
+  ARROW_CHECK(s.ok());
+  return PyMemoryView_FromMemory(reinterpret_cast<char*>(data), size, 
+  return PyBuffer_FromReadWriteMemory(reinterpret_cast<void*>(data), size);
+PyObject* PyPlasma_hash(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  ObjectID object_id;
+  if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, 
+          &object_id)) {
+    return NULL;
+  }
+  unsigned char digest[kDigestSize];
+  bool success = plasma_compute_object_hash(client, object_id, digest);
+  if (success) {
+    PyObject* digest_string =
+        PyBytes_FromStringAndSize(reinterpret_cast<char*>(digest), 
+    return digest_string;
+  } else {
+  }
+PyObject* PyPlasma_seal(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  ObjectID object_id;
+  if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, 
+          &object_id)) {
+    return NULL;
+  }
+  ARROW_CHECK_OK(client->Seal(object_id));
+PyObject* PyPlasma_release(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  ObjectID object_id;
+  if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, 
+          &object_id)) {
+    return NULL;
+  }
+  ARROW_CHECK_OK(client->Release(object_id));
+PyObject* PyPlasma_get(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  PyObject* object_id_list;
+  Py_ssize_t timeout_ms;
+  if (!PyArg_ParseTuple(
+          args, "O&On", PyObjectToPlasmaClient, &client, &object_id_list, 
&timeout_ms)) {
+    return NULL;
+  }
+  Py_ssize_t num_object_ids = PyList_Size(object_id_list);
+  std::vector<ObjectID> object_ids(num_object_ids);
+  std::vector<ObjectBuffer> object_buffers(num_object_ids);
+  for (int i = 0; i < num_object_ids; ++i) {
+    PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]);
+  }
+      client->Get(, num_object_ids, timeout_ms,;
+  PyObject* returns = PyList_New(num_object_ids);
+  for (int i = 0; i < num_object_ids; ++i) {
+    if (object_buffers[i].data_size != -1) {
+      /* The object was retrieved, so return the object. */
+      PyObject* t = PyTuple_New(2);
+      Py_ssize_t data_size = 
+      Py_ssize_t metadata_size = 
+      char* data = reinterpret_cast<char*>(object_buffers[i].data);
+      char* metadata = reinterpret_cast<char*>(object_buffers[i].metadata);
+      PyTuple_SET_ITEM(t, 0, PyMemoryView_FromMemory(data, data_size, 
+      PyTuple_SET_ITEM(
+          t, 1, PyMemoryView_FromMemory(metadata, metadata_size, PyBUF_READ));
+      void* data = reinterpret_cast<void*>(object_buffers[i].data);
+      void* metadata = reinterpret_cast<void*>(object_buffers[i].metadata);
+      PyTuple_SET_ITEM(t, 0, PyBuffer_FromMemory(data, data_size));
+      PyTuple_SET_ITEM(t, 1, PyBuffer_FromMemory(metadata, metadata_size));
+      ARROW_CHECK(PyList_SetItem(returns, i, t) == 0);
+    } else {
+      /* The object was not retrieved, so just add None to the list of return
+       * values. */
+      Py_INCREF(Py_None);
+      ARROW_CHECK(PyList_SetItem(returns, i, Py_None) == 0);
+    }
+  }
+  return returns;
+PyObject* PyPlasma_contains(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  ObjectID object_id;
+  if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, 
+          &object_id)) {
+    return NULL;
+  }
+  bool has_object;
+  ARROW_CHECK_OK(client->Contains(object_id, &has_object));
+  if (has_object) {
+  } else {
+  }
+PyObject* PyPlasma_fetch(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  PyObject* object_id_list;
+  if (!PyArg_ParseTuple(args, "O&O", PyObjectToPlasmaClient, &client, 
&object_id_list)) {
+    return NULL;
+  }
+  if (client->get_manager_fd() == -1) {
+    PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
+    return NULL;
+  }
+  Py_ssize_t n = PyList_Size(object_id_list);
+  ObjectID* object_ids = new ObjectID[n];
+  for (int i = 0; i < n; ++i) {
+    PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]);
+  }
+  ARROW_CHECK_OK(client->Fetch(static_cast<int>(n), object_ids));
+  delete[] object_ids;
+PyObject* PyPlasma_wait(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  PyObject* object_id_list;
+  Py_ssize_t timeout;
+  int num_returns;
+  if (!PyArg_ParseTuple(args, "O&Oni", PyObjectToPlasmaClient, &client, 
+          &timeout, &num_returns)) {
+    return NULL;
+  }
+  Py_ssize_t n = PyList_Size(object_id_list);
+  if (client->get_manager_fd() == -1) {
+    PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
+    return NULL;
+  }
+  if (num_returns < 0) {
+    PyErr_SetString(
+        PyExc_RuntimeError, "The argument num_returns cannot be less than 
+    return NULL;
+  }
+  if (num_returns > n) {
+    PyErr_SetString(PyExc_RuntimeError,
+        "The argument num_returns cannot be greater than len(object_ids)");
+    return NULL;
+  }
+  int64_t threshold = 1 << 30;
+  if (timeout > threshold) {
+    PyErr_SetString(
+        PyExc_RuntimeError, "The argument timeout cannot be greater than 2 ** 
+    return NULL;
+  }
+  std::vector<ObjectRequest> object_requests(n);
+  for (int i = 0; i < n; ++i) {
+    ARROW_CHECK(PyStringToUniqueID(PyList_GetItem(object_id_list, i),
+                    &object_requests[i].object_id) == 1);
+    object_requests[i].type = PLASMA_QUERY_ANYWHERE;
+  }
+  /* Drop the global interpreter lock while we are waiting, so other threads 
+   * run. */
+  int num_return_objects;
+      client->Wait(n,, num_returns, timeout, 
+  int num_to_return = std::min(num_return_objects, num_returns);
+  PyObject* ready_ids = PyList_New(num_to_return);
+  PyObject* waiting_ids = PySet_New(object_id_list);
+  int num_returned = 0;
+  for (int i = 0; i < n; ++i) {
+    if (num_returned == num_to_return) { break; }
+    if (object_requests[i].status == ObjectStatus_Local ||
+        object_requests[i].status == ObjectStatus_Remote) {
+      PyObject* ready = PyBytes_FromStringAndSize(
+          reinterpret_cast<char*>(&object_requests[i].object_id),
+          sizeof(object_requests[i].object_id));
+      PyList_SetItem(ready_ids, num_returned, ready);
+      PySet_Discard(waiting_ids, ready);
+      num_returned += 1;
+    } else {
+      ARROW_CHECK(object_requests[i].status == ObjectStatus_Nonexistent);
+    }
+  }
+  ARROW_CHECK(num_returned == num_to_return);
+  /* Return both the ready IDs and the remaining IDs. */
+  PyObject* t = PyTuple_New(2);
+  PyTuple_SetItem(t, 0, ready_ids);
+  PyTuple_SetItem(t, 1, waiting_ids);
+  return t;
+PyObject* PyPlasma_evict(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  Py_ssize_t num_bytes;
+  if (!PyArg_ParseTuple(args, "O&n", PyObjectToPlasmaClient, &client, 
&num_bytes)) {
+    return NULL;
+  }
+  int64_t evicted_bytes;
+  ARROW_CHECK_OK(client->Evict(static_cast<int64_t>(num_bytes), 
+  return PyLong_FromSsize_t(static_cast<Py_ssize_t>(evicted_bytes));
+PyObject* PyPlasma_delete(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  ObjectID object_id;
+  if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, 
+          &object_id)) {
+    return NULL;
+  }
+  ARROW_CHECK_OK(client->Delete(object_id));
+PyObject* PyPlasma_transfer(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  ObjectID object_id;
+  const char* addr;
+  int port;
+  if (!PyArg_ParseTuple(args, "O&O&si", PyObjectToPlasmaClient, &client,
+          PyStringToUniqueID, &object_id, &addr, &port)) {
+    return NULL;
+  }
+  if (client->get_manager_fd() == -1) {
+    PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
+    return NULL;
+  }
+  ARROW_CHECK_OK(client->Transfer(addr, port, object_id));
+PyObject* PyPlasma_subscribe(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  if (!PyArg_ParseTuple(args, "O&", PyObjectToPlasmaClient, &client)) { return 
+  int sock;
+  ARROW_CHECK_OK(client->Subscribe(&sock));
+  return PyLong_FromLong(sock);
+PyObject* PyPlasma_receive_notification(PyObject* self, PyObject* args) {
+  int plasma_sock;
+  if (!PyArg_ParseTuple(args, "i", &plasma_sock)) { return NULL; }
+  /* Receive object notification from the plasma connection socket. If the
+   * object was added, return a tuple of its fields: ObjectID, data_size,
+   * metadata_size. If the object was deleted, data_size and metadata_size will
+   * be set to -1. */
+  uint8_t* notification = read_message_async(plasma_sock);
+  if (notification == NULL) {
+    PyErr_SetString(
+        PyExc_RuntimeError, "Failed to read object notification from Plasma 
+    return NULL;
+  }
+  auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification);
+  /* Construct a tuple from object_info and return. */
+  PyObject* t = PyTuple_New(3);
+  PyTuple_SetItem(t, 0, 
+                            object_info->object_id()->size()));
+  if (object_info->is_deletion()) {
+    PyTuple_SetItem(t, 1, PyLong_FromLong(-1));
+    PyTuple_SetItem(t, 2, PyLong_FromLong(-1));
+  } else {
+    PyTuple_SetItem(t, 1, PyLong_FromLong(object_info->data_size()));
+    PyTuple_SetItem(t, 2, PyLong_FromLong(object_info->metadata_size()));
+  }
+  delete[] notification;
+  return t;
+static PyMethodDef plasma_methods[] = {
+    {"connect", PyPlasma_connect, METH_VARARGS, "Connect to plasma."},
+    {"disconnect", PyPlasma_disconnect, METH_VARARGS, "Disconnect from 
+    {"create", PyPlasma_create, METH_VARARGS, "Create a new plasma object."},
+    {"hash", PyPlasma_hash, METH_VARARGS, "Compute the hash of a plasma 
+    {"seal", PyPlasma_seal, METH_VARARGS, "Seal a plasma object."},
+    {"get", PyPlasma_get, METH_VARARGS, "Get a plasma object."},
+    {"contains", PyPlasma_contains, METH_VARARGS,
+        "Does the plasma store contain this plasma object?"},
+    {"fetch", PyPlasma_fetch, METH_VARARGS,
+        "Fetch the object from another plasma manager instance."},
+    {"wait", PyPlasma_wait, METH_VARARGS,
+        "Wait until num_returns objects in object_ids are ready."},
+    {"evict", PyPlasma_evict, METH_VARARGS,
+        "Evict some objects until we recover some number of bytes."},
+    {"release", PyPlasma_release, METH_VARARGS, "Release the plasma object."},
+    {"delete", PyPlasma_delete, METH_VARARGS, "Delete a plasma object."},
+    {"transfer", PyPlasma_transfer, METH_VARARGS,
+        "Transfer object to another plasma manager."},
+    {"subscribe", PyPlasma_subscribe, METH_VARARGS,
+        "Subscribe to the plasma notification socket."},
+    {"receive_notification", PyPlasma_receive_notification, METH_VARARGS,
+        "Receive next notification from plasma notification socket."},
+    {NULL} /* Sentinel */
+static struct PyModuleDef moduledef = {
+    PyModuleDef_HEAD_INIT, "libplasma",    /* m_name */
+    "A Python client library for plasma.", /* m_doc */
+    0,                                     /* m_size */
+    plasma_methods,                        /* m_methods */
+    NULL,                                  /* m_reload */
+    NULL,                                  /* m_traverse */
+    NULL,                                  /* m_clear */
+    NULL,                                  /* m_free */
+#define INITERROR return NULL
+#define INITERROR return
+#ifndef PyMODINIT_FUNC /* declarations for DLL import/export */
+#define PyMODINIT_FUNC void
+#define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void)
+#define MOD_INIT(name) PyMODINIT_FUNC init##name(void)
+MOD_INIT(libplasma) {
+  PyObject* m = PyModule_Create(&moduledef);
+  PyObject* m =
+      Py_InitModule3("libplasma", plasma_methods, "A Python client library for 
+  /* Create a custom exception for when an object ID is reused. */
+  char plasma_object_exists_error[] = "plasma_object_exists.error";
+  PlasmaObjectExistsError = PyErr_NewException(plasma_object_exists_error, 
+  Py_INCREF(PlasmaObjectExistsError);
+  PyModule_AddObject(m, "plasma_object_exists_error", PlasmaObjectExistsError);
+  /* Create a custom exception for when the plasma store is out of memory. */
+  char plasma_out_of_memory_error[] = "plasma_out_of_memory.error";
+  PlasmaOutOfMemoryError = PyErr_NewException(plasma_out_of_memory_error, 
+  Py_INCREF(PlasmaOutOfMemoryError);
+  PyModule_AddObject(m, "plasma_out_of_memory_error", PlasmaOutOfMemoryError);
+  return m;
diff --git a/cpp/src/plasma/extension.h b/cpp/src/plasma/extension.h
new file mode 100644
index 0000000..cee30ab
--- /dev/null
+++ b/cpp/src/plasma/extension.h
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <Python.h>
+#include "bytesobject.h"  // NOLINT
+#include "plasma/client.h"
+#include "plasma/common.h"
+static int PyObjectToPlasmaClient(PyObject* object, PlasmaClient** client) {
+  if (PyCapsule_IsValid(object, "plasma")) {
+    *client = reinterpret_cast<PlasmaClient*>(PyCapsule_GetPointer(object, 
+    return 1;
+  } else {
+    PyErr_SetString(PyExc_TypeError, "must be a 'plasma' capsule");
+    return 0;
+  }
+int PyStringToUniqueID(PyObject* object, ObjectID* object_id) {
+  if (PyBytes_Check(object)) {
+    memcpy(object_id, PyBytes_AsString(object), sizeof(ObjectID));
+    return 1;
+  } else {
+    PyErr_SetString(PyExc_TypeError, "must be a 20 character string");
+    return 0;
+  }
diff --git a/cpp/src/plasma/ b/cpp/src/plasma/
new file mode 100644
index 0000000..79da4f4
--- /dev/null
+++ b/cpp/src/plasma/
@@ -0,0 +1,90 @@
+// Copyright 2013 Sharvil Nanavati
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#include "plasma/fling.h"
+#include <string.h>
+void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t 
buf_len) {
+  iov->iov_base = buf;
+  iov->iov_len = 1;
+  msg->msg_iov = iov;
+  msg->msg_iovlen = 1;
+  msg->msg_control = buf;
+  msg->msg_controllen = buf_len;
+  msg->msg_name = NULL;
+  msg->msg_namelen = 0;
+int send_fd(int conn, int fd) {
+  struct msghdr msg;
+  struct iovec iov;
+  char buf[CMSG_SPACE(sizeof(int))];
+  memset(&buf, 0, CMSG_SPACE(sizeof(int)));
+  init_msg(&msg, &iov, buf, sizeof(buf));
+  struct cmsghdr* header = CMSG_FIRSTHDR(&msg);
+  header->cmsg_level = SOL_SOCKET;
+  header->cmsg_type = SCM_RIGHTS;
+  header->cmsg_len = CMSG_LEN(sizeof(int));
+  *reinterpret_cast<int*>(CMSG_DATA(header)) = fd;
+  // Send file descriptor.
+  ssize_t r = sendmsg(conn, &msg, 0);
+  if (r >= 0) {
+    return 0;
+  } else {
+    return static_cast<int>(r);
+  }
+int recv_fd(int conn) {
+  struct msghdr msg;
+  struct iovec iov;
+  char buf[CMSG_SPACE(sizeof(int))];
+  init_msg(&msg, &iov, buf, sizeof(buf));
+  if (recvmsg(conn, &msg, 0) == -1) return -1;
+  int found_fd = -1;
+  int oh_noes = 0;
+  for (struct cmsghdr* header = CMSG_FIRSTHDR(&msg); header != NULL;
+       header = CMSG_NXTHDR(&msg, header))
+    if (header->cmsg_level == SOL_SOCKET && header->cmsg_type == SCM_RIGHTS) {
+      ssize_t count =
+          (header->cmsg_len - (CMSG_DATA(header) - (unsigned char*)header)) / 
+      for (int i = 0; i < count; ++i) {
+        int fd = (reinterpret_cast<int*>(CMSG_DATA(header)))[i];
+        if (found_fd == -1) {
+          found_fd = fd;
+        } else {
+          close(fd);
+          oh_noes = 1;
+        }
+      }
+    }
+  // The sender sent us more than one file descriptor. We've closed
+  // them all to prevent fd leaks but notify the caller that we got
+  // a bad message.
+  if (oh_noes) {
+    close(found_fd);
+    errno = EBADMSG;
+    return -1;
+  }
+  return found_fd;
diff --git a/cpp/src/plasma/fling.h b/cpp/src/plasma/fling.h
new file mode 100644
index 0000000..78ac9d1
--- /dev/null
+++ b/cpp/src/plasma/fling.h
@@ -0,0 +1,52 @@
+// Copyright 2013 Sharvil Nanavati
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// FLING: Exchanging file descriptors over sockets
+// This is a little library for sending file descriptors over a socket
+// between processes. The reason for doing that (as opposed to using
+// filenames to share the files) is so (a) no files remain in the
+// filesystem after all the processes terminate, (b) to make sure that
+// there are no name collisions and (c) to be able to control who has
+// access to the data.
+// Most of the code is from
+#include <errno.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+// This is neccessary for Mac OS X, see
+// (10).
+#if !defined(CMSG_SPACE) && !defined(CMSG_LEN)
+#define CMSG_SPACE(len) (__DARWIN_ALIGN32(sizeof(struct cmsghdr)) + 
+#define CMSG_LEN(len) (__DARWIN_ALIGN32(sizeof(struct cmsghdr)) + (len))
+void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t 
+// Send a file descriptor over a unix domain socket.
+// @param conn Unix domain socket to send the file descriptor over.
+// @param fd File descriptor to send over.
+// @return Status code which is < 0 on failure.
+int send_fd(int conn, int fd);
+// Receive a file descriptor over a unix domain socket.
+// @param conn Unix domain socket to receive the file descriptor from.
+// @return File descriptor or a value < 0 on failure.
+int recv_fd(int conn);
diff --git a/cpp/src/plasma/format/common.fbs b/cpp/src/plasma/format/common.fbs
new file mode 100644
index 0000000..4d7d285
--- /dev/null
+++ b/cpp/src/plasma/format/common.fbs
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// Object information data structure.
+table ObjectInfo {
+  // Object ID of this object.
+  object_id: string;
+  // Number of bytes the content of this object occupies in memory.
+  data_size: long;
+  // Number of bytes the metadata of this object occupies in memory.
+  metadata_size: long;
+  // Unix epoch of when this object was created.
+  create_time: long;
+  // How long creation of this object took.
+  construct_duration: long;
+  // Hash of the object content.
+  digest: string;
+  // Specifies if this object was deleted or added.
+  is_deletion: bool;
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
new file mode 100644
index 0000000..23782ad
--- /dev/null
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// Plasma protocol specification
+enum MessageType:int {
+  // Create a new object.
+  PlasmaCreateRequest = 1,
+  PlasmaCreateReply,
+  // Seal an object.
+  PlasmaSealRequest,
+  PlasmaSealReply,
+  // Get an object that is stored on the local Plasma store.
+  PlasmaGetRequest,
+  PlasmaGetReply,
+  // Release an object.
+  PlasmaReleaseRequest,
+  PlasmaReleaseReply,
+  // Delete an object.
+  PlasmaDeleteRequest,
+  PlasmaDeleteReply,
+  // Get status of an object.
+  PlasmaStatusRequest,
+  PlasmaStatusReply,
+  // See if the store contains an object (will be deprecated).
+  PlasmaContainsRequest,
+  PlasmaContainsReply,
+  // Get information for a newly connecting client.
+  PlasmaConnectRequest,
+  PlasmaConnectReply,
+  // Make room for new objects in the plasma store.
+  PlasmaEvictRequest,
+  PlasmaEvictReply,
+  // Fetch objects from remote Plasma stores.
+  PlasmaFetchRequest,
+  // Wait for objects to be ready either from local or remote Plasma stores.
+  PlasmaWaitRequest,
+  PlasmaWaitReply,
+  // Subscribe to a list of objects or to all objects.
+  PlasmaSubscribeRequest,
+  // Unsubscribe.
+  PlasmaUnsubscribeRequest,
+  // Sending and receiving data.
+  // PlasmaDataRequest initiates sending the data, there will be one
+  // such message per data transfer.
+  PlasmaDataRequest,
+  // PlasmaDataReply contains the actual data and is sent back to the
+  // object store that requested the data. For each transfer, multiple
+  // reply messages get sent. Each one contains a fixed number of bytes.
+  PlasmaDataReply,
+  // Object notifications.
+  PlasmaNotification
+enum PlasmaError:int {
+  // Operation was successful.
+  OK,
+  // Trying to create an object that already exists.
+  ObjectExists,
+  // Trying to access an object that doesn't exist.
+  ObjectNonexistent,
+  // Trying to create an object but there isn't enough space in the store.
+  OutOfMemory
+// Plasma store messages
+struct PlasmaObjectSpec {
+  // Index of the memory segment (= memory mapped file) that
+  // this object is allocated in.
+  segment_index: int;
+  // Size in bytes of this segment (needed to call mmap).
+  mmap_size: ulong;
+  // The offset in bytes in the memory mapped file of the data.
+  data_offset: ulong;
+  // The size in bytes of the data.
+  data_size: ulong;
+  // The offset in bytes in the memory mapped file of the metadata.
+  metadata_offset: ulong;
+  // The size in bytes of the metadata.
+  metadata_size: ulong;
+table PlasmaCreateRequest {
+  // ID of the object to be created.
+  object_id: string;
+  // The size of the object's data in bytes.
+  data_size: ulong;
+  // The size of the object's metadata in bytes.
+  metadata_size: ulong;
+table PlasmaCreateReply {
+  // ID of the object that was created.
+  object_id: string;
+  // The object that is returned with this reply.
+  plasma_object: PlasmaObjectSpec;
+  // Error that occurred for this call.
+  error: PlasmaError;
+table PlasmaSealRequest {
+  // ID of the object to be sealed.
+  object_id: string;
+  // Hash of the object data.
+  digest: string;
+table PlasmaSealReply {
+  // ID of the object that was sealed.
+  object_id: string;
+  // Error code.
+  error: PlasmaError;
+table PlasmaGetRequest {
+  // IDs of the objects stored at local Plasma store we are getting.
+  object_ids: [string];
+  // The number of milliseconds before the request should timeout.
+  timeout_ms: long;
+table PlasmaGetReply {
+  // IDs of the objects being returned.
+  // This number can be smaller than the number of requested
+  // objects if not all requested objects are stored and sealed
+  // in the local Plasma store.
+  object_ids: [string];
+  // Plasma object information, in the same order as their IDs.
+  plasma_objects: [PlasmaObjectSpec];
+  // The number of elements in both object_ids and plasma_objects arrays must 
+table PlasmaReleaseRequest {
+  // ID of the object to be released.
+  object_id: string;
+table PlasmaReleaseReply {
+  // ID of the object that was released.
+  object_id: string;
+  // Error code.
+  error: PlasmaError;
+table PlasmaDeleteRequest {
+  // ID of the object to be deleted.
+  object_id: string;
+table PlasmaDeleteReply {
+  // ID of the object that was deleted.
+  object_id: string;
+  // Error code.
+  error: PlasmaError;
+table PlasmaStatusRequest {
+  // IDs of the objects stored at local Plasma store we request the status of.
+  object_ids: [string];
+enum ObjectStatus:int {
+  // Object is stored in the local Plasma Store.
+  Local = 1,
+  // Object is stored on a remote Plasma store, and it is not stored on the
+  // local Plasma Store.
+  Remote,
+  // Object is not stored in the system.
+  Nonexistent,
+  // Object is currently transferred from a remote Plasma store the the local
+  // Plasma Store.
+  Transfer
+table PlasmaStatusReply {
+  // IDs of the objects being returned.
+  object_ids: [string];
+  // Status of the object.
+  status: [ObjectStatus];
+// PlasmaContains is a subset of PlasmaStatus which does not
+// involve the plasma manager, only the store. We should consider
+// unifying them in the future and deprecating PlasmaContains.
+table PlasmaContainsRequest {
+  // ID of the object we are querying.
+  object_id: string;
+table PlasmaContainsReply {
+  // ID of the object we are querying.
+  object_id: string;
+  // 1 if the object is in the store and 0 otherwise.
+  has_object: int;
+// PlasmaConnect is used by a plasma client the first time it connects with the
+// store. This is not really necessary, but is used to get some information
+// about the store such as its memory capacity.
+table PlasmaConnectRequest {
+table PlasmaConnectReply {
+  // The memory capacity of the store.
+  memory_capacity: long;
+table PlasmaEvictRequest {
+  // Number of bytes that shall be freed.
+  num_bytes: ulong;
+table PlasmaEvictReply {
+  // Number of bytes that have been freed.
+  num_bytes: ulong;
+table PlasmaFetchRequest {
+  // IDs of objects to be gotten.
+  object_ids: [string];
+table ObjectRequestSpec {
+  // ID of the object.
+  object_id: string;
+  // The type of the object. This specifies whether we
+  // will be waiting for an object store in the local or
+  // global Plasma store.
+  type: int;
+table PlasmaWaitRequest {
+  // Array of object requests whose status we are asking for.
+  object_requests: [ObjectRequestSpec];
+  // Number of objects expected to be returned, if available.
+  num_ready_objects: int;
+  // timeout
+  timeout: long;
+table ObjectReply {
+  // ID of the object.
+  object_id: string;
+  // The object status. This specifies where the object is stored.
+  status: int;
+table PlasmaWaitReply {
+  // Array of object requests being returned.
+  object_requests: [ObjectReply];
+  // Number of objects expected to be returned, if available.
+  num_ready_objects: int;
+table PlasmaSubscribeRequest {
+table PlasmaDataRequest {
+  // ID of the object that is requested.
+  object_id: string;
+  // The host address where the data shall be sent to.
+  address: string;
+  // The port of the manager the data shall be sent to.
+  port: int;
+table PlasmaDataReply {
+  // ID of the object that will be sent.
+  object_id: string;
+  // Size of the object data in bytes.
+  object_size: ulong;
+  // Size of the metadata in bytes.
+  metadata_size: ulong;
diff --git a/cpp/src/plasma/ b/cpp/src/plasma/
new file mode 100644
index 0000000..5875ebb
--- /dev/null
+++ b/cpp/src/plasma/
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "plasma/io.h"
+#include "plasma/common.h"
+using arrow::Status;
+/* Number of times we try binding to a socket. */
+#define BIND_TIMEOUT_MS 100
+/* Number of times we try connecting to a socket. */
+Status WriteBytes(int fd, uint8_t* cursor, size_t length) {
+  ssize_t nbytes = 0;
+  size_t bytesleft = length;
+  size_t offset = 0;
+  while (bytesleft > 0) {
+    /* While we haven't written the whole message, write to the file 
+     * advance the cursor, and decrease the amount left to write. */
+    nbytes = write(fd, cursor + offset, bytesleft);
+    if (nbytes < 0) {
+      if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 
continue; }
+      return Status::IOError(std::string(strerror(errno)));
+    } else if (nbytes == 0) {
+      return Status::IOError("Encountered unexpected EOF");
+    }
+    ARROW_CHECK(nbytes > 0);
+    bytesleft -= nbytes;
+    offset += nbytes;
+  }
+  return Status::OK();
+Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes) {
+  int64_t version = PLASMA_PROTOCOL_VERSION;
+  RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&version), 
+  RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&type), 
+  RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&length), 
+  return WriteBytes(fd, bytes, length * sizeof(char));
+Status ReadBytes(int fd, uint8_t* cursor, size_t length) {
+  ssize_t nbytes = 0;
+  /* Termination condition: EOF or read 'length' bytes total. */
+  size_t bytesleft = length;
+  size_t offset = 0;
+  while (bytesleft > 0) {
+    nbytes = read(fd, cursor + offset, bytesleft);
+    if (nbytes < 0) {
+      if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { 
continue; }
+      return Status::IOError(std::string(strerror(errno)));
+    } else if (0 == nbytes) {
+      return Status::IOError("Encountered unexpected EOF");
+    }
+    ARROW_CHECK(nbytes > 0);
+    bytesleft -= nbytes;
+    offset += nbytes;
+  }
+  return Status::OK();
+Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer) {
+  int64_t version;
+  RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(&version), 
+      *type = DISCONNECT_CLIENT);
+  ARROW_CHECK(version == PLASMA_PROTOCOL_VERSION) << "version = " << version;
+  size_t length;
+  RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(type), 
+      *type = DISCONNECT_CLIENT);
+  RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(&length), 
+      *type = DISCONNECT_CLIENT);
+  if (length > buffer->size()) { buffer->resize(length); }
+  RETURN_NOT_OK_ELSE(ReadBytes(fd, buffer->data(), length), *type = 
+  return Status::OK();
+int bind_ipc_sock(const std::string& pathname, bool shall_listen) {
+  struct sockaddr_un socket_address;
+  int socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
+  if (socket_fd < 0) {
+    ARROW_LOG(ERROR) << "socket() failed for pathname " << pathname;
+    return -1;
+  }
+  /* Tell the system to allow the port to be reused. */
+  int on = 1;
+  if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, 
+          sizeof(on)) < 0) {
+    ARROW_LOG(ERROR) << "setsockopt failed for pathname " << pathname;
+    close(socket_fd);
+    return -1;
+  }
+  unlink(pathname.c_str());
+  memset(&socket_address, 0, sizeof(socket_address));
+  socket_address.sun_family = AF_UNIX;
+  if (pathname.size() + 1 > sizeof(socket_address.sun_path)) {
+    ARROW_LOG(ERROR) << "Socket pathname is too long.";
+    close(socket_fd);
+    return -1;
+  }
+  strncpy(socket_address.sun_path, pathname.c_str(), pathname.size() + 1);
+  if (bind(socket_fd, (struct sockaddr*)&socket_address, 
sizeof(socket_address)) != 0) {
+    ARROW_LOG(ERROR) << "Bind failed for pathname " << pathname;
+    close(socket_fd);
+    return -1;
+  }
+  if (shall_listen && listen(socket_fd, 128) == -1) {
+    ARROW_LOG(ERROR) << "Could not listen to socket " << pathname;
+    close(socket_fd);
+    return -1;
+  }
+  return socket_fd;
+int connect_ipc_sock_retry(
+    const std::string& pathname, int num_retries, int64_t timeout) {
+  /* Pick the default values if the user did not specify. */
+  if (num_retries < 0) { num_retries = NUM_CONNECT_ATTEMPTS; }
+  if (timeout < 0) { timeout = CONNECT_TIMEOUT_MS; }
+  int fd = -1;
+  for (int num_attempts = 0; num_attempts < num_retries; ++num_attempts) {
+    fd = connect_ipc_sock(pathname);
+    if (fd >= 0) { break; }
+    if (num_attempts == 0) {
+      ARROW_LOG(ERROR) << "Connection to socket failed for pathname " << 
+    }
+    /* Sleep for timeout milliseconds. */
+    usleep(static_cast<int>(timeout * 1000));
+  }
+  /* If we could not connect to the socket, exit. */
+  if (fd == -1) { ARROW_LOG(FATAL) << "Could not connect to socket " << 
pathname; }
+  return fd;
+int connect_ipc_sock(const std::string& pathname) {
+  struct sockaddr_un socket_address;
+  int socket_fd;
+  socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
+  if (socket_fd < 0) {
+    ARROW_LOG(ERROR) << "socket() failed for pathname " << pathname;
+    return -1;
+  }
+  memset(&socket_address, 0, sizeof(socket_address));
+  socket_address.sun_family = AF_UNIX;
+  if (pathname.size() + 1 > sizeof(socket_address.sun_path)) {
+    ARROW_LOG(ERROR) << "Socket pathname is too long.";
+    return -1;
+  }
+  strncpy(socket_address.sun_path, pathname.c_str(), pathname.size() + 1);
+  if (connect(socket_fd, (struct sockaddr*)&socket_address, 
sizeof(socket_address)) !=
+      0) {
+    close(socket_fd);
+    return -1;
+  }
+  return socket_fd;
+int AcceptClient(int socket_fd) {
+  int client_fd = accept(socket_fd, NULL, NULL);
+  if (client_fd < 0) {
+    ARROW_LOG(ERROR) << "Error reading from socket.";
+    return -1;
+  }
+  return client_fd;
+uint8_t* read_message_async(int sock) {
+  int64_t size;
+  Status s = ReadBytes(sock, reinterpret_cast<uint8_t*>(&size), 
+  if (!s.ok()) {
+    /* The other side has closed the socket. */
+    ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has 
+    close(sock);
+    return NULL;
+  }
+  uint8_t* message = reinterpret_cast<uint8_t*>(malloc(size));
+  s = ReadBytes(sock, message, size);
+  if (!s.ok()) {
+    /* The other side has closed the socket. */
+    ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has 
+    close(sock);
+    return NULL;
+  }
+  return message;
diff --git a/cpp/src/plasma/io.h b/cpp/src/plasma/io.h
new file mode 100644
index 0000000..43c3fb5
--- /dev/null
+++ b/cpp/src/plasma/io.h
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef PLASMA_IO_H
+#define PLASMA_IO_H
+#include <inttypes.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <unistd.h>
+#include <string>
+#include <vector>
+#include "arrow/status.h"
+// TODO(pcm): Replace our own custom message header (message type,
+// message length, plasma protocol verion) with one that is serialized
+// using flatbuffers.
+#define PLASMA_PROTOCOL_VERSION 0x0000000000000000
+arrow::Status WriteBytes(int fd, uint8_t* cursor, size_t length);
+arrow::Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* 
+arrow::Status ReadBytes(int fd, uint8_t* cursor, size_t length);
+arrow::Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer);
+int bind_ipc_sock(const std::string& pathname, bool shall_listen);
+int connect_ipc_sock(const std::string& pathname);
+int connect_ipc_sock_retry(const std::string& pathname, int num_retries, 
int64_t timeout);
+int AcceptClient(int socket_fd);
+uint8_t* read_message_async(int sock);
+#endif  // PLASMA_IO_H
diff --git a/cpp/src/plasma/ b/cpp/src/plasma/
new file mode 100644
index 0000000..e7ffd1a
--- /dev/null
+++ b/cpp/src/plasma/
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "plasma/malloc.h"
+#include <assert.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <unistd.h>
+#include <unordered_map>
+#include "plasma/common.h"
+extern "C" {
+void* fake_mmap(size_t);
+int fake_munmap(void*, int64_t);
+#define MMAP(s) fake_mmap(s)
+#define MUNMAP(a, s) fake_munmap(a, s)
+#define DIRECT_MMAP(s) fake_mmap(s)
+#define DIRECT_MUNMAP(a, s) fake_munmap(a, s)
+#define USE_DL_PREFIX
+#define HAVE_MORECORE 0
+#define DEFAULT_GRANULARITY ((size_t)128U * 1024U)
+#include "thirdparty/dlmalloc.c"
+#undef MMAP
+#undef MUNMAP
+struct mmap_record {
+  int fd;
+  int64_t size;
+namespace {
+/** Hashtable that contains one entry per segment that we got from the OS
+ *  via mmap. Associates the address of that segment with its file descriptor
+ *  and size. */
+std::unordered_map<void*, mmap_record> mmap_records;
+} /* namespace */
+constexpr int GRANULARITY_MULTIPLIER = 2;
+static void* pointer_advance(void* p, ptrdiff_t n) {
+  return (unsigned char*)p + n;
+static void* pointer_retreat(void* p, ptrdiff_t n) {
+  return (unsigned char*)p - n;
+static ptrdiff_t pointer_distance(void const* pfrom, void const* pto) {
+  return (unsigned char const*)pto - (unsigned char const*)pfrom;
+/* Create a buffer. This is creating a temporary file and then
+ * immediately unlinking it so we do not leave traces in the system. */
+int create_buffer(int64_t size) {
+  int fd;
+#ifdef _WIN32
+          (DWORD)((uint64_t)size >> (CHAR_BIT * sizeof(DWORD))), 
+          NULL)) {
+    fd = -1;
+  }
+#ifdef __linux__
+  constexpr char file_template[] = "/dev/shm/plasmaXXXXXX";
+  constexpr char file_template[] = "/tmp/plasmaXXXXXX";
+  char file_name[32];
+  strncpy(file_name, file_template, 32);
+  fd = mkstemp(file_name);
+  if (fd < 0) return -1;
+  FILE* file = fdopen(fd, "a+");
+  if (!file) {
+    close(fd);
+    return -1;
+  }
+  if (unlink(file_name) != 0) {
+    ARROW_LOG(FATAL) << "unlink error";
+    return -1;
+  }
+  if (ftruncate(fd, (off_t)size) != 0) {
+    ARROW_LOG(FATAL) << "ftruncate error";
+    return -1;
+  }
+  return fd;
+void* fake_mmap(size_t size) {
+  /* Add sizeof(size_t) so that the returned pointer is deliberately not
+   * page-aligned. This ensures that the segments of memory returned by
+   * fake_mmap are never contiguous. */
+  size += sizeof(size_t);
+  int fd = create_buffer(size);
+  ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap";
+  void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+  if (pointer == MAP_FAILED) { return pointer; }
+  /* Increase dlmalloc's allocation granularity directly. */
+  mparams.granularity *= GRANULARITY_MULTIPLIER;
+  mmap_record& record = mmap_records[pointer];
+  record.fd = fd;
+  record.size = size;
+  /* We lie to dlmalloc about where mapped memory actually lives. */
+  pointer = pointer_advance(pointer, sizeof(size_t));
+  ARROW_LOG(DEBUG) << pointer << " = fake_mmap(" << size << ")";
+  return pointer;
+int fake_munmap(void* addr, int64_t size) {
+  ARROW_LOG(DEBUG) << "fake_munmap(" << addr << ", " << size << ")";
+  addr = pointer_retreat(addr, sizeof(size_t));
+  size += sizeof(size_t);
+  auto entry = mmap_records.find(addr);
+  if (entry == mmap_records.end() || entry->second.size != size) {
+    /* Reject requests to munmap that don't directly match previous
+     * calls to mmap, to prevent dlmalloc from trimming. */
+    return -1;
+  }
+  int r = munmap(addr, size);
+  if (r == 0) { close(entry->second.fd); }
+  mmap_records.erase(entry);
+  return r;
+void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* 
offset) {
+  /* TODO(rshin): Implement a more efficient search through mmap_records. */
+  for (const auto& entry : mmap_records) {
+    if (addr >= entry.first && addr < pointer_advance(entry.first, 
entry.second.size)) {
+      *fd = entry.second.fd;
+      *map_size = entry.second.size;
+      *offset = pointer_distance(entry.first, addr);
+      return;
+    }
+  }
+  *fd = -1;
+  *map_size = 0;
+  *offset = 0;
diff --git a/cpp/src/plasma/malloc.h b/cpp/src/plasma/malloc.h
new file mode 100644
index 0000000..b4af2c8
--- /dev/null
+++ b/cpp/src/plasma/malloc.h
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <inttypes.h>
+#include <stddef.h>
+void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_length, ptrdiff_t* 
+#endif  // MALLOC_H
diff --git a/cpp/src/plasma/ b/cpp/src/plasma/
new file mode 100644
index 0000000..559d8e7
--- /dev/null
+++ b/cpp/src/plasma/
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "plasma/plasma.h"
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include "plasma/common.h"
+#include "plasma/protocol.h"
+int warn_if_sigpipe(int status, int client_sock) {
+  if (status >= 0) { return 0; }
+  if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) {
+                          "sending a message to client on fd "
+                       << client_sock << ". The client on the other end may "
+                                         "have hung up.";
+    return errno;
+  }
+  ARROW_LOG(FATAL) << "Failed to write message to client on fd " << 
client_sock << ".";
+  return -1;  // This is never reached.
+ * This will create a new ObjectInfo buffer. The first sizeof(int64_t) bytes
+ * of this buffer are the length of the remaining message and the
+ * remaining message is a serialized version of the object info.
+ *
+ * @param object_info The object info to be serialized
+ * @return The object info buffer. It is the caller's responsibility to free
+ *         this buffer with "delete" after it has been used.
+ */
+uint8_t* create_object_info_buffer(ObjectInfoT* object_info) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreateObjectInfo(fbb, object_info);
+  fbb.Finish(message);
+  uint8_t* notification = new uint8_t[sizeof(int64_t) + fbb.GetSize()];
+  *(reinterpret_cast<int64_t*>(notification)) = fbb.GetSize();
+  memcpy(notification + sizeof(int64_t), fbb.GetBufferPointer(), 
+  return notification;
+ObjectTableEntry* get_object_table_entry(
+    PlasmaStoreInfo* store_info, const ObjectID& object_id) {
+  auto it = store_info->objects.find(object_id);
+  if (it == store_info->objects.end()) { return NULL; }
+  return it->second.get();
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
new file mode 100644
index 0000000..275d0c7
--- /dev/null
+++ b/cpp/src/plasma/plasma.h
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <errno.h>
+#include <inttypes.h>
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>  // pid_t
+#include <unordered_map>
+#include <unordered_set>
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+#include "format/common_generated.h"
+#include "plasma/common.h"
+#define HANDLE_SIGPIPE(s, fd_)                                              \
+  do {                                                                      \
+    Status _s = (s);                                                        \
+    if (!_s.ok()) {                                                         \
+      if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) {        \
+        ARROW_LOG(WARNING)                                                  \
+            << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when " \
+               "sending a message to client on fd "                         \
+            << fd_ << ". "                                                  \
+                      "The client on the other end may have hung up.";      \
+      } else {                                                              \
+        return _s;                                                          \
+      }                                                                     \
+    }                                                                       \
+  } while (0);
+/// Allocation granularity used in plasma for object allocation.
+#define BLOCK_SIZE 64
+/// Size of object hash digests.
+constexpr int64_t kDigestSize = sizeof(uint64_t);
+struct Client;
+/// Object request data structure. Used in the plasma_wait_for_objects()
+/// argument.
+typedef struct {
+  /// The ID of the requested object. If ID_NIL request any object.
+  ObjectID object_id;
+  /// Request associated to the object. It can take one of the following 
+  ///  - PLASMA_QUERY_LOCAL: return if or when the object is available in the
+  ///    local Plasma Store.
+  ///  - PLASMA_QUERY_ANYWHERE: return if or when the object is available in
+  ///    the system (i.e., either in the local or a remote Plasma Store).
+  int type;
+  /// Object status. Same as the status returned by plasma_status() function
+  /// call. This is filled in by plasma_wait_for_objects1():
+  ///  - ObjectStatus_Local: object is ready at the local Plasma Store.
+  ///  - ObjectStatus_Remote: object is ready at a remote Plasma Store.
+  ///  - ObjectStatus_Nonexistent: object does not exist in the system.
+  ///  - PLASMA_CLIENT_IN_TRANSFER, if the object is currently being scheduled
+  ///    for being transferred or it is transferring.
+  int status;
+} ObjectRequest;
+/// Mapping from object IDs to type and status of the request.
+typedef std::unordered_map<ObjectID, ObjectRequest, UniqueIDHasher> 
+/// Handle to access memory mapped file and map it into client address space.
+typedef struct {
+  /// The file descriptor of the memory mapped file in the store. It is used as
+  /// a unique identifier of the file in the client to look up the 
+  /// file descriptor on the client's side.
+  int store_fd;
+  /// The size in bytes of the memory mapped file.
+  int64_t mmap_size;
+} object_handle;
+// TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec.
+typedef struct {
+  /// Handle for memory mapped file the object is stored in.
+  object_handle handle;
+  /// The offset in bytes in the memory mapped file of the data.
+  ptrdiff_t data_offset;
+  /// The offset in bytes in the memory mapped file of the metadata.
+  ptrdiff_t metadata_offset;
+  /// The size in bytes of the data.
+  int64_t data_size;
+  /// The size in bytes of the metadata.
+  int64_t metadata_size;
+} PlasmaObject;
+typedef enum {
+  /// Object was created but not sealed in the local Plasma Store.
+  /// Object is sealed and stored in the local Plasma Store.
+} object_state;
+typedef enum {
+  /// The object was not found.
+  /// The object was found.
+} object_status;
+typedef enum {
+  /// Query for object in the local plasma store.
+  /// Query for object in the local plasma store or in a remote plasma store.
+} object_request_type;
+/// This type is used by the Plasma store. It is here because it is exposed to
+/// the eviction policy.
+struct ObjectTableEntry {
+  /// Object id of this object.
+  ObjectID object_id;
+  /// Object info like size, creation time and owner.
+  ObjectInfoT info;
+  /// Memory mapped file containing the object.
+  int fd;
+  /// Size of the underlying map.
+  int64_t map_size;
+  /// Offset from the base of the mmap.
+  ptrdiff_t offset;
+  /// Pointer to the object data. Needed to free the object.
+  uint8_t* pointer;
+  /// Set of clients currently using this object.
+  std::unordered_set<Client*> clients;
+  /// The state of the object, e.g., whether it is open or sealed.
+  object_state state;
+  /// The digest of the object. Used to see if two objects are the same.
+  unsigned char digest[kDigestSize];
+/// The plasma store information that is exposed to the eviction policy.
+struct PlasmaStoreInfo {
+  /// Objects that are in the Plasma store.
+  std::unordered_map<ObjectID, std::unique_ptr<ObjectTableEntry>, 
UniqueIDHasher> objects;
+  /// The amount of memory (in bytes) that we allow to be allocated in the
+  /// store.
+  int64_t memory_capacity;
+/// Get an entry from the object table and return NULL if the object_id
+/// is not present.
+/// @param store_info The PlasmaStoreInfo that contains the object table.
+/// @param object_id The object_id of the entry we are looking for.
+/// @return The entry associated with the object_id or NULL if the object_id
+///         is not present.
+ObjectTableEntry* get_object_table_entry(
+    PlasmaStoreInfo* store_info, const ObjectID& object_id);
+/// Print a warning if the status is less than zero. This should be used to 
+/// the success of messages sent to plasma clients. We print a warning instead 
+/// failing because the plasma clients are allowed to die. This is used to 
+/// situations where the store writes to a client file descriptor, and the 
+/// may already have disconnected. If we have processed the disconnection and
+/// closed the file descriptor, we should get a BAD FILE DESCRIPTOR error. If 
+/// have not, then we should get a SIGPIPE. If we write to a TCP socket that
+/// isn't connected yet, then we should get an ECONNRESET.
+/// @param status The status to check. If it is less less than zero, we will
+///        print a warning.
+/// @param client_sock The client socket. This is just used to print some extra
+///        information.
+/// @return The errno set.
+int warn_if_sigpipe(int status, int client_sock);
+uint8_t* create_object_info_buffer(ObjectInfoT* object_info);
+#endif  // PLASMA_PLASMA_H
diff --git a/cpp/src/plasma/ b/cpp/src/plasma/
new file mode 100644
index 0000000..246aa29
--- /dev/null
+++ b/cpp/src/plasma/
@@ -0,0 +1,502 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "plasma/protocol.h"
+#include "flatbuffers/flatbuffers.h"
+#include "format/plasma_generated.h"
+#include "plasma/common.h"
+#include "plasma/io.h"
+using flatbuffers::uoffset_t;
+to_flatbuffer(flatbuffers::FlatBufferBuilder* fbb, const ObjectID* object_ids,
+    int64_t num_objects) {
+  std::vector<flatbuffers::Offset<flatbuffers::String>> results;
+  for (int64_t i = 0; i < num_objects; i++) {
+    results.push_back(fbb->CreateString(object_ids[i].binary()));
+  }
+  return fbb->CreateVector(results);
+Status PlasmaReceive(int sock, int64_t message_type, std::vector<uint8_t>* 
buffer) {
+  int64_t type;
+  RETURN_NOT_OK(ReadMessage(sock, &type, buffer));
+  ARROW_CHECK(type == message_type) << "type = " << type
+                                    << ", message_type = " << message_type;
+  return Status::OK();
+template <typename Message>
+Status PlasmaSend(int sock, int64_t message_type, 
flatbuffers::FlatBufferBuilder* fbb,
+    const Message& message) {
+  fbb->Finish(message);
+  return WriteMessage(sock, message_type, fbb->GetSize(), 
+// Create messages.
+Status SendCreateRequest(
+    int sock, ObjectID object_id, int64_t data_size, int64_t metadata_size) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreatePlasmaCreateRequest(
+      fbb, fbb.CreateString(object_id.binary()), data_size, metadata_size);
+  return PlasmaSend(sock, MessageType_PlasmaCreateRequest, &fbb, message);
+Status ReadCreateRequest(
+    uint8_t* data, ObjectID* object_id, int64_t* data_size, int64_t* 
metadata_size) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaCreateRequest>(data);
+  *data_size = message->data_size();
+  *metadata_size = message->metadata_size();
+  *object_id = ObjectID::from_binary(message->object_id()->str());
+  return Status::OK();
+Status SendCreateReply(
+    int sock, ObjectID object_id, PlasmaObject* object, int error_code) {
+  flatbuffers::FlatBufferBuilder fbb;
+  PlasmaObjectSpec plasma_object(object->handle.store_fd, 
+      object->data_offset, object->data_size, object->metadata_offset,
+      object->metadata_size);
+  auto message = CreatePlasmaCreateReply(
+      fbb, fbb.CreateString(object_id.binary()), &plasma_object, 
+  return PlasmaSend(sock, MessageType_PlasmaCreateReply, &fbb, message);
+Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* 
object) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaCreateReply>(data);
+  *object_id = ObjectID::from_binary(message->object_id()->str());
+  object->handle.store_fd = message->plasma_object()->segment_index();
+  object->handle.mmap_size = message->plasma_object()->mmap_size();
+  object->data_offset = message->plasma_object()->data_offset();
+  object->data_size = message->plasma_object()->data_size();
+  object->metadata_offset = message->plasma_object()->metadata_offset();
+  object->metadata_size = message->plasma_object()->metadata_size();
+  return plasma_error_status(message->error());
+// Seal messages.
+Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto digest_string = fbb.CreateString(reinterpret_cast<char*>(digest), 
+  auto message =
+      CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary()), 
+  return PlasmaSend(sock, MessageType_PlasmaSealRequest, &fbb, message);
+Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* 
digest) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaSealRequest>(data);
+  *object_id = ObjectID::from_binary(message->object_id()->str());
+  ARROW_CHECK(message->digest()->size() == kDigestSize);
+  memcpy(digest, message->digest()->data(), kDigestSize);
+  return Status::OK();
+Status SendSealReply(int sock, ObjectID object_id, int error) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreatePlasmaSealReply(
+      fbb, fbb.CreateString(object_id.binary()), (PlasmaError)error);
+  return PlasmaSend(sock, MessageType_PlasmaSealReply, &fbb, message);
+Status ReadSealReply(uint8_t* data, ObjectID* object_id) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaSealReply>(data);
+  *object_id = ObjectID::from_binary(message->object_id()->str());
+  return plasma_error_status(message->error());
+// Release messages.
+Status SendReleaseRequest(int sock, ObjectID object_id) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreatePlasmaSealRequest(fbb, 
+  return PlasmaSend(sock, MessageType_PlasmaReleaseRequest, &fbb, message);
+Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaReleaseRequest>(data);
+  *object_id = ObjectID::from_binary(message->object_id()->str());
+  return Status::OK();
+Status SendReleaseReply(int sock, ObjectID object_id, int error) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreatePlasmaReleaseReply(
+      fbb, fbb.CreateString(object_id.binary()), (PlasmaError)error);
+  return PlasmaSend(sock, MessageType_PlasmaReleaseReply, &fbb, message);
+Status ReadReleaseReply(uint8_t* data, ObjectID* object_id) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
+  *object_id = ObjectID::from_binary(message->object_id()->str());
+  return plasma_error_status(message->error());
+// Delete messages.
+Status SendDeleteRequest(int sock, ObjectID object_id) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreatePlasmaDeleteRequest(fbb, 
+  return PlasmaSend(sock, MessageType_PlasmaDeleteRequest, &fbb, message);
+Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
+  *object_id = ObjectID::from_binary(message->object_id()->str());
+  return Status::OK();
+Status SendDeleteReply(int sock, ObjectID object_id, int error) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreatePlasmaDeleteReply(
+      fbb, fbb.CreateString(object_id.binary()), (PlasmaError)error);
+  return PlasmaSend(sock, MessageType_PlasmaDeleteReply, &fbb, message);
+Status ReadDeleteReply(uint8_t* data, ObjectID* object_id) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaDeleteReply>(data);
+  *object_id = ObjectID::from_binary(message->object_id()->str());
+  return plasma_error_status(message->error());
+// Satus messages.
+Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t 
num_objects) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message =
+      CreatePlasmaStatusRequest(fbb, to_flatbuffer(&fbb, object_ids, 
+  return PlasmaSend(sock, MessageType_PlasmaStatusRequest, &fbb, message);
+Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t 
num_objects) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaStatusRequest>(data);
+  for (uoffset_t i = 0; i < num_objects; ++i) {
+    object_ids[i] = 
+  }
+  return Status::OK();
+Status SendStatusReply(
+    int sock, ObjectID object_ids[], int object_status[], int64_t num_objects) 
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message =
+      CreatePlasmaStatusReply(fbb, to_flatbuffer(&fbb, object_ids, 
+          fbb.CreateVector(object_status, num_objects));
+  return PlasmaSend(sock, MessageType_PlasmaStatusReply, &fbb, message);
+int64_t ReadStatusReply_num_objects(uint8_t* data) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data);
+  return message->object_ids()->size();
+Status ReadStatusReply(
+    uint8_t* data, ObjectID object_ids[], int object_status[], int64_t 
num_objects) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data);
+  for (uoffset_t i = 0; i < num_objects; ++i) {
+    object_ids[i] = 
+  }
+  for (uoffset_t i = 0; i < num_objects; ++i) {
+    object_status[i] = message->status()->data()[i];
+  }
+  return Status::OK();
+// Contains messages.
+Status SendContainsRequest(int sock, ObjectID object_id) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreatePlasmaContainsRequest(fbb, 
+  return PlasmaSend(sock, MessageType_PlasmaContainsRequest, &fbb, message);
+Status ReadContainsRequest(uint8_t* data, ObjectID* object_id) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaContainsRequest>(data);
+  *object_id = ObjectID::from_binary(message->object_id()->str());
+  return Status::OK();
+Status SendContainsReply(int sock, ObjectID object_id, bool has_object) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message =
+      CreatePlasmaContainsReply(fbb, fbb.CreateString(object_id.binary()), 
+  return PlasmaSend(sock, MessageType_PlasmaContainsReply, &fbb, message);
+Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object) 
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaContainsReply>(data);
+  *object_id = ObjectID::from_binary(message->object_id()->str());
+  *has_object = message->has_object();
+  return Status::OK();
+// Connect messages.
+Status SendConnectRequest(int sock) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreatePlasmaConnectRequest(fbb);
+  return PlasmaSend(sock, MessageType_PlasmaConnectRequest, &fbb, message);
+Status ReadConnectRequest(uint8_t* data) {
+  return Status::OK();
+Status SendConnectReply(int sock, int64_t memory_capacity) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreatePlasmaConnectReply(fbb, memory_capacity);
+  return PlasmaSend(sock, MessageType_PlasmaConnectReply, &fbb, message);
+Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaConnectReply>(data);
+  *memory_capacity = message->memory_capacity();
+  return Status::OK();
+// Evict messages.
+Status SendEvictRequest(int sock, int64_t num_bytes) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreatePlasmaEvictRequest(fbb, num_bytes);
+  return PlasmaSend(sock, MessageType_PlasmaEvictRequest, &fbb, message);
+Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaEvictRequest>(data);
+  *num_bytes = message->num_bytes();
+  return Status::OK();
+Status SendEvictReply(int sock, int64_t num_bytes) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreatePlasmaEvictReply(fbb, num_bytes);
+  return PlasmaSend(sock, MessageType_PlasmaEvictReply, &fbb, message);
+Status ReadEvictReply(uint8_t* data, int64_t& num_bytes) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaEvictReply>(data);
+  num_bytes = message->num_bytes();
+  return Status::OK();
+// Get messages.
+Status SendGetRequest(
+    int sock, const ObjectID* object_ids, int64_t num_objects, int64_t 
timeout_ms) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreatePlasmaGetRequest(
+      fbb, to_flatbuffer(&fbb, object_ids, num_objects), timeout_ms);
+  return PlasmaSend(sock, MessageType_PlasmaGetRequest, &fbb, message);
+Status ReadGetRequest(
+    uint8_t* data, std::vector<ObjectID>& object_ids, int64_t* timeout_ms) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaGetRequest>(data);
+  for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
+    auto object_id = message->object_ids()->Get(i)->str();
+    object_ids.push_back(ObjectID::from_binary(object_id));
+  }
+  *timeout_ms = message->timeout_ms();
+  return Status::OK();
+Status SendGetReply(int sock, ObjectID object_ids[],
+    std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects,
+    int64_t num_objects) {
+  flatbuffers::FlatBufferBuilder fbb;
+  std::vector<PlasmaObjectSpec> objects;
+  for (int i = 0; i < num_objects; ++i) {
+    const PlasmaObject& object = plasma_objects[object_ids[i]];
+    objects.push_back(PlasmaObjectSpec(object.handle.store_fd, 
+        object.data_offset, object.data_size, object.metadata_offset,
+        object.metadata_size));
+  }
+  auto message = CreatePlasmaGetReply(fbb, to_flatbuffer(&fbb, object_ids, 
+      fbb.CreateVectorOfStructs(, num_objects));
+  return PlasmaSend(sock, MessageType_PlasmaGetReply, &fbb, message);
+Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject 
+    int64_t num_objects) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaGetReply>(data);
+  for (uoffset_t i = 0; i < num_objects; ++i) {
+    object_ids[i] = 
+  }
+  for (uoffset_t i = 0; i < num_objects; ++i) {
+    const PlasmaObjectSpec* object = message->plasma_objects()->Get(i);
+    plasma_objects[i].handle.store_fd = object->segment_index();
+    plasma_objects[i].handle.mmap_size = object->mmap_size();
+    plasma_objects[i].data_offset = object->data_offset();
+    plasma_objects[i].data_size = object->data_size();
+    plasma_objects[i].metadata_offset = object->metadata_offset();
+    plasma_objects[i].metadata_size = object->metadata_size();
+  }
+  return Status::OK();
+// Fetch messages.
+Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t 
num_objects) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message =
+      CreatePlasmaFetchRequest(fbb, to_flatbuffer(&fbb, object_ids, 
+  return PlasmaSend(sock, MessageType_PlasmaFetchRequest, &fbb, message);
+Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaFetchRequest>(data);
+  for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
+  }
+  return Status::OK();
+// Wait messages.
+Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t 
+    int num_ready_objects, int64_t timeout_ms) {
+  flatbuffers::FlatBufferBuilder fbb;
+  std::vector<flatbuffers::Offset<ObjectRequestSpec>> object_request_specs;
+  for (int i = 0; i < num_requests; i++) {
+    object_request_specs.push_back(CreateObjectRequestSpec(fbb,
+        fbb.CreateString(object_requests[i].object_id.binary()),
+        object_requests[i].type));
+  }
+  auto message = CreatePlasmaWaitRequest(
+      fbb, fbb.CreateVector(object_request_specs), num_ready_objects, 
+  return PlasmaSend(sock, MessageType_PlasmaWaitRequest, &fbb, message);
+Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests,
+    int64_t* timeout_ms, int* num_ready_objects) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaWaitRequest>(data);
+  *num_ready_objects = message->num_ready_objects();
+  *timeout_ms = message->timeout();
+  for (uoffset_t i = 0; i < message->object_requests()->size(); i++) {
+    ObjectID object_id =
+    ObjectRequest object_request({object_id, 
+        ObjectStatus_Nonexistent});
+    object_requests[object_id] = object_request;
+  }
+  return Status::OK();
+Status SendWaitReply(
+    int sock, const ObjectRequestMap& object_requests, int num_ready_objects) {
+  flatbuffers::FlatBufferBuilder fbb;
+  std::vector<flatbuffers::Offset<ObjectReply>> object_replies;
+  for (const auto& entry : object_requests) {
+    const auto& object_request = entry.second;
+    object_replies.push_back(CreateObjectReply(
+        fbb, fbb.CreateString(object_request.object_id.binary()), 
+  }
+  auto message = CreatePlasmaWaitReply(
+      fbb, fbb.CreateVector(, num_ready_objects), 
+  return PlasmaSend(sock, MessageType_PlasmaWaitReply, &fbb, message);
+Status ReadWaitReply(
+    uint8_t* data, ObjectRequest object_requests[], int* num_ready_objects) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaWaitReply>(data);
+  *num_ready_objects = message->num_ready_objects();
+  for (int i = 0; i < *num_ready_objects; i++) {
+    object_requests[i].object_id =
+    object_requests[i].status = message->object_requests()->Get(i)->status();
+  }
+  return Status::OK();
+// Subscribe messages.
+Status SendSubscribeRequest(int sock) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreatePlasmaSubscribeRequest(fbb);
+  return PlasmaSend(sock, MessageType_PlasmaSubscribeRequest, &fbb, message);
+// Data messages.
+Status SendDataRequest(int sock, ObjectID object_id, const char* address, int 
port) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto addr = fbb.CreateString(address, strlen(address));
+  auto message =
+      CreatePlasmaDataRequest(fbb, fbb.CreateString(object_id.binary()), addr, 
+  return PlasmaSend(sock, MessageType_PlasmaDataRequest, &fbb, message);
+Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, 
int* port) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaDataRequest>(data);
+  DCHECK(message->object_id()->size() == sizeof(ObjectID));
+  *object_id = ObjectID::from_binary(message->object_id()->str());
+  *address = strdup(message->address()->c_str());
+  *port = message->port();
+  return Status::OK();
+Status SendDataReply(
+    int sock, ObjectID object_id, int64_t object_size, int64_t metadata_size) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreatePlasmaDataReply(
+      fbb, fbb.CreateString(object_id.binary()), object_size, metadata_size);
+  return PlasmaSend(sock, MessageType_PlasmaDataReply, &fbb, message);
+Status ReadDataReply(
+    uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* 
metadata_size) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaDataReply>(data);
+  *object_id = ObjectID::from_binary(message->object_id()->str());
+  *object_size = (int64_t)message->object_size();
+  *metadata_size = (int64_t)message->metadata_size();
+  return Status::OK();
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
new file mode 100644
index 0000000..5d9d136
--- /dev/null
+++ b/cpp/src/plasma/protocol.h
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <vector>
+#include "arrow/status.h"
+#include "format/plasma_generated.h"
+#include "plasma/plasma.h"
+using arrow::Status;
+/* Plasma receive message. */
+Status PlasmaReceive(int sock, int64_t message_type, std::vector<uint8_t>* 
+/* Plasma Create message functions. */
+Status SendCreateRequest(
+    int sock, ObjectID object_id, int64_t data_size, int64_t metadata_size);
+Status ReadCreateRequest(
+    uint8_t* data, ObjectID* object_id, int64_t* data_size, int64_t* 
+Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int 
+Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* 
+/* Plasma Seal message functions. */
+Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest);
+Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* 
+Status SendSealReply(int sock, ObjectID object_id, int error);
+Status ReadSealReply(uint8_t* data, ObjectID* object_id);
+/* Plasma Get message functions. */
+Status SendGetRequest(
+    int sock, const ObjectID* object_ids, int64_t num_objects, int64_t 
+Status ReadGetRequest(
+    uint8_t* data, std::vector<ObjectID>& object_ids, int64_t* timeout_ms);
+Status SendGetReply(int sock, ObjectID object_ids[],
+    std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects,
+    int64_t num_objects);
+Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject 
+    int64_t num_objects);
+/* Plasma Release message functions. */
+Status SendReleaseRequest(int sock, ObjectID object_id);
+Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id);
+Status SendReleaseReply(int sock, ObjectID object_id, int error);
+Status ReadReleaseReply(uint8_t* data, ObjectID* object_id);
+/* Plasma Delete message functions. */
+Status SendDeleteRequest(int sock, ObjectID object_id);
+Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id);
+Status SendDeleteReply(int sock, ObjectID object_id, int error);
+Status ReadDeleteReply(uint8_t* data, ObjectID* object_id);
+/* Satus messages. */
+Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t 
+Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t 
+Status SendStatusReply(
+    int sock, ObjectID object_ids[], int object_status[], int64_t num_objects);
+int64_t ReadStatusReply_num_objects(uint8_t* data);
+Status ReadStatusReply(
+    uint8_t* data, ObjectID object_ids[], int object_status[], int64_t 
+/* Plasma Constains message functions. */
+Status SendContainsRequest(int sock, ObjectID object_id);
+Status ReadContainsRequest(uint8_t* data, ObjectID* object_id);
+Status SendContainsReply(int sock, ObjectID object_id, bool has_object);
+Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object);
+/* Plasma Connect message functions. */
+Status SendConnectRequest(int sock);
+Status ReadConnectRequest(uint8_t* data);
+Status SendConnectReply(int sock, int64_t memory_capacity);
+Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity);
+/* Plasma Evict message functions (no reply so far). */
+Status SendEvictRequest(int sock, int64_t num_bytes);
+Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes);
+Status SendEvictReply(int sock, int64_t num_bytes);
+Status ReadEvictReply(uint8_t* data, int64_t& num_bytes);
+/* Plasma Fetch Remote message functions. */
+Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t 
+Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids);
+/* Plasma Wait message functions. */
+Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t 
+    int num_ready_objects, int64_t timeout_ms);
+Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests,
+    int64_t* timeout_ms, int* num_ready_objects);
+Status SendWaitReply(
+    int sock, const ObjectRequestMap& object_requests, int num_ready_objects);
+Status ReadWaitReply(
+    uint8_t* data, ObjectRequest object_requests[], int* num_ready_objects);
+/* Plasma Subscribe message functions. */
+Status SendSubscribeRequest(int sock);
+/* Data messages. */
+Status SendDataRequest(int sock, ObjectID object_id, const char* address, int 
+Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, 
int* port);
+Status SendDataReply(
+    int sock, ObjectID object_id, int64_t object_size, int64_t metadata_size);
+Status ReadDataReply(
+    uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* 
+#endif /* PLASMA_PROTOCOL */

Reply via email to