This is an automated email from the ASF dual-hosted git repository. tvb pushed a commit to branch aevri/win32_minimal_seemstowork_20190829 in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 43021072636526991f89edf336e21fd658c47452 Author: Angelos Evripiotis <[email protected]> AuthorDate: Wed Jul 10 09:24:30 2019 +0100 WIP: pickleable jobs WIP: make some things pickleable WIP: Make ElementFactory and SourceFactory picklable WIP: job: pickling machinery for jobs WIP: loader: don't pickle _fetch_subprojects Note that a new circular dependency was introduced in: 737aea18 Handle subproject fetching in the Stream class We can avoid the complications by not pickling it out to the child job. WIP: move reducers to jobpickler WIP: jobpickler: use proto serialization instead of pickle WIP: jobpickler: pickle less WIP: job: pickle apologies --- src/buildstream/_elementfactory.py | 7 +- src/buildstream/_plugincontext.py | 26 ++++- src/buildstream/_project.py | 12 +- src/buildstream/_scheduler/jobs/jobpickler.py | 151 ++++++++++++++++++++++++++ src/buildstream/_sourcefactory.py | 5 +- src/buildstream/element.py | 1 + src/buildstream/source.py | 2 + 7 files changed, 194 insertions(+), 10 deletions(-) diff --git a/src/buildstream/_elementfactory.py b/src/buildstream/_elementfactory.py index d6591bf..b2a7f73 100644 --- a/src/buildstream/_elementfactory.py +++ b/src/buildstream/_elementfactory.py @@ -33,9 +33,12 @@ class ElementFactory(PluginContext): def __init__(self, plugin_base, *, format_versions={}, - plugin_origins=None): + plugin_origins=None, + pass_=None): - super().__init__(plugin_base, Element, [_site.element_plugins], + assert pass_ is not None + + super().__init__(plugin_base, Element, [_site.element_plugins], 'element' + str(pass_), plugin_origins=plugin_origins, format_versions=format_versions) diff --git a/src/buildstream/_plugincontext.py b/src/buildstream/_plugincontext.py index 7fef9b9..fa07e7b 100644 --- a/src/buildstream/_plugincontext.py +++ b/src/buildstream/_plugincontext.py @@ -44,9 +44,11 @@ from . 
import _yaml # class PluginContext(): - def __init__(self, plugin_base, base_type, site_plugin_path, *, + def __init__(self, plugin_base, base_type, site_plugin_path, identifier, *, plugin_origins=None, format_versions={}): + self._identifier = identifier + # The plugin kinds which were loaded self.loaded_dependencies = [] @@ -59,10 +61,26 @@ class PluginContext(): # The PluginSource object self._plugin_base = plugin_base - self._site_source = plugin_base.make_plugin_source(searchpath=site_plugin_path) + self._site_plugin_path = site_plugin_path + self._site_source = plugin_base.make_plugin_source( + searchpath=self._site_plugin_path, + identifier='site_plugin-' + self._identifier) self._alternate_sources = {} self._format_versions = format_versions + def __getstate__(self): + import copy + state = copy.copy(self.__dict__) + del state['_site_source'] + state['_types'] = {} + return state + + def __setstate__(self, state): + self.__dict__ = state + self._site_source = self._plugin_base.make_plugin_source( + searchpath=self._site_plugin_path, + identifier='site_plugin-' + self._identifier) + # lookup(): # # Fetches a type loaded from a plugin in this plugin context @@ -80,7 +98,7 @@ class PluginContext(): def _get_local_plugin_source(self, path): if ('local', path) not in self._alternate_sources: # key by a tuple to avoid collision - source = self._plugin_base.make_plugin_source(searchpath=[path]) + source = self._plugin_base.make_plugin_source(searchpath=[path], identifier='local_plugin-' + path + '-' + self._identifier) # Ensure that sources never get garbage collected, # as they'll take the plugins with them. 
self._alternate_sources[('local', path)] = source @@ -121,7 +139,7 @@ class PluginContext(): # The plugin didn't have an accompanying YAML file defaults = None - source = self._plugin_base.make_plugin_source(searchpath=[os.path.dirname(location)]) + source = self._plugin_base.make_plugin_source(searchpath=[os.path.dirname(location)], identifier='pip_plugin-' + self._identifier) self._alternate_sources[('pip', package_name)] = source else: diff --git a/src/buildstream/_project.py b/src/buildstream/_project.py index 5f433c0..74fca7f 100644 --- a/src/buildstream/_project.py +++ b/src/buildstream/_project.py @@ -97,6 +97,8 @@ class Project(): default_mirror=None, parent_loader=None, search_for_project=True, fetch_subprojects=None): + self._pass = None + # The project name self.name = None @@ -617,6 +619,8 @@ class Project(): config_no_include = _yaml.node_copy(self._default_config_node) _yaml.composite(config_no_include, project_conf_first_pass) + assert self._pass is None + self._pass = 1 self._load_pass(config_no_include, self.first_pass_config, ignore_unknown=True) @@ -641,6 +645,8 @@ class Project(): config = _yaml.node_copy(self._default_config_node) _yaml.composite(config, project_conf_second_pass) + assert self._pass == 1 + self._pass = 2 self._load_pass(config, self.config) self._validate_node(config) @@ -914,10 +920,12 @@ class Project(): pluginbase = PluginBase(package='buildstream.plugins') output.element_factory = ElementFactory(pluginbase, plugin_origins=plugin_element_origins, - format_versions=element_format_versions) + format_versions=element_format_versions, + pass_=self._pass) output.source_factory = SourceFactory(pluginbase, plugin_origins=plugin_source_origins, - format_versions=source_format_versions) + format_versions=source_format_versions, + pass_=self._pass) # _store_origin() # diff --git a/src/buildstream/_scheduler/jobs/jobpickler.py b/src/buildstream/_scheduler/jobs/jobpickler.py new file mode 100644 index 0000000..5c1742f --- /dev/null +++ 
b/src/buildstream/_scheduler/jobs/jobpickler.py @@ -0,0 +1,151 @@ +# +# Copyright (C) 2019 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Angelos Evripiotis <[email protected]> + + +import copyreg +import io +import pickle + +from ..._protos.buildstream.v2.artifact_pb2 import Artifact as ArtifactProto + +# BuildStream toplevel imports +from ... import Element, Source +from ..._loader import Loader +from ..._messenger import Messenger + + +def _reduce_artifact_proto(instance): + assert isinstance(instance, ArtifactProto) + data = instance.SerializeToString() + return (_unreduce_artifact_proto, (data,)) + + +def _unreduce_artifact_proto(data): + instance = ArtifactProto() + instance.ParseFromString(data) + return instance + + +def _reduce_loader(instance): + assert isinstance(instance, Loader) + state = instance.__dict__.copy() + + # When pickling a Loader over to the ChildJob, we don't want to bring + # the whole Stream over with it. The _fetch_subprojects member is a method + # of the Stream. We also don't want to remove it in the main process. If we + # remove it in the child process then we will already be too late. The only + # time that seems just right is here, when preparing the child process' + # copy of the Loader. 
+ # + del state['_fetch_subprojects'] + + return (Loader.__new__, (Loader,), state) + + +def _reduce_messenger(instance): + assert isinstance(instance, Messenger) + state = instance.__dict__.copy() + + # When pickling a Messenger over to the ChildJob, we don't want to bring + # the whole _message_handler over with it. We also don't want to remove it + # in the main process. If we remove it in the child process then we will + # already be too late. The only time that seems just right is here, when + # preparing the child process' copy of the Messenger. + # + # Another approach might be to use a context manager on the Messenger, + # which removes and restores the _message_handler. This wouldn't require + # access to private details of Messenger. + # + del state['_message_handler'] + + return (Messenger.__new__, (Messenger,), state) + + +def _reduce_element(element): + assert isinstance(element, Element) + meta_kind = element._meta_kind + project = element._get_project() + factory = project.config.element_factory + args = (factory, meta_kind) + state = element.__dict__.copy() + state["_Element__reverse_dependencies"] = None + state["_Element__buildable_callback"] = None + return (_unreduce_plugin, args, state) + + +def _reduce_source(source): + assert isinstance(source, Source) + meta_kind = source._meta_kind + project = source._get_project() + factory = project.config.source_factory + args = (factory, meta_kind) + return (_unreduce_plugin, args, source.__dict__.copy()) + + +def _unreduce_plugin(factory, meta_kind): + cls, _ = factory.lookup(meta_kind) + plugin = cls.__new__(cls) + + # TODO: find a better way of persisting this factory, otherwise the plugin + # will become invalid. + plugin.factory = factory + + return plugin + + +def pickle_child_job(child_job, context): + + # Note: Another way of doing this would be to let PluginBase do its + # import-magic. We would achieve this by first pickling the factories, and + # the string names of their plugins. 
Unpickling the plugins in the child + # process would then "just work". There would be an additional cost of + # having to load every plugin kind, regardless of which ones are used. + + projects = context.get_projects() + element_classes = [ + cls + for p in projects + for cls, _ in p.config.element_factory._types.values() + ] + source_classes = [ + cls + for p in projects + for cls, _ in p.config.source_factory._types.values() + ] + + data = io.BytesIO() + pickler = pickle.Pickler(data) + pickler.dispatch_table = copyreg.dispatch_table.copy() + for cls in element_classes: + pickler.dispatch_table[cls] = _reduce_element + for cls in source_classes: + pickler.dispatch_table[cls] = _reduce_source + pickler.dispatch_table[ArtifactProto] = _reduce_artifact_proto + pickler.dispatch_table[Loader] = _reduce_loader + pickler.dispatch_table[Messenger] = _reduce_messenger + + pickler.dump(child_job) + data.seek(0) + + return data + + +def unpickle_child_job(pickled): + child_job = pickle.load(pickled) + return child_job diff --git a/src/buildstream/_sourcefactory.py b/src/buildstream/_sourcefactory.py index 1d959a1..eca4b50 100644 --- a/src/buildstream/_sourcefactory.py +++ b/src/buildstream/_sourcefactory.py @@ -33,9 +33,10 @@ class SourceFactory(PluginContext): def __init__(self, plugin_base, *, format_versions={}, - plugin_origins=None): + plugin_origins=None, + pass_=None): - super().__init__(plugin_base, Source, [_site.source_plugins], + super().__init__(plugin_base, Source, [_site.source_plugins], 'source' + str(pass_), format_versions=format_versions, plugin_origins=plugin_origins) diff --git a/src/buildstream/element.py b/src/buildstream/element.py index 758a0b9..7061a27 100644 --- a/src/buildstream/element.py +++ b/src/buildstream/element.py @@ -186,6 +186,7 @@ class Element(Plugin): """ def __init__(self, context, project, meta, plugin_conf): + self._meta_kind = meta.kind self.__cache_key_dict = None # Dict for cache key calculation self.__cache_key = None # Our 
cached cache key diff --git a/src/buildstream/source.py b/src/buildstream/source.py index f8b5d3f..202af83 100644 --- a/src/buildstream/source.py +++ b/src/buildstream/source.py @@ -312,6 +312,8 @@ class Source(Plugin): super().__init__("{}-{}".format(meta.element_name, meta.element_index), context, project, provenance, "source", unique_id=unique_id) + self._meta_kind = meta.kind + self.__element_name = meta.element_name # The name of the element owning this source self.__element_index = meta.element_index # The index of the source in the owning element's source list self.__element_kind = meta.element_kind # The kind of the element owning this source
