This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new f976629  ARROW-5560: [C++][Plasma] Cannot create Plasma object after OutOfMemory error
f976629 is described below

commit f976629a54f5518f6285a311c45c5957281b1ee7
Author: Richard Liaw <[email protected]>
AuthorDate: Tue Jul 16 00:30:50 2019 -0700

    ARROW-5560: [C++][Plasma] Cannot create Plasma object after OutOfMemory error
    
    If the client tries to call `CreateObject` and there is not enough memory left in the object store to create it, an `OutOfMemory` error will be returned.
    
    However, the plasma store also creates an entry for the object, even though the object failed to be created. This means that later on, if the client tries to create the object again, it will receive an error that the object already exists. Also, if the client tries to get the object, the client will hang because the entry appears to be unsealed.
    
    We fix this by only creating the object entry if the `CreateObject` operation succeeds.
    
    Author: Richard Liaw <[email protected]>
    Author: Philipp Moritz <[email protected]>
    
    Closes #4850 from richardliaw/fix_all_plasma_bugs_once_and_for_all and squashes the following commits:
    
    82c4701ca <Philipp Moritz> Update test_plasma.py
    5ac61b8f3 <Richard Liaw> updatetest
    c6b97b345 <Richard Liaw> Merge branch 'fix_all_plasma_bugs_once_and_for_all' of https://github.com/richardliaw/arrow into fix_all_plasma_bugs_once_and_for_all
    475d12d5c <Richard Liaw> fix up tests
    f5b892ff0 <Richard Liaw> Update cpp/src/plasma/store.cc
    ff1c826c4 <Richard Liaw> store fix
---
 cpp/src/plasma/store.cc             | 38 +++++++++++++++++++++----------------
 python/pyarrow/tests/test_plasma.py | 10 +++++++---
 2 files changed, 29 insertions(+), 19 deletions(-)

diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index c574d09..2c3361e 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -215,10 +215,6 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& 
object_id, int64_t data_si
     // ignore this requst.
     return PlasmaError::ObjectExists;
   }
-  auto ptr = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
-  entry = store_info_.objects.emplace(object_id, 
std::move(ptr)).first->second.get();
-  entry->data_size = data_size;
-  entry->metadata_size = metadata_size;
 
   int fd = -1;
   int64_t map_size = 0;
@@ -226,29 +222,35 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& 
object_id, int64_t data_si
   uint8_t* pointer = nullptr;
   auto total_size = data_size + metadata_size;
 
-  if (device_num != 0) {
+  if (device_num == 0) {
+    pointer = AllocateMemory(total_size, &fd, &map_size, &offset);
+    if (!pointer) {
+      ARROW_LOG(ERROR) << "Not enough memory to create the object " << 
object_id.hex()
+                       << ", data_size=" << data_size
+                       << ", metadata_size=" << metadata_size
+                       << ", will send a reply of PlasmaError::OutOfMemory";
+      return PlasmaError::OutOfMemory;
+    }
+  } else {
 #ifdef PLASMA_CUDA
-    auto st = AllocateCudaMemory(device_num, total_size, &pointer, 
&entry->ipc_handle);
+    /// IPC GPU handle to share with clients.
+    std::shared_ptr<::arrow::cuda::CudaIpcMemHandle> ipc_handle;
+    auto st = AllocateCudaMemory(device_num, total_size, &pointer, 
&ipc_handle);
     if (!st.ok()) {
       ARROW_LOG(ERROR) << "Failed to allocate CUDA memory: " << st.ToString();
       return PlasmaError::OutOfMemory;
     }
-    result->ipc_handle = entry->ipc_handle;
+    result->ipc_handle = ipc_handle;
 #else
     ARROW_LOG(ERROR) << "device_num != 0 but CUDA not enabled";
     return PlasmaError::OutOfMemory;
 #endif
-  } else {
-    pointer = AllocateMemory(total_size, &fd, &map_size, &offset);
-    if (!pointer) {
-      ARROW_LOG(ERROR) << "Not enough memory to create the object " << 
object_id.hex()
-                       << ", data_size=" << data_size
-                       << ", metadata_size=" << metadata_size
-                       << ", will send a reply of PlasmaError::OutOfMemory";
-      return PlasmaError::OutOfMemory;
-    }
   }
 
+  auto ptr = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
+  entry = store_info_.objects.emplace(object_id, 
std::move(ptr)).first->second.get();
+  entry->data_size = data_size;
+  entry->metadata_size = metadata_size;
   entry->pointer = pointer;
   // TODO(pcm): Set the other fields.
   entry->fd = fd;
@@ -259,6 +261,10 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& 
object_id, int64_t data_si
   entry->create_time = std::time(nullptr);
   entry->construct_duration = -1;
 
+#ifdef PLASMA_CUDA
+  entry->ipc_handle = result->ipc_handle;
+#endif
+
   result->store_fd = fd;
   result->data_offset = offset;
   result->metadata_offset = offset + data_size;
diff --git a/python/pyarrow/tests/test_plasma.py 
b/python/pyarrow/tests/test_plasma.py
index 49808a1..5381c26 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -852,9 +852,13 @@ class TestPlasmaClient(object):
         for _ in range(2):
             create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0)
         # Verify that an object that is too large does not fit.
-        with pytest.raises(pa.plasma.PlasmaStoreFull):
-            create_object(self.plasma_client2,
-                          DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE, 0)
+        # Also verifies that the right error is thrown, and does not
+        # create the object ID prematurely.
+        object_id = random_object_id()
+        for i in range(3):
+            with pytest.raises(pa.plasma.PlasmaStoreFull):
+                self.plasma_client2.create(
+                    object_id, DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE)
 
     def test_client_death_during_get(self):
         import pyarrow.plasma as plasma

Reply via email to