This is an automated email from the ASF dual-hosted git repository. tvb pushed a commit to branch aevri/win32_minimal_seemstowork_20190829 in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit b852196b3ef696da62242379160c9c1e387cab16 Author: Angelos Evripiotis <[email protected]> AuthorDate: Tue Jul 9 13:00:15 2019 +0100 TEMP: testpickle --- src/buildstream/_scheduler/jobs/jobpickler.py | 5 + src/buildstream/testpickle.py | 224 ++++++++++++++++++++++++++ 2 files changed, 229 insertions(+) diff --git a/src/buildstream/_scheduler/jobs/jobpickler.py b/src/buildstream/_scheduler/jobs/jobpickler.py index 3ce2806..2e0ee5c 100644 --- a/src/buildstream/_scheduler/jobs/jobpickler.py +++ b/src/buildstream/_scheduler/jobs/jobpickler.py @@ -140,6 +140,11 @@ def pickle_child_job(child_job, context): pickler.dispatch_table[Loader] = _reduce_loader pickler.dispatch_table[Messenger] = _reduce_messenger + import buildstream.testpickle + test_pickler = buildstream.testpickle.TestPickler() + test_pickler.dispatch_table = pickler.dispatch_table.copy() + test_pickler.test_dump(child_job) + pickler.dump(child_job) data.seek(0) diff --git a/src/buildstream/testpickle.py b/src/buildstream/testpickle.py new file mode 100644 index 0000000..52f351b --- /dev/null +++ b/src/buildstream/testpickle.py @@ -0,0 +1,224 @@ +import copyreg +import io +import pickle + + +class _C: + def f(self): + pass + + +class TestPickler: + def __init__(self): + self.dispatch_table = copyreg.dispatch_table.copy() + self.visited = set() + + def test_dump(self, obj): + import bdb + try: + self._test_pickle(obj) + except bdb.BdbQuit: + raise + except Exception: + breakpoint() + raise + + def _pickle(self, obj): + data = io.BytesIO() + pickler = pickle.Pickler(data) + pickler.dispatch_table = self.dispatch_table.copy() + pickler.dump(obj) + + def _test_pickle(self, x, indent=0): + + if indent > 50: + print() + print('*' * 80) + print("Indent level too high:", indent) + print('*' * 80) + print() + raise Exception("Indent level too high:", indent) + + def prefix_print(*messages): + print(". " * indent + f"({type(x).__name__}):", *messages) + + if id(x) in self.visited: + prefix_print(".. skipping already visited") + return + + self.visited.add(id(x)) + + import bdb + try: + self._pickle(x) + except bdb.BdbQuit: + raise + except Exception as e: + prefix_print(f'({x}): does not pickle, recursing.', str(e), repr(e), ':.:') + else: + prefix_print(f'({x}): does pickle, skipping.') + return + + if type(x) in self.dispatch_table: + unreducer, args, state = self.dispatch_table[type(x)](x) + prefix_print(f'({type(x)}): is in the dispatch_table, pickling reduced state.') + self._test_pickle(state, indent + 1) + return + + if type(x) == type(_C().f): + prefix_print(f'method {x.__func__.__name__}') + try: + if x.__self__ is None: + value = x.__class__ + else: + value = x.__self__ + self._test_pickle(value, indent + 1) + except: + prefix_print(f"while pickling item method {x.__func__.__name__}: '{x}'.") + raise + + if type(x).__name__ in ['method', 'instancemethod']: + prefix_print(".. skipping method") + return + + if type(x).__name__ in ['list', 'tuple', 'set']: + prefix_print('... len', len(x)) + for key, value in enumerate(x): + prefix_print(f'[{key}]') + try: + self._test_pickle(value, indent + 1) + except: + prefix_print(f"while pickling item {key}: {type(x).__name__}: '{x}'.") + raise + return + + # if type(x).__name__ == 'function': + # prefix_print("function?") + # raise Exception() + + # if type(x).__name__ == 'module': + # prefix_print(".. module") + # pickle_func(x) + # return + + # TODO: make these work properly. + # if type(x).__name__ in ['SourceFactory', 'ElementFactory', 'Environment']: + # prefix_print(".. skipping") + # return + if type(x).__name__ in ['_UnixSelectorEventLoop', 'AuthenticationString', 'SyncManager']: + prefix_print(".. skipping") + return + + if type(x).__name__ == 'dict': + prefix_print("...", x.keys()) + for key, value in x.items(): + prefix_print(f'[{key}]') + try: + self._test_pickle(value, indent + 1) + except: + prefix_print(f"while pickling ['{key}'].") + raise + return + + # TODO: we need to make the generators work too, or ideally replace them. + # if type(x).__name__ == 'generator': + # prefix_print(".. skipping generator") + # return + + # TODO: we need to make the weakrefs work properly. + if type(x).__name__ == 'weakref': + prefix_print(".. dereferencing weakref") + try: + self._test_pickle(x(), indent) + except: + prefix_print(f"while pickling weakref {x}.") + raise + return + + try: + value = x.__getstate__() + except AttributeError: + pass + else: + prefix_print("... __getstate__") + try: + self._test_pickle(value, indent + 1) + except: + prefix_print(f"while pickling a __getstate__.") + raise + return + + try: + x.__dict__ + except AttributeError: + pass + else: + prefix_print("...", x.__dict__.keys()) + for key, value in x.__dict__.items(): + prefix_print(f'__dict__["{key}"]') + try: + self._test_pickle(value, indent + 1) + except: + prefix_print(f"while pickling member ['{key}'].") + raise + return + + try: + x.__slots__ + except AttributeError: + pass + else: + prefix_print("...", x.__slots__) + for key in x.__slots__: + value = getattr(x, key) + prefix_print(f'__slots__["{key}"]') + try: + self._test_pickle(value, indent + 1) + except: + prefix_print(f"while pickling member '{key}'.") + raise + return + + print() + print('*' * 80) + print("!!! Unhandled pickle case:", x, type(x)) + print('*' * 80) + print() + # raise Exception("Unhandled pickle case") + # prefix_print(x) + # self._pickle(x) + + +def test_pickle_direct(x): + import io + import pickle + import multiprocessing.reduction + + # Note that we should expect to see this complaint if we are not in a + # multiprocessing spawning_popen context, this will be fine when we're + # actually spawning: + # + # Pickling an AuthenticationString object is disallowed for + # security reasons. + # + # https://github.com/python/cpython/blob/master/Lib/multiprocessing/process.py#L335 + # + + # Suppress the complaint by pretending we're in a spawning context. + # https://github.com/python/cpython/blob/a8474d025cab794257d2fd0bea67840779b9351f/Lib/multiprocessing/popen_spawn_win32.py#L91 + import multiprocessing.context + multiprocessing.context.set_spawning_popen("PPPPPopen") + + data = io.BytesIO() + + # Try to simulate what multiprocessing will do. + # https://github.com/python/cpython/blob/master/Lib/multiprocessing/reduction.py + try: + multiprocessing.reduction.dump(x, data) + except: + # breakpoint() + raise + finally: + multiprocessing.context.set_spawning_popen(None) + + return data
