[
https://issues.apache.org/jira/browse/ARROW-1795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16256525#comment-16256525
]
ASF GitHub Bot commented on ARROW-1795:
---------------------------------------
pcmoritz closed pull request #1327: ARROW-1795: [Plasma] Create flag to make
Plasma store use a single memory-mapped file.
URL: https://github.com/apache/arrow/pull/1327
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one made
from a fork) once it has been merged, the diff is reproduced below for
the sake of provenance:
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 7094aed6f..c6a19a547 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -676,12 +676,22 @@ class PlasmaStoreRunner {
PlasmaStoreRunner() {}
void Start(char* socket_name, int64_t system_memory, std::string directory,
- bool hugepages_enabled) {
+ bool hugepages_enabled, bool use_one_memory_mapped_file) {
// Create the event loop.
loop_.reset(new EventLoop);
store_.reset(
new PlasmaStore(loop_.get(), system_memory, directory,
hugepages_enabled));
plasma_config = store_->get_plasma_store_info();
+
+ // If the store is configured to use a single memory-mapped file, then we
+ // achieve that by mallocing and freeing a single large amount of space of
+ // the maximum allowed size up front.
+ if (use_one_memory_mapped_file) {
+ void* pointer = plasma::dlmemalign(BLOCK_SIZE, system_memory);
+ ARROW_CHECK(pointer != NULL);
+ plasma::dlfree(pointer);
+ }
+
int socket = bind_ipc_sock(socket_name, true);
// TODO(pcm): Check return value.
ARROW_CHECK(socket >= 0);
@@ -716,14 +726,15 @@ void HandleSignal(int signal) {
}
void start_server(char* socket_name, int64_t system_memory, std::string
plasma_directory,
- bool hugepages_enabled) {
+ bool hugepages_enabled, bool use_one_memory_mapped_file) {
// Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
// to a client that has already died, the store could die.
signal(SIGPIPE, SIG_IGN);
g_runner.reset(new PlasmaStoreRunner());
signal(SIGTERM, HandleSignal);
- g_runner->Start(socket_name, system_memory, plasma_directory,
hugepages_enabled);
+ g_runner->Start(socket_name, system_memory, plasma_directory,
hugepages_enabled,
+ use_one_memory_mapped_file);
}
} // namespace plasma
@@ -733,9 +744,11 @@ int main(int argc, char* argv[]) {
// Directory where plasma memory mapped files are stored.
std::string plasma_directory;
bool hugepages_enabled = false;
+ // True if a single large memory-mapped file should be created at startup.
+ bool use_one_memory_mapped_file = false;
int64_t system_memory = -1;
int c;
- while ((c = getopt(argc, argv, "s:m:d:h")) != -1) {
+ while ((c = getopt(argc, argv, "s:m:d:hf")) != -1) {
switch (c) {
case 'd':
plasma_directory = std::string(optarg);
@@ -755,6 +768,9 @@ int main(int argc, char* argv[]) {
<< "GB of memory.";
break;
}
+ case 'f':
+ use_one_memory_mapped_file = true;
+ break;
default:
exit(-1);
}
@@ -808,5 +824,6 @@ int main(int argc, char* argv[]) {
// available.
plasma::dlmalloc_set_footprint_limit((size_t)system_memory);
ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
- plasma::start_server(socket_name, system_memory, plasma_directory,
hugepages_enabled);
+ plasma::start_server(socket_name, system_memory, plasma_directory,
hugepages_enabled,
+ use_one_memory_mapped_file);
}
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index b73d92d14..b28bd60c4 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -102,7 +102,8 @@ def assert_get_object_equal(unit_test, client1, client2,
object_id,
def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
use_valgrind=False, use_profiler=False,
- stdout_file=None, stderr_file=None):
+ stdout_file=None, stderr_file=None,
+ use_one_memory_mapped_file=False):
"""Start a plasma store process.
Args:
use_valgrind (bool): True if the plasma store should be started inside
@@ -113,6 +114,8 @@ def
start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
no redirection should happen, then this should be None.
stderr_file: A file handle opened for writing to redirect stderr to. If
no redirection should happen, then this should be None.
+ use_one_memory_mapped_file: If True, then the store will use only a
+ single memory-mapped file.
Return:
A tuple of the name of the plasma store socket and the process ID of
the plasma store process.
@@ -124,6 +127,8 @@ def
start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
command = [plasma_store_executable,
"-s", plasma_store_name,
"-m", str(plasma_store_memory)]
+ if use_one_memory_mapped_file:
+ command += ["-f"]
if use_valgrind:
pid = subprocess.Popen(["valgrind",
"--track-origins=yes",
@@ -147,10 +152,14 @@ def
start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
class TestPlasmaClient(object):
def setup_method(self, test_method):
+ use_one_memory_mapped_file = (test_method ==
+ self.test_use_one_memory_mapped_file)
+
import pyarrow.plasma as plasma
# Start Plasma store.
plasma_store_name, self.p = start_plasma_store(
- use_valgrind=os.getenv("PLASMA_VALGRIND") == "1")
+ use_valgrind=os.getenv("PLASMA_VALGRIND") == "1",
+ use_one_memory_mapped_file=use_one_memory_mapped_file)
# Connect to Plasma.
self.plasma_client = plasma.connect(plasma_store_name, "", 64)
# For the eviction test
@@ -720,3 +729,19 @@ def test_subscribe_deletions(self):
assert object_ids[i] == recv_objid
assert -1 == recv_dsize
assert -1 == recv_msize
+
+ def test_use_one_memory_mapped_file(self):
+ # Fill the object store up with a large number of small objects and let
+ # them go out of scope.
+ for _ in range(100):
+ create_object(
+ self.plasma_client,
+ np.random.randint(1, DEFAULT_PLASMA_STORE_MEMORY // 20), 0)
+ # Create large objects that require the full object store size, and
+ # verify that they fit.
+ for _ in range(2):
+ create_object(self.plasma_client, DEFAULT_PLASMA_STORE_MEMORY, 0)
+ # Verify that an object that is too large does not fit.
+ with pytest.raises(pa.lib.PlasmaStoreFull):
+ create_object(self.plasma_client, DEFAULT_PLASMA_STORE_MEMORY + 1,
+ 0)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> [Plasma C++] change evict policy
> --------------------------------
>
> Key: ARROW-1795
> URL: https://issues.apache.org/jira/browse/ARROW-1795
> Project: Apache Arrow
> Issue Type: Improvement
> Components: Plasma (C++)
> Reporter: Lu Qi
> Assignee: Lu Qi
> Priority: Minor
> Labels: pull-request-available
>
> Case 1: Say we have 8 GB of total free memory and we have stored 5 GB of data,
> and then another 6 GB of data arrives.
> If we choose to evict 6 GB of space, an exception is thrown saying that
> no object can be freed. This is because we did not count the 3 GB of remaining
> free space. If we count this remaining 3 GB, we only need to ask for 3 GB, and
> thus we can evict the 5 GB of data and stay alive.
> Case 2: Another situation: if we have 10 GB of free memory and we store 1.5 GB
> of data, and then another 9 GB of data arrives,
> then if we use 10 * 20% = 2 GB as the amount of data to evict, we will crash.
> In this situation we need to
> use 9 + 1.5 - 10 = 0.5 GB as the amount of data to evict.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)