This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch maint-0.14.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit a7a39ee09c82a692037aa7ac4cd9ab0b8b63ab57
Author: Richard Liaw <[email protected]>
AuthorDate: Tue Jul 16 00:30:50 2019 -0700

    ARROW-5560: [C++][Plasma] Cannot create Plasma object after OutOfMemory 
error
    
    If the client tries to call `CreateObject` and there is not enough memory 
left in the object store to create it, an `OutOfMemory` error will be returned.
    
    However, the plasma store also creates an entry for the object, even though 
it failed to be created. This means that later on, if the client tries to 
create the object again, it will receive an error that the object already 
exists. Also, if the client tries to get the object, it will hang because the 
entry appears to be unsealed.
    
    We fix this by only creating the object entry if the `CreateObject` 
operation succeeds.
    
    Author: Richard Liaw <[email protected]>
    Author: Philipp Moritz <[email protected]>
    
    Closes #4850 from richardliaw/fix_all_plasma_bugs_once_and_for_all and 
squashes the following commits:
    
    82c4701ca <Philipp Moritz> Update test_plasma.py
    5ac61b8f3 <Richard Liaw> updatetest
    c6b97b345 <Richard Liaw> Merge branch 
'fix_all_plasma_bugs_once_and_for_all' of https://github.com/richardliaw/arrow 
into fix_all_plasma_bugs_once_and_for_all
    475d12d5c <Richard Liaw> fix up tests
    f5b892ff0 <Richard Liaw> Update cpp/src/plasma/store.cc
    ff1c826c4 <Richard Liaw> store fix
---
 cpp/src/plasma/store.cc             | 38 +++++++++++++++++++++----------------
 python/pyarrow/tests/test_plasma.py | 10 +++++++---
 2 files changed, 29 insertions(+), 19 deletions(-)

diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index c574d09..2c3361e 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -215,10 +215,6 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& 
object_id, int64_t data_si
     // ignore this requst.
     return PlasmaError::ObjectExists;
   }
-  auto ptr = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
-  entry = store_info_.objects.emplace(object_id, 
std::move(ptr)).first->second.get();
-  entry->data_size = data_size;
-  entry->metadata_size = metadata_size;
 
   int fd = -1;
   int64_t map_size = 0;
@@ -226,29 +222,35 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& 
object_id, int64_t data_si
   uint8_t* pointer = nullptr;
   auto total_size = data_size + metadata_size;
 
-  if (device_num != 0) {
+  if (device_num == 0) {
+    pointer = AllocateMemory(total_size, &fd, &map_size, &offset);
+    if (!pointer) {
+      ARROW_LOG(ERROR) << "Not enough memory to create the object " << 
object_id.hex()
+                       << ", data_size=" << data_size
+                       << ", metadata_size=" << metadata_size
+                       << ", will send a reply of PlasmaError::OutOfMemory";
+      return PlasmaError::OutOfMemory;
+    }
+  } else {
 #ifdef PLASMA_CUDA
-    auto st = AllocateCudaMemory(device_num, total_size, &pointer, 
&entry->ipc_handle);
+    /// IPC GPU handle to share with clients.
+    std::shared_ptr<::arrow::cuda::CudaIpcMemHandle> ipc_handle;
+    auto st = AllocateCudaMemory(device_num, total_size, &pointer, 
&ipc_handle);
     if (!st.ok()) {
       ARROW_LOG(ERROR) << "Failed to allocate CUDA memory: " << st.ToString();
       return PlasmaError::OutOfMemory;
     }
-    result->ipc_handle = entry->ipc_handle;
+    result->ipc_handle = ipc_handle;
 #else
     ARROW_LOG(ERROR) << "device_num != 0 but CUDA not enabled";
     return PlasmaError::OutOfMemory;
 #endif
-  } else {
-    pointer = AllocateMemory(total_size, &fd, &map_size, &offset);
-    if (!pointer) {
-      ARROW_LOG(ERROR) << "Not enough memory to create the object " << 
object_id.hex()
-                       << ", data_size=" << data_size
-                       << ", metadata_size=" << metadata_size
-                       << ", will send a reply of PlasmaError::OutOfMemory";
-      return PlasmaError::OutOfMemory;
-    }
   }
 
+  auto ptr = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
+  entry = store_info_.objects.emplace(object_id, 
std::move(ptr)).first->second.get();
+  entry->data_size = data_size;
+  entry->metadata_size = metadata_size;
   entry->pointer = pointer;
   // TODO(pcm): Set the other fields.
   entry->fd = fd;
@@ -259,6 +261,10 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& 
object_id, int64_t data_si
   entry->create_time = std::time(nullptr);
   entry->construct_duration = -1;
 
+#ifdef PLASMA_CUDA
+  entry->ipc_handle = result->ipc_handle;
+#endif
+
   result->store_fd = fd;
   result->data_offset = offset;
   result->metadata_offset = offset + data_size;
diff --git a/python/pyarrow/tests/test_plasma.py 
b/python/pyarrow/tests/test_plasma.py
index 149bdd5..6360252 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -852,9 +852,13 @@ class TestPlasmaClient(object):
         for _ in range(2):
             create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0)
         # Verify that an object that is too large does not fit.
-        with pytest.raises(pa.lib.PlasmaStoreFull):
-            create_object(self.plasma_client2,
-                          DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE, 0)
+        # Also verifies that the right error is thrown, and does not
+        # create the object ID prematurely.
+        object_id = random_object_id()
+        for i in range(3):
+            with pytest.raises(pa.plasma.PlasmaStoreFull):
+                self.plasma_client2.create(
+                    object_id, DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE)
 
     def test_client_death_during_get(self):
         import pyarrow.plasma as plasma

Reply via email to