At http://people.ubuntu.com/~robertc/baz2.0/branch.roundtrips
------------------------------------------------------------ revno: 4064 revision-id: [email protected] parent: [email protected] committer: Robert Collins <[email protected]> branch nick: branch.roundtrips timestamp: Mon 2009-03-02 14:38:07 +1100 message: Streaming fetch from remote servers. === modified file 'NEWS' --- a/NEWS 2009-02-27 02:44:10 +0000 +++ b/NEWS 2009-03-02 03:38:07 +0000 @@ -117,8 +117,9 @@ rather than the sometimes-absent disk label. (Robert Collins) * ``bzrlib.fetch`` is now composed of a sender and a sink component - allowing for decoupling over a network connection. Fetching into - a RemoteRepository uses this to stream the operation. + allowing for decoupling over a network connection. Fetching from + or into a RemoteRepository with a 1.13 server will use this to + stream the operation. (Andrew Bennetts, Robert Collins) * ``bzrlib.tests.run_suite`` accepts a runner_class parameter === modified file 'bzrlib/fetch.py' --- a/bzrlib/fetch.py 2009-02-27 13:05:36 +0000 +++ b/bzrlib/fetch.py 2009-03-02 03:38:07 +0000 @@ -40,7 +40,7 @@ from bzrlib.tsort import topo_sort from bzrlib.trace import mutter import bzrlib.ui -from bzrlib.versionedfile import filter_absent, FulltextContentFactory +from bzrlib.versionedfile import FulltextContentFactory # TODO: Avoid repeatedly opening weaves so many times. === modified file 'bzrlib/pack.py' --- a/bzrlib/pack.py 2009-01-17 01:30:58 +0000 +++ b/bzrlib/pack.py 2009-03-02 03:38:07 +0000 @@ -411,10 +411,15 @@ self._state_handler() cur_buffer_length = len(self._buffer) - def read_pending_records(self): - records = self._parsed_records - self._parsed_records = [] - return records + def read_pending_records(self, max=None): + if max: + records = self._parsed_records[:max] + del self._parsed_records[:max] + return records + else: + records = self._parsed_records + self._parsed_records = [] + return records def _consume_line(self): """Take a line out of the buffer, and return the line. === modified file 'bzrlib/remote.py' --- a/bzrlib/remote.py 2009-02-27 13:05:36 +0000 +++ b/bzrlib/remote.py 2009-03-02 03:38:07 +0000 @@ -40,11 +40,10 @@ SmartProtocolError, ) from bzrlib.lockable_files import LockableFiles -from bzrlib.smart import client, vfs +from bzrlib.smart import client, vfs, repository as smart_repo from bzrlib.revision import ensure_null, NULL_REVISION from bzrlib.trace import mutter, note, warning from bzrlib.util import bencode -from bzrlib.versionedfile import record_to_fulltext_bytes class _RpcHelper(object): @@ -1434,14 +1433,15 @@ # do not fallback when actually pushing the stream. A cleanup patch # is going to look at rewinding/restarting the stream/partial # buffering etc. - byte_stream = self._stream_to_byte_stream([], src_format) + byte_stream = smart_repo._stream_to_byte_stream([], src_format) try: response = client.call_with_body_stream( ('Repository.insert_stream', path, ''), byte_stream) except errors.UnknownSmartMethod: medium._remember_remote_is_before((1,13)) return self._insert_real(stream, src_format, resume_tokens) - byte_stream = self._stream_to_byte_stream(stream, src_format) + byte_stream = smart_repo._stream_to_byte_stream( + stream, src_format) resume_tokens = ' '.join(resume_tokens) response = client.call_with_body_stream( ('Repository.insert_stream', path, resume_tokens), byte_stream) @@ -1459,42 +1459,45 @@ collection.reload_pack_names() return [], set() - def _stream_to_byte_stream(self, stream, src_format): - bytes = [] - pack_writer = pack.ContainerWriter(bytes.append) - pack_writer.begin() - pack_writer.add_bytes_record(src_format.network_name(), '') - adapters = {} - def get_adapter(adapter_key): - try: - return adapters[adapter_key] - except KeyError: - adapter_factory = adapter_registry.get(adapter_key) - adapter = adapter_factory(self) - adapters[adapter_key] = adapter - return adapter - for substream_type, substream in stream: - for record in substream: - if record.storage_kind in ('chunked', 'fulltext'): - serialised = record_to_fulltext_bytes(record) - else: - serialised = record.get_bytes_as(record.storage_kind) - if serialised: - # Some streams embed the whole stream into the wire - # representation of the first record, which means that - # later records have no wire representation: we skip them. - pack_writer.add_bytes_record(serialised, [(substream_type,)]) - for b in bytes: - yield b - del bytes[:] - pack_writer.end() - for b in bytes: - yield b - class RemoteStreamSource(repository.StreamSource): """Stream data from a remote server.""" + def get_stream(self, search): + # streaming with fallback repositories is not well defined yet: The + # remote repository cannot see the fallback repositories, and thus + # cannot satisfy the entire search in the general case. Likewise the + # fallback repositories cannot reify the search to determine what they + # should send. It likely needs a return value in the stream listing the + # edge of the search to resume from in fallback repositories. + if self.from_repository._fallback_repositories: + return repository.StreamSource.get_stream(self, search) + repo = self.from_repository + client = repo._client + medium = client._medium + if medium._is_remote_before((1, 13)): + # No possible way this can work. + return repository.StreamSource.get_stream(self, search) + path = repo.bzrdir._path_for_remote_call(client) + try: + recipe = repo._serialise_search_recipe(search._recipe) + response = repo._call_with_body_bytes_expecting_body( + 'Repository.StreamSource.get_stream', + (path, self.to_format.network_name()), recipe) + response_tuple, response_handler = response + except errors.UnknownSmartMethod: + medium._remember_remote_is_before((1,13)) + return repository.StreamSource.get_stream(self, search) + if response_tuple[0] != 'ok': + raise errors.UnexpectedSmartServerResponse(response_tuple) + byte_stream = response_handler.read_streamed_body() + src_format, stream = smart_repo._byte_stream_to_stream(byte_stream) + if src_format.network_name() != repo._format.network_name(): + raise AssertionError( + "Mismatched RemoteRepository and stream src %r, %r" % ( + src_format.network_name(), repo._format.network_name())) + return stream + class RemoteBranchLockableFiles(LockableFiles): """A 'LockableFiles' implementation that talks to a smart server. === modified file 'bzrlib/repository.py' --- a/bzrlib/repository.py 2009-02-27 13:05:36 +0000 +++ b/bzrlib/repository.py 2009-03-02 03:38:07 +0000 @@ -3675,7 +3675,7 @@ from_sf = self.from_repository.signatures # A missing signature is just skipped. keys = [(rev_id,) for rev_id in revs] - signatures = filter_absent(from_sf.get_record_stream( + signatures = versionedfile.filter_absent(from_sf.get_record_stream( keys, self.to_format._fetch_order, not self.to_format._fetch_uses_deltas)) === modified file 'bzrlib/smart/repository.py' --- a/bzrlib/smart/repository.py 2009-02-25 00:31:09 +0000 +++ b/bzrlib/smart/repository.py 2009-03-02 03:38:07 +0000 @@ -39,7 +39,7 @@ from bzrlib.repository import _strip_NULL_ghosts, network_format_registry from bzrlib import revision as _mod_revision from bzrlib.util import bencode -from bzrlib.versionedfile import NetworkRecordStream +from bzrlib.versionedfile import NetworkRecordStream, record_to_fulltext_bytes class SmartServerRepositoryRequest(SmartServerRequest): @@ -333,6 +333,104 @@ return SuccessfulSmartServerResponse(('ok', token)) +class SmartServerRepositoryStreamSourceGetStream(SmartServerRepositoryRequest): + + def do_repository_request(self, repository, to_network_name): + """Get a stream for inserting into a to_format repository. + + :param repository: The repository to stream from. + :param to_network_name: The network name of the format of the target + repository. + """ + self._to_format = network_format_registry.get(to_network_name) + return None # Signal that we want a body. + + def do_body(self, body_bytes): + repository = self._repository + repository.lock_read() + try: + search, error = self.recreate_search(repository, body_bytes) + if error is not None: + repository.unlock() + return error + search = search.get_result() + source = repository._get_source(self._to_format) + stream = source.get_stream(search) + except Exception: + exc_info = sys.exc_info() + try: + # On non-error, unlocking is done by the body stream handler. + repository.unlock() + finally: + raise exc_info[0], exc_info[1], exc_info[2] + return SuccessfulSmartServerResponse(('ok',), + body_stream=self.body_stream(stream, repository)) + + def body_stream(self, stream, repository): + byte_stream = _stream_to_byte_stream(stream, repository._format) + try: + for bytes in byte_stream: + yield bytes + except errors.RevisionNotPresent, e: + # This shouldn't be able to happen, but as we don't buffer + # everything it can in theory happen. + repository.unlock() + yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id)) + else: + repository.unlock() + + +def _stream_to_byte_stream(stream, src_format): + """Convert a record stream to a self delimited byte stream.""" + pack_writer = pack.ContainerSerialiser() + yield pack_writer.begin() + yield pack_writer.bytes_record(src_format.network_name(), '') + for substream_type, substream in stream: + for record in substream: + if record.storage_kind in ('chunked', 'fulltext'): + serialised = record_to_fulltext_bytes(record) + else: + serialised = record.get_bytes_as(record.storage_kind) + if serialised: + # Some streams embed the whole stream into the wire + # representation of the first record, which means that + # later records have no wire representation: we skip them. + yield pack_writer.bytes_record(serialised, [(substream_type,)]) + yield pack_writer.end() + + +def _byte_stream_to_stream(byte_stream): + """Convert a byte stream into a format and a StreamSource stream. + + :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream. + :return: (RepositoryFormat, stream_generator) + """ + stream_decoder = pack.ContainerPushParser() + def record_stream(): + """Closure to return the substreams.""" + # May have fully parsed records already. + for record in stream_decoder.read_pending_records(): + record_names, record_bytes = record + record_name, = record_names + substream_type = record_name[0] + substream = NetworkRecordStream([record_bytes]) + yield substream_type, substream.read() + for bytes in byte_stream: + stream_decoder.accept_bytes(bytes) + for record in stream_decoder.read_pending_records(): + record_names, record_bytes = record + record_name, = record_names + substream_type = record_name[0] + substream = NetworkRecordStream([record_bytes]) + yield substream_type, substream.read() + for bytes in byte_stream: + stream_decoder.accept_bytes(bytes) + for record in stream_decoder.read_pending_records(max=1): + record_names, src_format_name = record + src_format = network_format_registry.get(src_format_name) + return src_format, record_stream() + + class SmartServerRepositoryUnlock(SmartServerRepositoryRequest): def do_repository_request(self, repository, token): @@ -415,6 +513,12 @@ class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest): + """Insert a record stream from a RemoteSink into a repository. + + This gets bytes pushed to it by the network infrastructure and turns that + into a bytes iterator using a thread. That is then processed by + _byte_stream_to_stream. + """ def do_repository_request(self, repository, resume_tokens): """StreamSink.insert_stream for a remote repository.""" @@ -422,44 +526,31 @@ tokens = [token for token in resume_tokens.split(' ') if token] self.tokens = tokens self.repository = repository - self.stream_decoder = pack.ContainerPushParser() - self.src_format = None self.queue = Queue.Queue() - self.insert_thread = None + self.insert_thread = threading.Thread(target=self._inserter_thread) + self.insert_thread.start() def do_chunk(self, body_stream_chunk): - self.stream_decoder.accept_bytes(body_stream_chunk) - for record in self.stream_decoder.read_pending_records(): - record_names, record_bytes = record - if self.src_format is None: - src_format_name = record_bytes - src_format = network_format_registry.get(src_format_name) - self.src_format = src_format - self.insert_thread = threading.Thread(target=self._inserter_thread) - self.insert_thread.start() - else: - record_name, = record_names - substream_type = record_name[0] - stream = NetworkRecordStream([record_bytes]) - for record in stream.read(): - self.queue.put((substream_type, [record])) + self.queue.put(body_stream_chunk) def _inserter_thread(self): try: + src_format, stream = _byte_stream_to_stream( + self.blocking_byte_stream()) self.insert_result = self.repository._get_sink().insert_stream( - self.blocking_read_stream(), self.src_format, self.tokens) + stream, src_format, self.tokens) self.insert_ok = True except: self.insert_exception = sys.exc_info() self.insert_ok = False - def blocking_read_stream(self): + def blocking_byte_stream(self): while True: - item = self.queue.get() - if item is StopIteration: + bytes = self.queue.get() + if bytes is StopIteration: return else: - yield item + yield bytes def do_end(self): self.queue.put(StopIteration) === modified file 'bzrlib/smart/request.py' --- a/bzrlib/smart/request.py 2009-02-26 04:25:00 +0000 +++ b/bzrlib/smart/request.py 2009-03-02 03:38:07 +0000 @@ -465,6 +465,9 @@ request_handlers.register_lazy( 'Repository.unlock', 'bzrlib.smart.repository', 'SmartServerRepositoryUnlock') request_handlers.register_lazy( + 'Repository.StreamSource.get_stream', 'bzrlib.smart.repository', + 'SmartServerRepositoryStreamSourceGetStream') +request_handlers.register_lazy( 'Repository.tarball', 'bzrlib.smart.repository', 'SmartServerRepositoryTarball') request_handlers.register_lazy( === modified file 'bzrlib/tests/blackbox/test_branch.py' --- a/bzrlib/tests/blackbox/test_branch.py 2009-02-27 03:54:39 +0000 +++ b/bzrlib/tests/blackbox/test_branch.py 2009-03-02 03:38:07 +0000 @@ -259,6 +259,22 @@ class TestSmartServerBranching(ExternalBase): + def test_branch_from_trivial_branch_to_same_server_branch_acceptance(self): + self.setup_smart_server_with_call_log() + t = self.make_branch_and_tree('from') + for count in range(9): + t.commit(message='commit %d' % count) + self.reset_smart_call_log() + out, err = self.run_bzr(['branch', self.get_url('from'), + self.get_url('target')]) + rpc_count = len(self.hpss_calls) + # This figure represent the amount of work to perform this use case. It + # is entirely ok to reduce this number if a test fails due to rpc_count + # being too low. If rpc_count increases, more network roundtrips have + # become necessary for this use case. Please do not adjust this number + # upwards without agreement from bzr's network support maintainers. + self.assertEqual(99, rpc_count) + def test_branch_from_trivial_branch_streaming_acceptance(self): self.setup_smart_server_with_call_log() t = self.make_branch_and_tree('from') @@ -273,7 +289,7 @@ # being too low. If rpc_count increases, more network roundtrips have # become necessary for this use case. Please do not adjust this number # upwards without agreement from bzr's network support maintainers. - self.assertEqual(78, rpc_count) + self.assertEqual(25, rpc_count) class TestRemoteBranch(TestCaseWithSFTPServer): === modified file 'bzrlib/tests/branch_implementations/test_branch.py' --- a/bzrlib/tests/branch_implementations/test_branch.py 2009-02-24 08:09:17 +0000 +++ b/bzrlib/tests/branch_implementations/test_branch.py 2009-03-02 03:38:07 +0000 @@ -109,7 +109,7 @@ wt.commit('lala!', rev_id='revision-1', allow_pointless=False) b2 = self.make_branch('b2') - self.assertEqual((1, []), b2.fetch(b1)) + b2.fetch(b1) rev = b2.repository.get_revision('revision-1') tree = b2.repository.revision_tree('revision-1') === modified file 'bzrlib/tests/bzrdir_implementations/test_bzrdir.py' --- a/bzrlib/tests/bzrdir_implementations/test_bzrdir.py 2009-02-23 15:29:35 +0000 +++ b/bzrlib/tests/bzrdir_implementations/test_bzrdir.py 2009-03-02 03:38:07 +0000 @@ -553,7 +553,7 @@ # Ensure no format data is cached a_dir = bzrlib.branch.Branch.open_from_transport( self.get_transport('source')).bzrdir - target_transport = a_dir.root_transport.clone('..').clone('target') + target_transport = self.get_transport('target') target_bzrdir = a_dir.clone_on_transport(target_transport) target_repo = target_bzrdir.open_repository() source_branch = bzrlib.branch.Branch.open( @@ -655,7 +655,7 @@ def test_clone_respects_stacked(self): branch = self.make_branch('parent') - child_transport = branch.bzrdir.root_transport.clone('../child') + child_transport = self.get_transport('child') child = branch.bzrdir.clone_on_transport(child_transport, stacked_on=branch.base) self.assertEqual(child.open_branch().get_stacked_on_url(), branch.base) === modified file 'bzrlib/tests/per_repository/test_fetch.py' --- a/bzrlib/tests/per_repository/test_fetch.py 2009-01-17 01:30:58 +0000 +++ b/bzrlib/tests/per_repository/test_fetch.py 2009-03-02 03:38:07 +0000 @@ -46,7 +46,7 @@ revision_id=None) ## pb=bzrlib.progress.DummyProgress()) - def test_fetch_knit3(self): + def test_fetch_to_knit3(self): # create a repository of the sort we are testing. tree_a = self.make_branch_and_tree('a') self.build_tree(['a/foo']) @@ -80,7 +80,10 @@ try: tree_b = b_bzrdir.create_workingtree() except errors.NotLocalUrl: - raise TestSkipped("cannot make working tree with transport %r" + try: + tree_b = b_branch.create_checkout('b', lightweight=True) + except errors.NotLocalUrl: + raise TestSkipped("cannot make working tree with transport %r" % b_bzrdir.transport) tree_b.commit('no change', rev_id='rev2') rev2_tree = knit3_repo.revision_tree('rev2') === modified file 'bzrlib/tests/test_smart.py' --- a/bzrlib/tests/test_smart.py 2009-02-27 01:02:40 +0000 +++ b/bzrlib/tests/test_smart.py 2009-03-02 03:38:07 +0000 @@ -1201,6 +1201,9 @@ smart.request.request_handlers.get('Repository.lock_write'), smart.repository.SmartServerRepositoryLockWrite) self.assertEqual( + smart.request.request_handlers.get('Repository.StreamSource.get_stream'), + smart.repository.SmartServerRepositoryStreamSourceGetStream) + self.assertEqual( smart.request.request_handlers.get('Repository.tarball'), smart.repository.SmartServerRepositoryTarball) self.assertEqual( === modified file 'bzrlib/versionedfile.py' --- a/bzrlib/versionedfile.py 2009-02-23 15:29:35 +0000 +++ b/bzrlib/versionedfile.py 2009-03-02 03:38:07 +0000 @@ -1534,19 +1534,19 @@ def fulltext_network_to_record(kind, bytes, line_end): """Convert a network fulltext record to record.""" meta_len, = struct.unpack('!L', bytes[line_end:line_end+4]) - record_meta = record_bytes[line_end+4:line_end+4+meta_len] + record_meta = bytes[line_end+4:line_end+4+meta_len] key, parents = bencode.bdecode_as_tuple(record_meta) if parents == 'nil': parents = None - fulltext = record_bytes[line_end+4+meta_len:] - return FulltextContentFactory(key, parents, None, fulltext) + fulltext = bytes[line_end+4+meta_len:] + return [FulltextContentFactory(key, parents, None, fulltext)] def _length_prefix(bytes): return struct.pack('!L', len(bytes)) -def record_to_fulltext_bytes(self, record): +def record_to_fulltext_bytes(record): if record.parents is None: parents = 'nil' else: -- bazaar-commits mailing list [email protected] https://lists.ubuntu.com/mailman/listinfo/bazaar-commits
