At http://people.ubuntu.com/~robertc/baz2.0/branch.roundtrips
------------------------------------------------------------ revno: 4063 revision-id: [email protected] parent: [email protected] committer: Robert Collins <[email protected]> branch nick: branch.roundtrips timestamp: Sat 2009-02-28 00:05:36 +1100 message: Implement the separate source component for fetch - repository.StreamSource. === modified file 'bzrlib/fetch.py' --- a/bzrlib/fetch.py 2009-02-27 01:02:40 +0000 +++ b/bzrlib/fetch.py 2009-02-27 13:05:36 +0000 @@ -67,7 +67,6 @@ if set, try to limit to the data this revision references. after running: - count_copied -- number of revisions copied This should not be used directly, it's essential a object to encapsulate the logic in InterRepository.fetch(). @@ -83,8 +82,8 @@ like to remove this parameter. """ # result variables. + self.count_copied = 0 self.failed_revisions = [] - self.count_copied = 0 if to_repository.has_same_location(from_repository): # repository.fetch should be taking care of this case. raise errors.BzrError('RepoFetcher run ' @@ -146,14 +145,21 @@ # item_keys_introduced_by should have a richer API than it does at the # moment, so that it can feed the progress information back to this # function? + if (self.from_repository._format.rich_root_data and + not self.to_repository._format.rich_root_data): + raise errors.IncompatibleRepositories( + self.from_repository, self.to_repository, + "different rich-root support") self.pb = bzrlib.ui.ui_factory.nested_progress_bar() try: + source = self.from_repository._get_source( + self.to_repository._format) + stream = source.get_stream(search) from_format = self.from_repository._format - stream = self.get_stream(search, pp) resume_tokens, missing_keys = self.sink.insert_stream( stream, from_format, []) if missing_keys: - stream = self.get_stream_for_missing_keys(missing_keys) + stream = source.get_stream_for_missing_keys(missing_keys) resume_tokens, missing_keys = self.sink.insert_stream( stream, from_format, resume_tokens) if missing_keys: @@ -165,92 +171,11 @@ "second push failed to commit the fetch %r." % ( resume_tokens,)) self.sink.finished() + self.count_copied = source.count_copied finally: if self.pb is not None: self.pb.finished() - def get_stream(self, search, pp): - phase = 'file' - revs = search.get_keys() - graph = self.from_repository.get_graph() - revs = list(graph.iter_topo_order(revs)) - data_to_fetch = self.from_repository.item_keys_introduced_by( - revs, self.pb) - text_keys = [] - for knit_kind, file_id, revisions in data_to_fetch: - if knit_kind != phase: - phase = knit_kind - # Make a new progress bar for this phase - self.pb.finished() - pp.next_phase() - self.pb = bzrlib.ui.ui_factory.nested_progress_bar() - if knit_kind == "file": - # Accumulate file texts - text_keys.extend([(file_id, revision) for revision in - revisions]) - elif knit_kind == "inventory": - # Now copy the file texts. - from_texts = self.from_repository.texts - yield ('texts', from_texts.get_record_stream( - text_keys, self.to_repository._format._fetch_order, - not self.to_repository._format._fetch_uses_deltas)) - # Cause an error if a text occurs after we have done the - # copy. - text_keys = None - # Before we process the inventory we generate the root - # texts (if necessary) so that the inventories references - # will be valid. - for _ in self._generate_root_texts(revs): - yield _ - # NB: This currently reopens the inventory weave in source; - # using a single stream interface instead would avoid this. - self.pb.update("fetch inventory", 0, 1) - from_weave = self.from_repository.inventories - # we fetch only the referenced inventories because we do not - # know for unselected inventories whether all their required - # texts are present in the other repository - it could be - # corrupt. - yield ('inventories', from_weave.get_record_stream( - [(rev_id,) for rev_id in revs], - self.inventory_fetch_order(), - not self.delta_on_metadata())) - elif knit_kind == "signatures": - # Nothing to do here; this will be taken care of when - # _fetch_revision_texts happens. - pass - elif knit_kind == "revisions": - for _ in self._fetch_revision_texts(revs, self.pb): - yield _ - else: - raise AssertionError("Unknown knit kind %r" % knit_kind) - self.count_copied += len(revs) - - def get_stream_for_missing_keys(self, missing_keys): - # missing keys can only occur when we are byte copying and not - # translating (because translation means we don't send - # unreconstructable deltas ever). - keys = {} - keys['texts'] = set() - keys['revisions'] = set() - keys['inventories'] = set() - keys['signatures'] = set() - for key in missing_keys: - keys[key[0]].add(key[1:]) - if len(keys['revisions']): - # If we allowed copying revisions at this point, we could end up - # copying a revision without copying its required texts: a - # violation of the requirements for repository integrity. - raise AssertionError( - 'cannot copy revisions to fill in missing deltas %s' % ( - keys['revisions'],)) - for substream_kind, keys in keys.iteritems(): - vf = getattr(self.from_repository, substream_kind) - # Ask for full texts always so that we don't need more round trips - # after this stream. - stream = vf.get_record_stream(keys, - self.to_repository._format._fetch_order, True) - yield substream_kind, stream - def _revids_to_fetch(self): """Determines the exact revisions needed from self.from_repository to install self._last_revision in self.to_repository. @@ -271,44 +196,6 @@ except errors.NoSuchRevision, e: raise InstallFailed([self._last_revision]) - def _fetch_revision_texts(self, revs, pb): - # fetch signatures first and then the revision texts - # may need to be a InterRevisionStore call here. - from_sf = self.from_repository.signatures - # A missing signature is just skipped. - keys = [(rev_id,) for rev_id in revs] - signatures = filter_absent(from_sf.get_record_stream( - keys, - self.to_repository._format._fetch_order, - not self.to_repository._format._fetch_uses_deltas)) - # If a revision has a delta, this is actually expanded inside the - # insert_record_stream code now, which is an alternate fix for - # bug #261339 - from_rf = self.from_repository.revisions - revisions = from_rf.get_record_stream( - keys, - self.to_repository._format._fetch_order, - not self.delta_on_metadata()) - return [('signatures', signatures), ('revisions', revisions)] - - def _generate_root_texts(self, revs): - """This will be called by __fetch between fetching weave texts and - fetching the inventory weave. - - Subclasses should override this if they need to generate root texts - after fetching weave texts. - """ - return [] - - def inventory_fetch_order(self): - return self.to_repository._format._fetch_order - - def delta_on_metadata(self): - src_serializer = self.from_repository._format._serializer - target_serializer = self.to_repository._format._serializer - return (self.to_repository._format._fetch_uses_deltas and - src_serializer == target_serializer) - class Inter1and2Helper(object): """Helper for operations that convert data from model 1 and 2 @@ -397,21 +284,3 @@ rev_id_to_root_id.get(parent, root_id) == root_id) yield FulltextContentFactory(key, parent_keys, None, '') return [('texts', yield_roots())] - - -class Model1toKnit2Fetcher(RepoFetcher): - """Fetch from a Model1 repository into a Knit2 repository - """ - def __init__(self, to_repository, from_repository, last_revision=None, - pb=None, find_ghosts=True): - self.helper = Inter1and2Helper(from_repository) - RepoFetcher.__init__(self, to_repository, from_repository, - last_revision, pb, find_ghosts) - - def _generate_root_texts(self, revs): - return self.helper.generate_root_texts(revs) - - def inventory_fetch_order(self): - return 'topological' - -Knit1to2Fetcher = Model1toKnit2Fetcher === modified file 'bzrlib/remote.py' --- a/bzrlib/remote.py 2009-02-27 01:21:59 +0000 +++ b/bzrlib/remote.py 2009-02-27 13:05:36 +0000 @@ -624,6 +624,10 @@ """See Repository._get_sink().""" return RemoteStreamSink(self) + def _get_source(self, to_format): + """Return a source for streaming from this repository.""" + return RemoteStreamSource(self, to_format) + def has_revision(self, revision_id): """See Repository.has_revision().""" if revision_id == NULL_REVISION: @@ -1405,9 +1409,6 @@ class RemoteStreamSink(repository.StreamSink): - def __init__(self, target_repo): - repository.StreamSink.__init__(self, target_repo) - def _insert_real(self, stream, src_format, resume_tokens): self.target_repo._ensure_real() sink = self.target_repo._real_repository._get_sink() @@ -1491,6 +1492,10 @@ yield b +class RemoteStreamSource(repository.StreamSource): + """Stream data from a remote server.""" + + class RemoteBranchLockableFiles(LockableFiles): """A 'LockableFiles' implementation that talks to a smart server. === modified file 'bzrlib/repository.py' --- a/bzrlib/repository.py 2009-02-27 03:54:39 +0000 +++ b/bzrlib/repository.py 2009-02-27 13:05:36 +0000 @@ -1226,6 +1226,10 @@ """Return a sink for streaming into this repository.""" return StreamSink(self) + def _get_source(self, to_format): + """Return a source for streaming from this repository.""" + return StreamSource(self, to_format) + @needs_read_lock def has_revision(self, revision_id): """True if this repository has a copy of the revision.""" @@ -2559,8 +2563,21 @@ self.target_get_graph = self.target.get_graph self.target_get_parent_map = self.target.get_parent_map + @needs_write_lock def copy_content(self, revision_id=None): - raise NotImplementedError(self.copy_content) + """Make a complete copy of the content in self into destination. + + This is a destructive operation! Do not use it on existing + repositories. + + :param revision_id: Only copy the content needed to construct + revision_id and its parents. + """ + try: + self.target.set_make_working_trees(self.source.make_working_trees()) + except NotImplementedError: + pass + self.target.fetch(self.source, revision_id=revision_id) def fetch(self, revision_id=None, pb=None, find_ghosts=False): """Fetch the content required to construct revision_id. @@ -2574,14 +2591,15 @@ :returns: (copied_revision_count, failures). """ - # Normally we should find a specific InterRepository subclass to do - # the fetch; if nothing else then at least InterSameDataRepository. - # If none of them is suitable it looks like fetching is not possible; - # we try to give a good message why. _assert_same_model will probably - # give a helpful message; otherwise a generic one. - self._assert_same_model(self.source, self.target) - raise errors.IncompatibleRepositories(self.source, self.target, - "no suitableInterRepository found") + from bzrlib.fetch import RepoFetcher + mutter("Using fetch logic to copy between %s(%s) and %s(%s)", + self.source, self.source._format, self.target, + self.target._format) + f = RepoFetcher(to_repository=self.target, + from_repository=self.source, + last_revision=revision_id, + pb=pb, find_ghosts=find_ghosts) + return f.count_copied, f.failed_revisions def _walk_to_common_revisions(self, revision_ids): """Walk out from revision_ids in source to revisions target has. @@ -2727,42 +2745,6 @@ def is_compatible(source, target): return InterRepository._same_model(source, target) - @needs_write_lock - def copy_content(self, revision_id=None): - """Make a complete copy of the content in self into destination. - - This copies both the repository's revision data, and configuration information - such as the make_working_trees setting. - - This is a destructive operation! Do not use it on existing - repositories. - - :param revision_id: Only copy the content needed to construct - revision_id and its parents. - """ - try: - self.target.set_make_working_trees(self.source.make_working_trees()) - except NotImplementedError: - pass - # but don't bother fetching if we have the needed data now. - if (revision_id not in (None, _mod_revision.NULL_REVISION) and - self.target.has_revision(revision_id)): - return - self.target.fetch(self.source, revision_id=revision_id) - - @needs_write_lock - def fetch(self, revision_id=None, pb=None, find_ghosts=False): - """See InterRepository.fetch().""" - from bzrlib.fetch import RepoFetcher - mutter("Using fetch logic to copy between %s(%s) and %s(%s)", - self.source, self.source._format, self.target, - self.target._format) - f = RepoFetcher(to_repository=self.target, - from_repository=self.source, - last_revision=revision_id, - pb=pb, find_ghosts=find_ghosts) - return f.count_copied, f.failed_revisions - class InterWeaveRepo(InterSameDataRepository): """Optimised code paths between Weave based repositories. @@ -2997,7 +2979,6 @@ return fetcher.count_copied, fetcher.failed_revisions mutter("Using fetch logic to copy between %s(%s) and %s(%s)", self.source, self.source._format, self.target, self.target._format) - self.count_copied = 0 if revision_id is None: # TODO: # everything to do - use pack logic @@ -3096,123 +3077,6 @@ return self.source.revision_ids_to_search_result(result_set) -class InterModel1and2(InterRepository): - - @classmethod - def _get_repo_format_to_test(self): - return None - - @staticmethod - def is_compatible(source, target): - if not source.supports_rich_root() and target.supports_rich_root(): - return True - else: - return False - - @needs_write_lock - def fetch(self, revision_id=None, pb=None, find_ghosts=False): - """See InterRepository.fetch().""" - from bzrlib.fetch import Model1toKnit2Fetcher - f = Model1toKnit2Fetcher(to_repository=self.target, - from_repository=self.source, - last_revision=revision_id, - pb=pb, find_ghosts=find_ghosts) - return f.count_copied, f.failed_revisions - - @needs_write_lock - def copy_content(self, revision_id=None): - """Make a complete copy of the content in self into destination. - - This is a destructive operation! Do not use it on existing - repositories. - - :param revision_id: Only copy the content needed to construct - revision_id and its parents. - """ - try: - self.target.set_make_working_trees(self.source.make_working_trees()) - except NotImplementedError: - pass - # but don't bother fetching if we have the needed data now. - if (revision_id not in (None, _mod_revision.NULL_REVISION) and - self.target.has_revision(revision_id)): - return - self.target.fetch(self.source, revision_id=revision_id) - - -class InterKnit1and2(InterKnitRepo): - - @classmethod - def _get_repo_format_to_test(self): - return None - - @staticmethod - def is_compatible(source, target): - """Be compatible with Knit1 source and Knit3 target""" - try: - from bzrlib.repofmt.knitrepo import ( - RepositoryFormatKnit1, - RepositoryFormatKnit3, - ) - from bzrlib.repofmt.pack_repo import ( - RepositoryFormatKnitPack1, - RepositoryFormatKnitPack3, - RepositoryFormatKnitPack4, - RepositoryFormatKnitPack5, - RepositoryFormatKnitPack5RichRoot, - RepositoryFormatKnitPack6, - RepositoryFormatKnitPack6RichRoot, - RepositoryFormatPackDevelopment2, - RepositoryFormatPackDevelopment2Subtree, - ) - norichroot = ( - RepositoryFormatKnit1, # no rr, no subtree - RepositoryFormatKnitPack1, # no rr, no subtree - RepositoryFormatPackDevelopment2, # no rr, no subtree - RepositoryFormatKnitPack5, # no rr, no subtree - RepositoryFormatKnitPack6, # no rr, no subtree - ) - richroot = ( - RepositoryFormatKnit3, # rr, subtree - RepositoryFormatKnitPack3, # rr, subtree - RepositoryFormatKnitPack4, # rr, no subtree - RepositoryFormatKnitPack5RichRoot,# rr, no subtree - RepositoryFormatKnitPack6RichRoot,# rr, no subtree - RepositoryFormatPackDevelopment2Subtree, # rr, subtree - ) - for format in norichroot: - if format.rich_root_data: - raise AssertionError('Format %s is a rich-root format' - ' but is included in the non-rich-root list' - % (format,)) - for format in richroot: - if not format.rich_root_data: - raise AssertionError('Format %s is not a rich-root format' - ' but is included in the rich-root list' - % (format,)) - # TODO: One alternative is to just check format.rich_root_data, - # instead of keeping membership lists. However, the formats - # *also* have to use the same 'Knit' style of storage - # (line-deltas, fulltexts, etc.) - return (isinstance(source._format, norichroot) and - isinstance(target._format, richroot)) - except AttributeError: - return False - - @needs_write_lock - def fetch(self, revision_id=None, pb=None, find_ghosts=False): - """See InterRepository.fetch().""" - from bzrlib.fetch import Knit1to2Fetcher - mutter("Using fetch logic to copy between %s(%s) and %s(%s)", - self.source, self.source._format, self.target, - self.target._format) - f = Knit1to2Fetcher(to_repository=self.target, - from_repository=self.source, - last_revision=revision_id, - pb=pb, find_ghosts=find_ghosts) - return f.count_copied, f.failed_revisions - - class InterDifferingSerializer(InterKnitRepo): @classmethod @@ -3507,8 +3371,6 @@ InterRepository.register_optimiser(InterSameDataRepository) InterRepository.register_optimiser(InterWeaveRepo) InterRepository.register_optimiser(InterKnitRepo) -InterRepository.register_optimiser(InterModel1and2) -InterRepository.register_optimiser(InterKnit1and2) InterRepository.register_optimiser(InterPackRepo) InterRepository.register_optimiser(InterOtherToRemote) InterRepository.register_optimiser(InterRemoteToOther) @@ -3784,3 +3646,147 @@ if self.target_repo._format._fetch_reconcile: self.target_repo.reconcile() + +class StreamSource(object): + """A source of a stream for fetching between repositories. + + :ivar count_copied: number of revisions streamed. + """ + + def __init__(self, from_repository, to_format): + """Create a StreamSource streaming from from_repository.""" + self.from_repository = from_repository + self.to_format = to_format + self.count_copied = 0 + + def delta_on_metadata(self): + """Return True if delta's are permitted on metadata streams. + + That is on revisions and signatures. + """ + src_serializer = self.from_repository._format._serializer + target_serializer = self.to_format._serializer + return (self.to_format._fetch_uses_deltas and + src_serializer == target_serializer) + + def _fetch_revision_texts(self, revs): + # fetch signatures first and then the revision texts + # may need to be a InterRevisionStore call here. + from_sf = self.from_repository.signatures + # A missing signature is just skipped. + keys = [(rev_id,) for rev_id in revs] + signatures = filter_absent(from_sf.get_record_stream( + keys, + self.to_format._fetch_order, + not self.to_format._fetch_uses_deltas)) + # If a revision has a delta, this is actually expanded inside the + # insert_record_stream code now, which is an alternate fix for + # bug #261339 + from_rf = self.from_repository.revisions + revisions = from_rf.get_record_stream( + keys, + self.to_format._fetch_order, + not self.delta_on_metadata()) + return [('signatures', signatures), ('revisions', revisions)] + + def _generate_root_texts(self, revs): + """This will be called by __fetch between fetching weave texts and + fetching the inventory weave. + + Subclasses should override this if they need to generate root texts + after fetching weave texts. + """ + if self._rich_root_upgrade(): + import bzrlib.fetch + return bzrlib.fetch.Inter1and2Helper( + self.from_repository).generate_root_texts(revs) + else: + return [] + + def get_stream(self, search): + phase = 'file' + revs = search.get_keys() + graph = self.from_repository.get_graph() + revs = list(graph.iter_topo_order(revs)) + data_to_fetch = self.from_repository.item_keys_introduced_by(revs) + text_keys = [] + for knit_kind, file_id, revisions in data_to_fetch: + if knit_kind != phase: + phase = knit_kind + # Make a new progress bar for this phase + if knit_kind == "file": + # Accumulate file texts + text_keys.extend([(file_id, revision) for revision in + revisions]) + elif knit_kind == "inventory": + # Now copy the file texts. + from_texts = self.from_repository.texts + yield ('texts', from_texts.get_record_stream( + text_keys, self.to_format._fetch_order, + not self.to_format._fetch_uses_deltas)) + # Cause an error if a text occurs after we have done the + # copy. + text_keys = None + # Before we process the inventory we generate the root + # texts (if necessary) so that the inventories references + # will be valid. + for _ in self._generate_root_texts(revs): + yield _ + # NB: This currently reopens the inventory weave in source; + # using a single stream interface instead would avoid this. + from_weave = self.from_repository.inventories + # we fetch only the referenced inventories because we do not + # know for unselected inventories whether all their required + # texts are present in the other repository - it could be + # corrupt. + yield ('inventories', from_weave.get_record_stream( + [(rev_id,) for rev_id in revs], + self.inventory_fetch_order(), + not self.delta_on_metadata())) + elif knit_kind == "signatures": + # Nothing to do here; this will be taken care of when + # _fetch_revision_texts happens. + pass + elif knit_kind == "revisions": + for record in self._fetch_revision_texts(revs): + yield record + else: + raise AssertionError("Unknown knit kind %r" % knit_kind) + self.count_copied += len(revs) + + def get_stream_for_missing_keys(self, missing_keys): + # missing keys can only occur when we are byte copying and not + # translating (because translation means we don't send + # unreconstructable deltas ever). + keys = {} + keys['texts'] = set() + keys['revisions'] = set() + keys['inventories'] = set() + keys['signatures'] = set() + for key in missing_keys: + keys[key[0]].add(key[1:]) + if len(keys['revisions']): + # If we allowed copying revisions at this point, we could end up + # copying a revision without copying its required texts: a + # violation of the requirements for repository integrity. + raise AssertionError( + 'cannot copy revisions to fill in missing deltas %s' % ( + keys['revisions'],)) + for substream_kind, keys in keys.iteritems(): + vf = getattr(self.from_repository, substream_kind) + # Ask for full texts always so that we don't need more round trips + # after this stream. + stream = vf.get_record_stream(keys, + self.to_format._fetch_order, True) + yield substream_kind, stream + + def inventory_fetch_order(self): + if self._rich_root_upgrade(): + return 'topological' + else: + return self.to_format._fetch_order + + def _rich_root_upgrade(self): + return (not self.from_repository._format.rich_root_data and + self.to_format.rich_root_data) + === modified file 'bzrlib/tests/interrepository_implementations/__init__.py' --- a/bzrlib/tests/interrepository_implementations/__init__.py 2009-01-17 01:30:58 +0000 +++ b/bzrlib/tests/interrepository_implementations/__init__.py 2009-02-27 13:05:36 +0000 @@ -33,8 +33,6 @@ from bzrlib.repository import ( InterKnitRepo, - InterKnit1and2, - InterModel1and2, InterRepository, ) from bzrlib.tests import ( @@ -100,14 +98,14 @@ result.append((optimiser_class, format_to_test, format_to_test)) # if there are specific combinations we want to use, we can add them - # here. - result.append((InterModel1and2, + # here. We want to test rich root upgrading. + result.append((InterRepository, weaverepo.RepositoryFormat5(), knitrepo.RepositoryFormatKnit3())) - result.append((InterModel1and2, + result.append((InterRepository, knitrepo.RepositoryFormatKnit1(), knitrepo.RepositoryFormatKnit3())) - result.append((InterKnit1and2, + result.append((InterRepository, knitrepo.RepositoryFormatKnit1(), knitrepo.RepositoryFormatKnit3())) result.append((InterKnitRepo, -- bazaar-commits mailing list [email protected] https://lists.ubuntu.com/mailman/listinfo/bazaar-commits
