Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-1-parser-test-suite 37a346be9 -> 8fbde876f
Improvements to presentation caching Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/8fbde876 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/8fbde876 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/8fbde876 Branch: refs/heads/ARIA-1-parser-test-suite Commit: 8fbde876f7bbd81a1b20ce4e1e5f6bb96e6e0abf Parents: 37a346b Author: Tal Liron <tal.li...@gmail.com> Authored: Mon Sep 25 13:46:52 2017 -0500 Committer: Tal Liron <tal.li...@gmail.com> Committed: Mon Sep 25 13:46:52 2017 -0500 ---------------------------------------------------------------------- aria/parser/consumption/presentation.py | 191 +++++++++++++++------------ aria/parser/presentation/context.py | 10 ++ 2 files changed, 118 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8fbde876/aria/parser/consumption/presentation.py ---------------------------------------------------------------------- diff --git a/aria/parser/consumption/presentation.py b/aria/parser/consumption/presentation.py index 873f528..1ebbc51 100644 --- a/aria/parser/consumption/presentation.py +++ b/aria/parser/consumption/presentation.py @@ -13,9 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from threading import Lock - -from ...utils.threading import (BlockingExecutor, FixedThreadPoolExecutor) from ...utils.formatting import (json_dumps, yaml_dumps) from ..loading import UriLocation from ..presentation import PresenterNotFoundError @@ -30,12 +27,12 @@ class Read(Consumer): """ Reads the presentation, handling imports recursively. 
- It works by consuming a data source via appropriate :class:`~aria.parser.loading.Loader`, + It works by consuming a data source via appropriate :class:`~aria.parser.loading.Loader`, :class:`~aria.parser.reading.Reader`, and :class:`~aria.parser.presentation.Presenter` instances. It supports agnostic raw data composition for presenters that have - ``_get_import_locations`` and ``_merge_import``. + ``_get_import_locations`` and ``_merge_import``. To improve performance, loaders are called asynchronously on separate threads. @@ -45,59 +42,22 @@ class Read(Consumer): def __init__(self, context): super(Read, self).__init__(context) - self._locations = set() # for keeping track of locations already read - self._locations_lock = Lock() + self._cache = {} def consume(self): - location = self.context.presentation.location - - if location is None: - self.context.validation.report('Presentation consumer: missing location') - return - - presenter = None - imported_presentations = [] - - if self.context.presentation.threads == 1: - # BlockingExecutor is much faster for the single-threaded case - executor = BlockingExecutor(print_exceptions=self.context.presentation.print_exceptions) - else: - executor = FixedThreadPoolExecutor(size=self.context.presentation.threads, - timeout=self.context.presentation.timeout, - print_exceptions=self.context.presentation \ - .print_exceptions) - - try: - presenter, canonical_location = self._present(location, None, None, executor) - executor.drain() - - # Handle exceptions - for e in executor.exceptions: - self._handle_exception(e) + main, entries = self._present_all() - imported_presentations = executor.returns - finally: - executor.close() - - # Merge imports - for imported_presentation, _ in imported_presentations: - okay = True - if hasattr(presenter, '_validate_import'): - # _validate_import will report an issue if invalid - okay = presenter._validate_import(self.context, imported_presentation) - if okay and hasattr(presenter, 
'_merge_import'): - presenter._merge_import(imported_presentation) + # Merge presentations + main.merge(entries, self.context) - # Make sure merged presenter is not in cache - if canonical_location is not None: - try: - del PRESENTATION_CACHE[canonical_location] - except KeyError: - pass + # Cache merged presentations + if self.context.presentation.cache: + for presentation in entries: + presentation.cache() - if canonical_location is not None: - self.context.presentation.location = canonical_location - self.context.presentation.presenter = presenter + self.context.presentation.presenter = main.presentation + if main.canonical_location is not None: + self.context.presentation.location = main.canonical_location def dump(self): if self.context.has_arg_switch('yaml'): @@ -112,58 +72,80 @@ class Read(Consumer): self.context.presentation.presenter._dump(self.context) def _handle_exception(self, e): - if isinstance(e, _AlreadyPresentedException): + if isinstance(e, _Skip): return super(Read, self)._handle_exception(e) - def _present(self, location, origin_location, default_presenter_class, executor): + def _present_all(self): + location = self.context.presentation.location + + if location is None: + self.context.validation.report('Presentation consumer: missing location') + return + + executor = self.context.presentation.create_executor() + try: + main = self._present(location, None, None, executor) + executor.drain() + + # Handle exceptions + for e in executor.exceptions: + self._handle_exception(e) + + entries = executor.returns or [] + finally: + executor.close() + + entries.insert(0, main) + + return main, entries + + def _present(self, location, origin_canonical_location, origin_presenter_class, executor): # Link the context to this thread self.context.set_thread_local() - presentation = None - cache = False - # Canonicalize the location if self.context.reading.reader is None: - loader, canonical_location = self._create_loader(location, origin_location) - if 
self.context.presentation.cache: - cache = True + loader, canonical_location = self._create_loader(location, origin_canonical_location) else: - # If a reader is specified in the context we skip loading + # If a reader is specified in the context then we skip loading loader = None canonical_location = location - # Make sure we didn't already present this location - self._verify_not_already_presented(canonical_location) + # Skip self imports + if canonical_location == origin_canonical_location: + raise _Skip() - # Is the presentation in the cache? - if cache: + if self.context.presentation.cache: + # Is the presentation in the global cache? try: presentation = PRESENTATION_CACHE[canonical_location] + return _Entry(presentation, canonical_location, origin_canonical_location) except KeyError: pass - if presentation is None: - # Create new presentation - presentation = self._create_presentation(canonical_location, loader, - default_presenter_class) + try: + # Is the presentation in the local cache? 
+ presentation = self._cache[canonical_location] + return _Entry(presentation, canonical_location, origin_canonical_location) + except KeyError: + pass - # Cache - if cache: - PRESENTATION_CACHE[canonical_location] = presentation + # Create and cache new presentation + presentation = self._create_presentation(canonical_location, loader, + origin_presenter_class) + self._cache[canonical_location] = presentation # Submit imports to executor if hasattr(presentation, '_get_import_locations'): import_locations = presentation._get_import_locations(self.context) if import_locations: for import_location in import_locations: - # Our imports will default to using our presenter class and use our canonical - # location as their origin location import_location = UriLocation(import_location) executor.submit(self._present, import_location, canonical_location, presentation.__class__, executor) - return presentation, canonical_location + return _Entry(presentation, canonical_location, origin_canonical_location) def _create_loader(self, location, origin_canonical_location): loader = self.context.loading.loader_source.get_loader(self.context.loading, location, @@ -223,13 +205,56 @@ class Read(Consumer): return presentation - def _verify_not_already_presented(self, canonical_location): - with self._locations_lock: - if canonical_location in self._locations: - raise _AlreadyPresentedException(u'already presented: {0}' - .format(canonical_location)) - self._locations.add(canonical_location) + +class _Entry(object): + def __init__(self, presentation, canonical_location, origin_canonical_location): + self.presentation = presentation + self.canonical_location = canonical_location + self.origin_canonical_location = origin_canonical_location + self.merged = False + + def get_imports(self, entries): + imports = [] + + def has_import(entry): + for i in imports: + if i.canonical_location == entry.canonical_location: + return True + return False + + for entry in entries: + if 
entry.origin_canonical_location == self.canonical_location: + if not has_import(entry): + imports.append(entry) + return imports + + def merge(self, entries, context): + # Make sure to only merge each presentation once + if self.merged: + return + self.merged = True + for entry in entries: + if entry.presentation == self.presentation: + entry.merged = True + + for entry in self.get_imports(entries): + # Make sure import is merged + entry.merge(entries, context) + + # Validate import + if hasattr(self.presentation, '_validate_import'): + # _validate_import will report an issue if invalid + valid = self.presentation._validate_import(context, entry.presentation) + else: + valid = True + + # Merge import + if valid and hasattr(self.presentation, '_merge_import'): + self.presentation._merge_import(entry.presentation) + + def cache(self): + PRESENTATION_CACHE[self.canonical_location] = self.presentation -class _AlreadyPresentedException(Exception): +class _Skip(Exception): pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8fbde876/aria/parser/presentation/context.py ---------------------------------------------------------------------- diff --git a/aria/parser/presentation/context.py b/aria/parser/presentation/context.py index c88b9b0..32e3fd6 100644 --- a/aria/parser/presentation/context.py +++ b/aria/parser/presentation/context.py @@ -15,6 +15,7 @@ from .source import DefaultPresenterSource +from ...utils.threading import (BlockingExecutor, FixedThreadPoolExecutor) class PresentationContext(object): @@ -66,3 +67,12 @@ class PresentationContext(object): """ return self.presenter._get_from_dict(*names) if self.presenter is not None else None + + def create_executor(self): + if self.threads == 1: + # BlockingExecutor is much faster for the single-threaded case + return BlockingExecutor(print_exceptions=self.print_exceptions) + + return FixedThreadPoolExecutor(size=self.threads, + timeout=self.timeout, + print_exceptions=self.print_exceptions)