This is an automated email from the ASF dual-hosted git repository.

uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new a58bd72  ARROW-2272: [Python] Clean up leftovers in test_plasma.py
a58bd72 is described below

commit a58bd723c3cd99eca78229491a1649f2777a45a7
Author: Antoine Pitrou <anto...@python.org>
AuthorDate: Tue Mar 6 20:22:32 2018 +0100

    ARROW-2272: [Python] Clean up leftovers in test_plasma.py
    
    Author: Antoine Pitrou <anto...@python.org>
    
    Closes #1715 from pitrou/ARROW-2272-test-plasma-tmp and squashes the following commits:
    
    6d6d084 <Antoine Pitrou> ARROW-2272:  Clean up leftovers in test_plasma.py
---
 python/pyarrow/tests/test_plasma.py | 114 +++++++++++++++++++++---------------
 1 file changed, 68 insertions(+), 46 deletions(-)

diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index 0df627f..b4e8649 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -19,14 +19,17 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
-import numpy as np
+import contextlib
 import os
 import pytest
 import random
+import shutil
 import signal
 import subprocess
+import tempfile
 import time
 
+import numpy as np
 import pyarrow as pa
 import pandas as pd
 
@@ -102,9 +105,9 @@ def assert_get_object_equal(unit_test, client1, client2, object_id,
         assert plasma.buffers_equal(metadata, client1_metadata)
 
 
+@contextlib.contextmanager
 def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
                        use_valgrind=False, use_profiler=False,
-                       stdout_file=None, stderr_file=None,
                        use_one_memory_mapped_file=False,
                        plasma_directory=None, use_hugepages=False):
     """Start a plasma store process.
@@ -125,34 +128,53 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
     """
     if use_valgrind and use_profiler:
         raise Exception("Cannot use valgrind and profiler at the same time.")
-    plasma_store_executable = os.path.join(pa.__path__[0], "plasma_store")
-    plasma_store_name = "/tmp/plasma_store{}".format(random_name())
-    command = [plasma_store_executable,
-               "-s", plasma_store_name,
-               "-m", str(plasma_store_memory)]
-    if use_one_memory_mapped_file:
-        command += ["-f"]
-    if plasma_directory:
-        command += ["-d", plasma_directory]
-    if use_hugepages:
-        command += ["-h"]
-    if use_valgrind:
-        pid = subprocess.Popen(["valgrind",
-                                "--track-origins=yes",
-                                "--leak-check=full",
-                                "--show-leak-kinds=all",
-                                "--leak-check-heuristics=stdstring",
-                                "--error-exitcode=1"] + command,
-                               stdout=stdout_file, stderr=stderr_file)
-        time.sleep(1.0)
-    elif use_profiler:
-        pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command,
-                               stdout=stdout_file, stderr=stderr_file)
-        time.sleep(1.0)
-    else:
-        pid = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)
-        time.sleep(0.1)
-    return plasma_store_name, pid
+
+    tmpdir = tempfile.mkdtemp(prefix='test_plasma-')
+    try:
+        plasma_store_name = os.path.join(tmpdir, 'plasma.sock')
+        plasma_store_executable = os.path.join(pa.__path__[0], "plasma_store")
+        command = [plasma_store_executable,
+                   "-s", plasma_store_name,
+                   "-m", str(plasma_store_memory)]
+        if use_one_memory_mapped_file:
+            command += ["-f"]
+        if plasma_directory:
+            command += ["-d", plasma_directory]
+        if use_hugepages:
+            command += ["-h"]
+        stdout_file = None
+        stderr_file = None
+        if use_valgrind:
+            command = ["valgrind",
+                       "--track-origins=yes",
+                       "--leak-check=full",
+                       "--show-leak-kinds=all",
+                       "--leak-check-heuristics=stdstring",
+                       "--error-exitcode=1"] + command
+            proc = subprocess.Popen(command, stdout=stdout_file,
+                                    stderr=stderr_file)
+            time.sleep(1.0)
+        elif use_profiler:
+            command = ["valgrind", "--tool=callgrind"] + command
+            proc = subprocess.Popen(command, stdout=stdout_file,
+                                    stderr=stderr_file)
+            time.sleep(1.0)
+        else:
+            proc = subprocess.Popen(command, stdout=stdout_file,
+                                    stderr=stderr_file)
+            time.sleep(0.1)
+        rc = proc.poll()
+        if rc is not None:
+            err = proc.stderr.read().decode()
+            raise RuntimeError("plasma_store exited unexpectedly with "
+                               "code %d. Error output follows:\n%s\n"
+                               % (rc, err))
+
+        yield plasma_store_name, proc
+    finally:
+        if proc.poll() is None:
+            proc.kill()
+        shutil.rmtree(tmpdir)
 
 
 @pytest.mark.plasma
@@ -164,25 +186,26 @@ class TestPlasmaClient(object):
 
         import pyarrow.plasma as plasma
         # Start Plasma store.
-        plasma_store_name, self.p = start_plasma_store(
+        self.plasma_store_ctx = start_plasma_store(
             use_valgrind=USE_VALGRIND,
             use_one_memory_mapped_file=use_one_memory_mapped_file)
+        plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
         # Connect to Plasma.
         self.plasma_client = plasma.connect(plasma_store_name, "", 64)
         # For the eviction test
         self.plasma_client2 = plasma.connect(plasma_store_name, "", 0)
 
     def teardown_method(self, test_method):
-        # Check that the Plasma store is still alive.
-        assert self.p.poll() is None
-        # Kill the plasma store process.
-        if os.getenv("PLASMA_VALGRIND") == "1":
-            self.p.send_signal(signal.SIGTERM)
-            self.p.wait()
-            if self.p.returncode != 0:
-                assert False
-        else:
-            self.p.kill()
+        try:
+            # Check that the Plasma store is still alive.
+            assert self.p.poll() is None
+            # Ensure Valgrind detected no issues
+            if USE_VALGRIND:
+                self.p.send_signal(signal.SIGTERM)
+                self.p.wait()
+                assert self.p.returncode == 0
+        finally:
+            self.plasma_store_ctx.__exit__(None, None, None)
 
     def test_connection_failure_raises_exception(self):
         import pyarrow.plasma as plasma
@@ -773,8 +796,7 @@ def test_object_id_size():
                     reason="requires hugepage support")
 def test_use_huge_pages():
     import pyarrow.plasma as plasma
-    plasma_store_name, p = start_plasma_store(
-        plasma_directory="/mnt/hugepages", use_hugepages=True)
-    plasma_client = plasma.connect(plasma_store_name, "", 64)
-    create_object(plasma_client, 100000000)
-    p.kill()
+    with start_plasma_store(plasma_directory="/mnt/hugepages",
+                            use_hugepages=True) as (plasma_store_name, p):
+        plasma_client = plasma.connect(plasma_store_name, "", 64)
+        create_object(plasma_client, 100000000)

-- 
To stop receiving notification emails like this one, please contact
u...@apache.org.

Reply via email to