This is an automated email from the ASF dual-hosted git repository. kszucs pushed a commit to branch maint-0.14.x in repository https://gitbox.apache.org/repos/asf/arrow.git
commit a7a39ee09c82a692037aa7ac4cd9ab0b8b63ab57 Author: Richard Liaw <[email protected]> AuthorDate: Tue Jul 16 00:30:50 2019 -0700 ARROW-5560: [C++][Plasma] Cannot create Plasma object after OutOfMemory error If the client tries to call `CreateObject` and there is not enough memory left in the object store to create it, an `OutOfMemory` error will be returned. However, the plasma store also creates an entry for the object, even though it failed to be created. This means that later on, if the client tries to create the object again, it will receive an error that the object already exists. Also, if the client tries to get the object, it will hang because the entry appears to be unsealed. We fix this by only creating the object entry if the `CreateObject` operation succeeds. Author: Richard Liaw <[email protected]> Author: Philipp Moritz <[email protected]> Closes #4850 from richardliaw/fix_all_plasma_bugs_once_and_for_all and squashes the following commits: 82c4701ca <Philipp Moritz> Update test_plasma.py 5ac61b8f3 <Richard Liaw> updatetest c6b97b345 <Richard Liaw> Merge branch 'fix_all_plasma_bugs_once_and_for_all' of https://github.com/richardliaw/arrow into fix_all_plasma_bugs_once_and_for_all 475d12d5c <Richard Liaw> fix up tests f5b892ff0 <Richard Liaw> Update cpp/src/plasma/store.cc ff1c826c4 <Richard Liaw> store fix --- cpp/src/plasma/store.cc | 38 +++++++++++++++++++++---------------- python/pyarrow/tests/test_plasma.py | 10 +++++++--- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index c574d09..2c3361e 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -215,10 +215,6 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si // ignore this requst. return PlasmaError::ObjectExists; } - auto ptr = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry()); - entry = store_info_.objects.emplace(object_id, std::move(ptr)).first->second.get(); - entry->data_size = data_size; - entry->metadata_size = metadata_size; int fd = -1; int64_t map_size = 0; @@ -226,29 +222,35 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si uint8_t* pointer = nullptr; auto total_size = data_size + metadata_size; - if (device_num != 0) { + if (device_num == 0) { + pointer = AllocateMemory(total_size, &fd, &map_size, &offset); + if (!pointer) { + ARROW_LOG(ERROR) << "Not enough memory to create the object " << object_id.hex() + << ", data_size=" << data_size + << ", metadata_size=" << metadata_size + << ", will send a reply of PlasmaError::OutOfMemory"; + return PlasmaError::OutOfMemory; + } + } else { #ifdef PLASMA_CUDA - auto st = AllocateCudaMemory(device_num, total_size, &pointer, &entry->ipc_handle); + /// IPC GPU handle to share with clients. + std::shared_ptr<::arrow::cuda::CudaIpcMemHandle> ipc_handle; + auto st = AllocateCudaMemory(device_num, total_size, &pointer, &ipc_handle); if (!st.ok()) { ARROW_LOG(ERROR) << "Failed to allocate CUDA memory: " << st.ToString(); return PlasmaError::OutOfMemory; } - result->ipc_handle = entry->ipc_handle; + result->ipc_handle = ipc_handle; #else ARROW_LOG(ERROR) << "device_num != 0 but CUDA not enabled"; return PlasmaError::OutOfMemory; #endif - } else { - pointer = AllocateMemory(total_size, &fd, &map_size, &offset); - if (!pointer) { - ARROW_LOG(ERROR) << "Not enough memory to create the object " << object_id.hex() - << ", data_size=" << data_size - << ", metadata_size=" << metadata_size - << ", will send a reply of PlasmaError::OutOfMemory"; - return PlasmaError::OutOfMemory; - } } + auto ptr = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry()); + entry = store_info_.objects.emplace(object_id, std::move(ptr)).first->second.get(); + entry->data_size = data_size; + entry->metadata_size = metadata_size; entry->pointer = pointer; // TODO(pcm): Set the other fields. entry->fd = fd; @@ -259,6 +261,10 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si entry->create_time = std::time(nullptr); entry->construct_duration = -1; +#ifdef PLASMA_CUDA + entry->ipc_handle = result->ipc_handle; +#endif + result->store_fd = fd; result->data_offset = offset; result->metadata_offset = offset + data_size; diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index 149bdd5..6360252 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -852,9 +852,13 @@ class TestPlasmaClient(object): for _ in range(2): create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0) # Verify that an object that is too large does not fit. - with pytest.raises(pa.lib.PlasmaStoreFull): - create_object(self.plasma_client2, - DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE, 0) + # Also verifies that the right error is thrown, and does not + # create the object ID prematurely. + object_id = random_object_id() + for i in range(3): + with pytest.raises(pa.plasma.PlasmaStoreFull): + self.plasma_client2.create( + object_id, DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE) def test_client_death_during_get(self): import pyarrow.plasma as plasma
