Script 'mail_helper' called by obssrc

Hello community,

here is the log from the commit of package python-gcsfs for openSUSE:Factory checked in at 2022-11-21 16:19:36
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-gcsfs (Old)
 and      /work/SRC/openSUSE:Factory/.python-gcsfs.new.1597 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-gcsfs"

Mon Nov 21 16:19:36 2022 rev:14 rq:1036853 version:2022.11.0

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-gcsfs/python-gcsfs.changes	2022-11-01 13:43:36.220226583 +0100
+++ /work/SRC/openSUSE:Factory/.python-gcsfs.new.1597/python-gcsfs.changes	2022-11-21 16:19:37.447863553 +0100
@@ -1,0 +2,6 @@
+Sat Nov 19 14:45:02 UTC 2022 - Ben Greiner <[email protected]>
+
+- Update to 2022.11.0
+  * implement object versioning (#504)
+
+-------------------------------------------------------------------

Old:
----
  gcsfs-2022.10.0-gh.tar.gz

New:
----
  gcsfs-2022.11.0-gh.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-gcsfs.spec ++++++
--- /var/tmp/diff_new_pack.jrVLp1/_old	2022-11-21 16:19:38.263867744 +0100
+++ /var/tmp/diff_new_pack.jrVLp1/_new	2022-11-21 16:19:38.271867785 +0100
@@ -17,7 +17,7 @@
 
 Name:           python-gcsfs
-Version:        2022.10.0
+Version:        2022.11.0
 Release:        0
 Summary:        Filesystem interface over GCS
 License:        BSD-3-Clause

++++++ gcsfs-2022.10.0-gh.tar.gz -> gcsfs-2022.11.0-gh.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/gcsfs-2022.10.0/docs/source/changelog.rst new/gcsfs-2022.11.0/docs/source/changelog.rst
--- old/gcsfs-2022.10.0/docs/source/changelog.rst	2022-10-19 18:54:45.000000000 +0200
+++ new/gcsfs-2022.11.0/docs/source/changelog.rst	2022-11-10 03:57:38.000000000 +0100
@@ -1,6 +1,11 @@
 Changelog
 =========
 
+2022.11.0
+---------
+
+* implement object versioning (#504)
+
 2022.10.0
 ---------
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/gcsfs-2022.10.0/gcsfs/_version.py new/gcsfs-2022.11.0/gcsfs/_version.py
--- old/gcsfs-2022.10.0/gcsfs/_version.py	2022-10-19 18:54:45.000000000 +0200
+++ new/gcsfs-2022.11.0/gcsfs/_version.py	2022-11-10 03:57:38.000000000 +0100
@@ -22,9 +22,9 @@
     # setup.py/versioneer.py will grep for the variable names, so they must
     # each be defined on a line of their own. _version.py will just call
     # get_keywords().
-    git_refnames = "2022.10.0"
-    git_full = "1d34b2ef2305dd4f328e3cac527a437582226e12"
-    git_date = "2022-10-19 12:54:45 -0400"
+    git_refnames = "2022.11.0"
+    git_full = "805d3fd359ba5189964f8804459653ce1eb4d38c"
+    git_date = "2022-11-09 21:57:38 -0500"
     keywords = {"refnames": git_refnames, "full": git_full, "date": git_date}
     return keywords
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/gcsfs-2022.10.0/gcsfs/core.py new/gcsfs-2022.11.0/gcsfs/core.py
--- old/gcsfs-2022.10.0/gcsfs/core.py	2022-10-19 18:54:45.000000000 +0200
+++ new/gcsfs-2022.11.0/gcsfs/core.py	2022-11-10 03:57:38.000000000 +0100
@@ -23,6 +23,7 @@
 from .credentials import GoogleCredentials
 from . import __version__ as version
 from urllib.parse import quote as quote_urllib
+from urllib.parse import parse_qs, urlsplit
 
 logger = logging.getLogger("gcsfs")
 
@@ -85,7 +86,8 @@
 
     Used by petastorm, do not remove.
     """
-    return "/".join(GCSFileSystem.split_path(path))
+    bucket, name, _ = GCSFileSystem._split_path(path)
+    return "/".join((bucket, name))
 
 
 async def _req_to_text(r):
@@ -119,6 +121,22 @@
         yield lst[i : i + n]
 
 
+def _coalesce_generation(*args):
+    """Helper to coalesce a list of object generations down to one."""
+    generations = set(args)
+    if None in generations:
+        generations.remove(None)
+    if len(generations) > 1:
+        raise ValueError(
+            "Cannot coalesce generations where more than one are defined,"
+            " {}".format(generations)
+        )
+    elif len(generations) == 0:
+        return None
+    else:
+        return generations.pop()
+
+
 class GCSFileSystem(AsyncFileSystem):
     r"""
     Connect to Google Cloud Storage.
@@ -221,6 +239,9 @@
         Default location where buckets are created, like 'US' or 'EUROPE-WEST3'.
         You can find a list of all available locations here:
         https://cloud.google.com/storage/docs/locations#available-locations
+    version_aware: bool
+        Whether to support object versioning. If enabled this will require the
+        user to have the necessary permissions for dealing with versioned objects.
     """
 
     scopes = {"read_only", "read_write", "full_control"}
@@ -247,6 +268,7 @@
         timeout=None,
         endpoint_url=None,
         default_location=None,
+        version_aware=False,
         **kwargs,
     ):
         super().__init__(
@@ -271,6 +293,7 @@
         self._endpoint = endpoint_url
         self.session_kwargs = session_kwargs or {}
         self.default_location = default_location
+        self.version_aware = version_aware
 
         if check_connection:
             warnings.warn(
@@ -334,6 +357,13 @@
         # use of root_marker to make minimum required path, e.g., "/"
         return path or cls.root_marker
 
+    @classmethod
+    def _get_kwargs_from_urls(cls, path):
+        _, _, generation = cls._split_path(path, version_aware=True)
+        if generation is not None:
+            return {"version_aware": True}
+        return {}
+
     def _get_params(self, kwargs):
         params = {k: v for k, v in kwargs.items() if v is not None}
         # needed for requester pays buckets
@@ -408,8 +438,7 @@
             b["name"]
             for b in sync(self.loop, self._list_buckets, timeout=self.timeout)
         ]
 
-    @staticmethod
-    def _process_object(bucket, object_metadata):
+    def _process_object(self, bucket, object_metadata):
         """Process object resource into gcsfs object information format.
 
         Process GCS object resource via type casting and attribute updates to
@@ -422,6 +451,9 @@
         result["size"] = int(object_metadata.get("size", 0))
         result["name"] = posixpath.join(bucket, object_metadata["name"])
         result["type"] = "file"
+        if "generation" in object_metadata or "metageneration" in object_metadata:
+            result["generation"] = object_metadata.get("generation")
+            result["metageneration"] = object_metadata.get("metageneration")
 
         return result
 
@@ -435,13 +467,18 @@
 
     async def _get_object(self, path):
         """Return object information at the given path."""
-        bucket, key = self.split_path(path)
+        bucket, key, generation = self.split_path(path)
 
         # Check if parent dir is in listing cache
         listing = self._ls_from_cache(path)
         if listing:
+            name = "/".join((bucket, key))
             for file_details in listing:
-                if file_details["type"] == "file" and file_details["name"] == path:
+                if (
+                    file_details["type"] == "file"
+                    and file_details["name"] == name
+                    and (not generation or file_details.get("generation") == generation)
+                ):
                     return file_details
             else:
                 raise FileNotFoundError(path)
@@ -455,23 +492,33 @@
 
         # Work around various permission settings. Prefer an object get (storage.objects.get), but
        # fall back to a bucket list + filter to object name (storage.objects.list).
         try:
-            res = await self._call("GET", "b/{}/o/{}", bucket, key, json_out=True)
+            res = await self._call(
+                "GET", "b/{}/o/{}", bucket, key, json_out=True, generation=generation
+            )
         except OSError as e:
             if not str(e).startswith("Forbidden"):
                 raise
             resp = await self._call(
-                "GET", "b/{}/o", bucket, json_out=True, prefix=key, maxResults=1
+                "GET",
+                "b/{}/o",
+                bucket,
+                json_out=True,
+                prefix=key,
+                maxResults=1 if not generation else None,
+                versions="true" if generation else None,
             )
             for item in resp.get("items", []):
-                if item["name"] == key:
+                if item["name"] == key and (
+                    not generation or item.get("generation") == generation
+                ):
                     res = item
                     break
             if res is None:
                 raise FileNotFoundError(path)
 
         return self._process_object(bucket, res)
 
-    async def _list_objects(self, path, prefix=""):
-        bucket, key = self.split_path(path)
+    async def _list_objects(self, path, prefix="", versions=False):
+        bucket, key, generation = self.split_path(path)
         path = path.rstrip("/")
 
         try:
@@ -487,7 +534,9 @@
             if key:
                 raise
 
-        items, prefixes = await self._do_list_objects(path, prefix=prefix)
+        items, prefixes = await self._do_list_objects(
+            path, prefix=prefix, versions=versions
+        )
 
         pseudodirs = [
             {
@@ -510,9 +559,11 @@
             self.dircache[path] = out
         return out
 
-    async def _do_list_objects(self, path, max_results=None, delimiter="/", prefix=""):
+    async def _do_list_objects(
+        self, path, max_results=None, delimiter="/", prefix="", versions=False
+    ):
         """Object listing for the given {bucket}/{prefix}/ path."""
-        bucket, _path = self.split_path(path)
+        bucket, _path, generation = self.split_path(path)
         _path = "" if not _path else _path.rstrip("/") + "/"
         prefix = f"{_path}{prefix}" or None
 
@@ -526,6 +577,7 @@
             prefix=prefix,
             maxResults=max_results,
             json_out=True,
+            versions="true" if versions or generation else None,
         )
 
         prefixes.extend(page.get("prefixes", []))
@@ -542,6 +594,7 @@
                 maxResults=max_results,
                 pageToken=next_page_token,
                 json_out=True,
+                versions="true" if generation else None,
             )
 
             assert page["kind"] == "storage#objects"
@@ -610,6 +663,7 @@
         default_acl="bucketOwnerFullControl",
         location=None,
         create_parents=True,
+        enable_versioning=False,
         **kwargs,
     ):
         """
@@ -635,8 +689,11 @@
             https://cloud.google.com/storage/docs/locations#available-locations
         create_parents: bool
             If True, creates the bucket in question, if it doesn't already exist
+        enable_versioning: bool
+            If True, creates the bucket in question with object versioning
+            enabled.
         """
-        bucket, object = self.split_path(path)
+        bucket, object, generation = self.split_path(path)
         if bucket in ["", "/"]:
             raise ValueError("Cannot create root bucket")
         if "/" in path and create_parents and await self._exists(bucket):
@@ -651,6 +708,8 @@
         location = location or self.default_location
         if location:
             json_data["location"] = location
+        if enable_versioning:
+            json_data["versioning"] = {"enabled": True}
         await self._call(
             method="POST",
             path="b",
@@ -682,7 +741,7 @@
 
     rmdir = sync_wrapper(_rmdir)
 
-    async def _info(self, path, **kwargs):
+    async def _info(self, path, generation=None, **kwargs):
         """File information about this path."""
         path = self._strip_protocol(path)
         if "/" not in path:
@@ -700,10 +759,14 @@
         # Check directory cache for parent dir
         parent_path = self._parent(path)
         parent_cache = self._ls_from_cache(parent_path)
-        bucket, key = self.split_path(path)
+        bucket, key, path_generation = self.split_path(path)
+        generation = _coalesce_generation(generation, path_generation)
         if parent_cache:
+            name = "/".join((bucket, key))
             for o in parent_cache:
-                if o["name"].rstrip("/") == path:
+                if o["name"].rstrip("/") == name and (
+                    not generation or o.get("generation") == generation
+                ):
                     return o
         if self._ls_from_cache(path):
             # this is a directory
@@ -768,10 +831,15 @@
 
     def url(self, path):
         """Get HTTP URL of the given path"""
-        u = "{}/download/storage/v1/b/{}/o/{}?alt=media"
-        bucket, object = self.split_path(path)
+        u = "{}/download/storage/v1/b/{}/o/{}?alt=media{}"
+        bucket, object, generation = self.split_path(path)
         object = quote(object)
-        return u.format(self._location, bucket, object)
+        return u.format(
+            self._location,
+            bucket,
+            object,
+            "&generation={}".format(generation) if generation else "",
+        )
 
     async def _cat_file(self, path, start=None, end=None, **kwargs):
         """Simple one-shot get of file data"""
@@ -838,7 +906,7 @@
             i_json["contentEncoding"] = content_encoding
         i_json.update(_convert_fixed_key_metadata(fixed_key_metadata))
 
-        bucket, key = self.split_path(path)
+        bucket, key, generation = self.split_path(path)
         o_json = await self._call(
             "PATCH",
             "b/{}/o/{}",
@@ -854,7 +922,7 @@
 
     async def _merge(self, path, paths, acl=None):
         """Concatenate objects within a single bucket"""
-        bucket, key = self.split_path(path)
+        bucket, key, generation = self.split_path(path)
         source = [{"name": self.split_path(p)[1]} for p in paths]
         await self._call(
             "POST",
@@ -874,8 +942,10 @@
 
     async def _cp_file(self, path1, path2, acl=None, **kwargs):
         """Duplicate remote file"""
-        b1, k1 = self.split_path(path1)
-        b2, k2 = self.split_path(path2)
+        b1, k1, g1 = self.split_path(path1)
+        b2, k2, g2 = self.split_path(path2)
+        if g2:
+            raise ValueError("Cannot write to specific object generation")
         out = await self._call(
             "POST",
             "b/{}/o/{}/rewriteTo/b/{}/o/{}",
@@ -886,6 +956,7 @@
             headers={"Content-Type": "application/json"},
             destinationPredefinedAcl=acl,
             json_out=True,
+            sourceGeneration=g1,
         )
         while out["done"] is not True:
             out = await self._call(
@@ -899,12 +970,13 @@
                 rewriteToken=out["rewriteToken"],
                 destinationPredefinedAcl=acl,
                 json_out=True,
+                sourceGeneration=g1,
             )
 
     async def _rm_file(self, path, **kwargs):
-        bucket, key = self.split_path(path)
+        bucket, key, generation = self.split_path(path)
         if key:
-            await self._call("DELETE", "b/{}/o/{}", bucket, key)
+            await self._call("DELETE", "b/{}/o/{}", bucket, key, generation=generation)
             self.invalidate_cache(posixpath.dirname(self._strip_protocol(path)))
         else:
             await self._rmdir(path)
@@ -915,7 +987,7 @@
             "Content-Type: application/http\n"
             "Content-Transfer-Encoding: binary\n"
             "Content-ID: <b29c5de2-0db4-490b-b421-6a51b598bd11+{i}>"
-            "\n\nDELETE /storage/v1/b/{bucket}/o/{key} HTTP/1.1\n"
+            "\n\nDELETE /storage/v1/b/{bucket}/o/{key}{query} HTTP/1.1\n"
            "Content-Type: application/json\n"
             "accept: application/json\ncontent-length: 0\n"
         )
@@ -923,16 +995,19 @@
         # Splitting requests into 100 chunk batches
         # See https://cloud.google.com/storage/docs/batch
         for chunk in _chunks(paths, 100):
-            body = "".join(
-                [
+            parts = []
+            for i, p in enumerate(chunk):
+                bucket, key, generation = self.split_path(p)
+                query = f"?generation={generation}" if generation else ""
+                parts.append(
                     template.format(
                         i=i + 1,
-                        bucket=p.split("/", 1)[0],
-                        key=quote(p.split("/", 1)[1]),
+                        bucket=quote(bucket),
+                        key=quote(key),
+                        query=query,
                     )
-                    for i, p in enumerate(chunk)
-                ]
-            )
+                )
+            body = "".join(parts)
             headers, content = await self._call(
                 "POST",
                 f"{self._location}/batch/storage/v1",
@@ -1007,7 +1082,7 @@
     ):
         # enforce blocksize should be a multiple of 2**18
         consistency = consistency or self.consistency
-        bucket, key = self.split_path(path)
+        bucket, key, generation = self.split_path(path)
         size = len(data)
         out = None
         if size < 5 * 2**20:
@@ -1046,7 +1121,9 @@
         callback = callback or NoOpCallback()
         consistency = consistency or self.consistency
         checker = get_consistency_checker(consistency)
-        bucket, key = self.split_path(rpath)
+        bucket, key, generation = self.split_path(rpath)
+        if generation:
+            raise ValueError("Cannot write to specific object generation")
         with open(lpath, "rb") as f0:
             size = f0.seek(0, 2)
             f0.seek(0)
@@ -1090,9 +1167,11 @@
         except IOError:
             return False
 
-    async def _find(self, path, withdirs=False, detail=False, prefix="", **kwargs):
+    async def _find(
+        self, path, withdirs=False, detail=False, prefix="", versions=False, **kwargs
+    ):
         path = self._strip_protocol(path)
-        bucket, key = self.split_path(path)
+        bucket, key, generation = self.split_path(path)
 
         if prefix:
             _path = "" if not key else key.rstrip("/") + "/"
@@ -1100,7 +1179,9 @@
         else:
             _prefix = key
 
-        objects, _ = await self._do_list_objects(bucket, delimiter="", prefix=_prefix)
+        objects, _ = await self._do_list_objects(
+            bucket, delimiter="", prefix=_prefix, versions=versions
+        )
 
         dirs = {}
         cache_entries = {}
@@ -1135,8 +1216,12 @@
         objects = sorted(objects + list(dirs.values()), key=lambda x: x["name"])
 
         if detail:
+            if versions:
+                return {f"{o['name']}#{o['generation']}": o for o in objects}
             return {o["name"]: o for o in objects}
 
+        if versions:
+            return [f"{o['name']}#{o['generation']}" for o in objects]
         return [o["name"] for o in objects]
 
     @retry_request(retries=retries)
@@ -1191,6 +1276,7 @@
         metadata=None,
         autocommit=True,
         fixed_key_metadata=None,
+        generation=None,
         **kwargs,
     ):
         """
@@ -1217,7 +1303,7 @@
         )
 
     @classmethod
-    def split_path(cls, path):
+    def _split_path(cls, path, version_aware=False):
         """
         Normalise GCS path string into bucket and key.
 
@@ -1225,17 +1311,47 @@
         ----------
         path : string
             Input path, like `gcs://mybucket/path/to/file`.
-            Path is of the form: '[gs|gcs://]bucket[/key]'
+            Path is of the form: '[gs|gcs://]bucket[/key][?querystring][#fragment]'
+
+            GCS allows object generation (object version) to be specified in either
+            the URL fragment or the `generation` query parameter. When provided,
+            the fragment will take priority over the `generation` query paramenter.
+
         Returns
         -------
-            (bucket, key) tuple
+            (bucket, key, generation) tuple
         """
         path = cls._strip_protocol(path).lstrip("/")
         if "/" not in path:
-            return path, ""
-        else:
-            return path.split("/", 1)
+            return path, "", None
+        bucket, keypart = path.split("/", 1)
+        key = keypart
+        generation = None
+        if version_aware:
+            parts = urlsplit(keypart)
+            try:
+                if parts.fragment:
+                    generation = parts.fragment
+                elif parts.query:
+                    parsed = parse_qs(parts.query)
+                    if "generation" in parsed:
+                        generation = parsed["generation"][0]
+                # Sanity check whether this could be a valid generation ID. If
+                # it is not, assume that # or ? characters are supposed to be
+                # part of the object name.
+                if generation is not None:
+                    int(generation)
+                    key = parts.path
+            except ValueError:
+                generation = None
+        return (
+            bucket,
+            key,
+            generation,
+        )
+
+    def split_path(self, path):
+        return self._split_path(path, version_aware=self.version_aware)
 
     def sign(self, path, expiration=100, **kwargs):
         """Create a signed URL representing the given path.
@@ -1282,6 +1398,7 @@
         content_type=None,
         timeout=None,
         fixed_key_metadata=None,
+        generation=None,
         **kwargs,
     ):
         """
@@ -1321,7 +1438,13 @@
             https://cloud.google.com/storage/docs/metadata#mutable
         timeout: int
             Timeout seconds for the asynchronous callback.
+        generation: str
+            Object generation.
         """
+        bucket, key, path_generation = gcsfs.split_path(path)
+        if not key:
+            raise OSError("Attempt to open a bucket")
+        self.generation = _coalesce_generation(generation, path_generation)
         super().__init__(
             gcsfs,
             path,
@@ -1332,9 +1455,6 @@
             cache_options=cache_options,
             **kwargs,
         )
-        bucket, key = self.fs.split_path(path)
-        if not key:
-            raise OSError("Attempt to open a bucket")
         self.gcsfs = gcsfs
         self.bucket = bucket
         self.key = key
@@ -1358,6 +1478,12 @@
             self.blocksize = GCS_MIN_BLOCK_SIZE
         self.location = None
 
+    @property
+    def details(self):
+        if self._details is None:
+            self._details = self.fs.info(self.path, generation=self.generation)
+        return self._details
+
     def info(self):
         """File information about this path"""
         return self.details
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/gcsfs-2022.10.0/gcsfs/tests/conftest.py new/gcsfs-2022.11.0/gcsfs/tests/conftest.py
--- old/gcsfs-2022.10.0/gcsfs/tests/conftest.py	2022-10-19 18:54:45.000000000 +0200
+++ new/gcsfs-2022.11.0/gcsfs/tests/conftest.py	2022-11-10 03:57:38.000000000 +0100
@@ -63,7 +63,8 @@
     container = "gcsfs_test"
     cmd = (
         "docker run -d -p 4443:4443 --name gcsfs_test fsouza/fake-gcs-server:latest -scheme "
-        "http -public-host http://localhost:4443 -external-url http://localhost:4443"
+        "http -public-host http://localhost:4443 -external-url http://localhost:4443 "
+        "-backend memory"
     )
     stop_docker(container)
     subprocess.check_output(shlex.split(cmd))
@@ -115,5 +116,30 @@
     finally:
         try:
             gcs.rm(gcs.find(TEST_BUCKET))
+            gcs.rm(TEST_BUCKET)
+        except:  # noqa: E722
+            pass
+
+
[email protected]
+def gcs_versioned(gcs_factory):
+    gcs = gcs_factory()
+    gcs.version_aware = True
+    try:
+        try:
+            gcs.rm(gcs.find(TEST_BUCKET, versions=True))
+        except FileNotFoundError:
+            pass
+
+        try:
+            gcs.mkdir(TEST_BUCKET, enable_versioning=True)
+        except Exception:
+            pass
+        gcs.invalidate_cache()
+        yield gcs
+    finally:
+        try:
+            gcs.rm(gcs.find(TEST_BUCKET, versions=True))
+            gcs.rm(TEST_BUCKET)
         except:  # noqa: E722
             pass
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/gcsfs-2022.10.0/gcsfs/tests/test_core.py new/gcsfs-2022.11.0/gcsfs/tests/test_core.py
--- old/gcsfs-2022.10.0/gcsfs/tests/test_core.py	2022-10-19 18:54:45.000000000 +0200
+++ new/gcsfs-2022.11.0/gcsfs/tests/test_core.py	2022-11-10 03:57:38.000000000 +0100
@@ -1157,3 +1157,50 @@
         f"{TEST_BUCKET}/deep/nested",
         f"{TEST_BUCKET}/deep/nested/dir",
     ]
+
+
+def test_info_versioned(gcs_versioned):
+    with gcs_versioned.open(a, "wb") as wo:
+        wo.write(b"v1")
+    v1 = gcs_versioned.info(a)["generation"]
+    assert v1 is not None
+    with gcs_versioned.open(a, "wb") as wo:
+        wo.write(b"v2")
+    v2 = gcs_versioned.info(a)["generation"]
+    assert v2 is not None and v1 != v2
+    assert gcs_versioned.info(f"{a}#{v1}")["generation"] == v1
+    assert gcs_versioned.info(f"{a}?generation={v2}")["generation"] == v2
+
+
+def test_cat_versioned(gcs_versioned):
+    with gcs_versioned.open(a, "wb") as wo:
+        wo.write(b"v1")
+    v1 = gcs_versioned.info(a)["generation"]
+    assert v1 is not None
+    with gcs_versioned.open(a, "wb") as wo:
+        wo.write(b"v2")
+    gcs_versioned.cat(f"{a}#{v1}") == b"v1"
+
+
+def test_cp_versioned(gcs_versioned):
+    with gcs_versioned.open(a, "wb") as wo:
+        wo.write(b"v1")
+    v1 = gcs_versioned.info(a)["generation"]
+    assert v1 is not None
+    with gcs_versioned.open(a, "wb") as wo:
+        wo.write(b"v2")
+    gcs_versioned.cp_file(f"{a}#{v1}", b)
+    assert gcs_versioned.cat(b) == b"v1"
+
+
+def test_find_versioned(gcs_versioned):
+    with gcs_versioned.open(a, "wb") as wo:
+        wo.write(b"v1")
+    v1 = gcs_versioned.info(a)["generation"]
+    with gcs_versioned.open(a, "wb") as wo:
+        wo.write(b"v2")
+    v2 = gcs_versioned.info(a)["generation"]
+    assert {f"{a}#{v1}", f"{a}#{v2}"} == set(gcs_versioned.find(a, versions=True))
+    assert {f"{a}#{v1}", f"{a}#{v2}"} == set(
+        gcs_versioned.find(a, detail=True, versions=True)
+    )
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/gcsfs-2022.10.0/requirements.txt new/gcsfs-2022.11.0/requirements.txt
--- old/gcsfs-2022.10.0/requirements.txt	2022-10-19 18:54:45.000000000 +0200
+++ new/gcsfs-2022.11.0/requirements.txt	2022-11-10 03:57:38.000000000 +0100
@@ -3,5 +3,5 @@
 google-cloud-storage
 requests
 decorator>4.1.2
-fsspec==2022.10.0
+fsspec==2022.11.0
 aiohttp!=4.0.0a0, !=4.0.0a1
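
For reference, the object-versioning support added in this release can be exercised roughly as follows. This is a minimal sketch modeled on the new tests above, not part of the package: the project and bucket names are placeholders, and it assumes a bucket that already has versioning enabled (see the new enable_versioning option to mkdir and the gcs_versioned fixture).

import gcsfs

# version_aware=True opts in to the generation-aware path handling added in
# GCSFileSystem._split_path; "my-project" and "my-bucket" are placeholders.
fs = gcsfs.GCSFileSystem(project="my-project", version_aware=True)

path = "my-bucket/data.csv"

# Current generation of the object, as surfaced by _process_object/info().
gen = fs.info(path)["generation"]

# A specific generation can be addressed either via a URL fragment or a
# "generation" query parameter; the fragment wins when both are present.
assert fs.cat(f"{path}#{gen}") == fs.cat(f"{path}?generation={gen}")

# find(..., versions=True) lists every generation as "name#generation".
for versioned_name in fs.find(path, versions=True):
    print(versioned_name)

# Copying from a pinned generation restores an old version as a new object;
# writing *to* a pinned generation raises ValueError in _cp_file/_put_file.
fs.cp_file(f"{path}#{gen}", "my-bucket/data-restored.csv")

Note that a generation-carrying path also switches version_aware on automatically through _get_kwargs_from_urls, so fsspec.open("gcs://my-bucket/data.csv#<generation>") would behave the same way.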
