zhreshold closed pull request #13318: Improving multi-processing reliability 
for gluon DataLoader
URL: https://github.com/apache/incubator-mxnet/pull/13318
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/python/mxnet/gluon/data/dataloader.py b/python/mxnet/gluon/data/dataloader.py
index 50e2ad9f784..86cb835f512 100644
--- a/python/mxnet/gluon/data/dataloader.py
+++ b/python/mxnet/gluon/data/dataloader.py
@@ -189,7 +189,7 @@ def worker_loop(dataset, key_queue, data_queue, batchify_fn):
         batch = batchify_fn([dataset[i] for i in samples])
         data_queue.put((idx, batch))
 
-def fetcher_loop(data_queue, data_buffer, pin_memory=False):
+def fetcher_loop(data_queue, data_buffer, pin_memory=False, data_buffer_lock=None):
     """Fetcher loop for fetching data from queue and put in reorder dict."""
     while True:
         idx, batch = data_queue.get()
@@ -199,7 +199,11 @@ def fetcher_loop(data_queue, data_buffer, pin_memory=False):
             batch = _as_in_context(batch, context.cpu_pinned())
         else:
             batch = _as_in_context(batch, context.cpu())
-        data_buffer[idx] = batch
+        if data_buffer_lock is not None:
+            with data_buffer_lock:
+                data_buffer[idx] = batch
+        else:
+            data_buffer[idx] = batch
 
 
 class _MultiWorkerIter(object):
@@ -213,7 +217,10 @@ def __init__(self, num_workers, dataset, batchify_fn, batch_sampler, pin_memory=
         self._batch_sampler = batch_sampler
         self._key_queue = Queue()
         self._data_queue = Queue() if sys.version_info[0] <= 2 else SimpleQueue()
+
         self._data_buffer = {}
+        self._data_buffer_lock = threading.Lock()
+
         self._rcvd_idx = 0
         self._sent_idx = 0
         self._iter = iter(self._batch_sampler)
@@ -227,10 +234,11 @@ def __init__(self, num_workers, dataset, batchify_fn, batch_sampler, pin_memory=
             worker.daemon = True
             worker.start()
             workers.append(worker)
+        self._workers = workers
 
         self._fetcher = threading.Thread(
             target=fetcher_loop,
-            args=(self._data_queue, self._data_buffer, pin_memory))
+            args=(self._data_queue, self._data_buffer, pin_memory, self._data_buffer_lock))
         self._fetcher.daemon = True
         self._fetcher.start()
 
@@ -261,7 +269,8 @@ def __next__(self):
 
         while True:
             if self._rcvd_idx in self._data_buffer:
-                batch = self._data_buffer.pop(self._rcvd_idx)
+                with self._data_buffer_lock:
+                    batch = self._data_buffer.pop(self._rcvd_idx)
                 self._rcvd_idx += 1
                 self._push_next()
                 return batch
@@ -275,9 +284,18 @@ def __iter__(self):
     def shutdown(self):
         """Shutdown internal workers by pushing terminate signals."""
         if not self._shutdown:
+            # send shutdown signal to the fetcher and join data queue first
+            # Remark:   fetcher_loop needs to be joined prior to the workers,
+            #           otherwise the fetcher may fail at getting data
+            self._data_queue.put((None, None))
+            self._fetcher.join()
+            # send shutdown signal to all worker processes
             for _ in range(self._num_workers):
                 self._key_queue.put((None, None))
-            self._data_queue.put((None, None))
+            # force shut down any alive worker processes
+            for w in self._workers:
+                if w.is_alive():
+                    w.terminate()
             self._shutdown = True
 
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to