mchades commented on code in PR #9570:
URL: https://github.com/apache/gravitino/pull/9570#discussion_r2785666506
##########
clients/client-python/gravitino/filesystem/gvfs_base_operations.py:
##########
@@ -754,3 +785,116 @@ def _get_fileset(self, fileset_ident: NameIdentifier):
return fileset
finally:
write_lock.release()
+
+ def _get_fileset_schema(self, schema_ident: NameIdentifier):
+ """Get the schema by the schema identifier from the cache or load it
from the server if the cache is disabled.
+ :param schema_ident: The schema identifier
+ :return: The schema
+ """
+ if not self._enable_fileset_metadata_cache:
+ catalog_ident: NameIdentifier = NameIdentifier.of(
+ schema_ident.namespace().level(0),
schema_ident.namespace().level(1)
+ )
+ catalog: FilesetCatalog = self._get_fileset_catalog(catalog_ident)
+ return catalog.as_schemas().load_schema(schema_ident.name())
+
+ read_lock = self._schema_cache_lock.gen_rlock()
+ try:
+ read_lock.acquire()
+ cache_value: Schema = self._schema_cache.get(schema_ident)
+ if cache_value is not None:
+ return cache_value
+ finally:
+ read_lock.release()
+
+ write_lock = self._schema_cache_lock.gen_wlock()
+ try:
+ write_lock.acquire()
+ cache_value: Schema = self._schema_cache.get(schema_ident)
+ if cache_value is not None:
+ return cache_value
+
+ catalog_ident: NameIdentifier = NameIdentifier.of(
+ schema_ident.namespace().level(0),
schema_ident.namespace().level(1)
+ )
+ catalog: FilesetCatalog = self._get_fileset_catalog(catalog_ident)
+ schema = catalog.as_schemas().load_schema(schema_ident.name())
+ self._schema_cache[schema_ident] = schema
+ return schema
+ finally:
+ write_lock.release()
+
+ def _get_base_location(self, actual_location: str) -> str:
+ """Get the base location (scheme + authority) from the actual location
path.
+ :param actual_location: The actual location path (e.g.,
's3://bucket/path')
+ :return: The base location (e.g., 's3://bucket')
+ """
+ parsed_uri = urlparse(actual_location)
+ scheme = parsed_uri.scheme if parsed_uri.scheme else "file"
+ authority = parsed_uri.netloc if parsed_uri.netloc else ""
+ return f"{scheme}://{authority}"
+
+ def _get_user_defined_configs(self, path: str) -> Dict[str, str]:
+ """Get user defined configurations for a specific path based on the
path's base location
+ (scheme://authority).
+
+ The logic:
+ 1. Extract baseLocation (scheme://authority) from the given path
+ 2. Find config entries like "fs.path.config.<name> = <base_location>"
where the
+ base_location matches the extracted baseLocation
+ 3. Extract the name from the matching entry
+ 4. Then find all config entries with prefix "fs.path.config.<name>."
and extract properties
+
+ Example:
+ fs.path.config.cluster1 = s3://bucket1
+ fs.path.config.cluster1.aws-access-key = XXX1
+ fs.path.config.cluster1.aws-secret-key = XXX2
+ If path is "s3://bucket1/path/fileset1", then baseLocation is
"s3://bucket1",
+ cluster1 matches and we extract:
+ - aws-access-key = XXX1
+ - aws-secret-key = XXX2
Review Comment:
Nit: The docstring examples use dot-separated config keys (e.g.,
`fs.path.config.cluster1`), but
the actual implementation uses underscore-separated prefix
`fs_path_config_`. This is confusing
and could mislead users.
Please update the docstring to use underscore-separated keys consistent
with the implementation:
Example:
fs_path_config_cluster1 = s3://bucket1
fs_path_config_cluster1_aws-access-key = XXX1
fs_path_config_cluster1_aws-secret-key = XXX2
##########
clients/client-python/gravitino/filesystem/gvfs_base_operations.py:
##########
@@ -489,6 +493,30 @@ def _get_actual_filesystem(
fileset_ident, location_name
)
+ def _merge_fileset_properties(
+ self,
+ catalog: FilesetCatalog,
+ schema: Schema,
+ fileset: Fileset,
+ actual_location: str,
+ ) -> Dict[str, str]:
+ """Merge properties from catalog, schema, fileset, options, and
user-defined configs.
+ :param catalog: The fileset catalog
+ :param schema: The schema
+ :param fileset: The fileset
+ :param actual_location: The actual storage location
+ :return: Merged properties dictionary
+ """
+ fileset_props = dict(catalog.properties())
+ fileset_props.update(schema.properties())
+ fileset_props.update(fileset.properties())
Review Comment:
`schema.properties()` can return None, in which case `fileset_props.update(schema.properties())` will raise a TypeError. Why is there no None guard here? Please skip the update (or substitute an empty dict) when the schema has no properties.
##########
clients/client-python/gravitino/filesystem/gvfs_base_operations.py:
##########
@@ -754,3 +785,116 @@ def _get_fileset(self, fileset_ident: NameIdentifier):
return fileset
finally:
write_lock.release()
+
+ def _get_fileset_schema(self, schema_ident: NameIdentifier):
+ """Get the schema by the schema identifier from the cache or load it
from the server if the cache is disabled.
+ :param schema_ident: The schema identifier
+ :return: The schema
+ """
+ if not self._enable_fileset_metadata_cache:
+ catalog_ident: NameIdentifier = NameIdentifier.of(
+ schema_ident.namespace().level(0),
schema_ident.namespace().level(1)
+ )
+ catalog: FilesetCatalog = self._get_fileset_catalog(catalog_ident)
+ return catalog.as_schemas().load_schema(schema_ident.name())
+
+ read_lock = self._schema_cache_lock.gen_rlock()
+ try:
+ read_lock.acquire()
+ cache_value: Schema = self._schema_cache.get(schema_ident)
+ if cache_value is not None:
+ return cache_value
+ finally:
+ read_lock.release()
+
+ write_lock = self._schema_cache_lock.gen_wlock()
+ try:
+ write_lock.acquire()
+ cache_value: Schema = self._schema_cache.get(schema_ident)
+ if cache_value is not None:
+ return cache_value
+
+ catalog_ident: NameIdentifier = NameIdentifier.of(
+ schema_ident.namespace().level(0),
schema_ident.namespace().level(1)
+ )
+ catalog: FilesetCatalog = self._get_fileset_catalog(catalog_ident)
+ schema = catalog.as_schemas().load_schema(schema_ident.name())
+ self._schema_cache[schema_ident] = schema
+ return schema
+ finally:
+ write_lock.release()
+
+ def _get_base_location(self, actual_location: str) -> str:
+ """Get the base location (scheme + authority) from the actual location
path.
+ :param actual_location: The actual location path (e.g.,
's3://bucket/path')
+ :return: The base location (e.g., 's3://bucket')
+ """
+ parsed_uri = urlparse(actual_location)
+ scheme = parsed_uri.scheme if parsed_uri.scheme else "file"
+ authority = parsed_uri.netloc if parsed_uri.netloc else ""
+ return f"{scheme}://{authority}"
+
+ def _get_user_defined_configs(self, path: str) -> Dict[str, str]:
+ """Get user defined configurations for a specific path based on the
path's base location
+ (scheme://authority).
+
+ The logic:
+ 1. Extract baseLocation (scheme://authority) from the given path
+ 2. Find config entries like "fs.path.config.<name> = <base_location>"
where the
+ base_location matches the extracted baseLocation
+ 3. Extract the name from the matching entry
+ 4. Then find all config entries with prefix "fs.path.config.<name>."
and extract properties
+
+ Example:
+ fs.path.config.cluster1 = s3://bucket1
+ fs.path.config.cluster1.aws-access-key = XXX1
+ fs.path.config.cluster1.aws-secret-key = XXX2
+ If path is "s3://bucket1/path/fileset1", then baseLocation is
"s3://bucket1",
+ cluster1 matches and we extract:
+ - aws-access-key = XXX1
+ - aws-secret-key = XXX2
+
+ :param path: The path to extract configurations for (e.g.,
's3://bucket/path/to/file')
+ :return: A map of configuration properties for the given path
+ """
+ properties: Dict[str, str] = {}
+ if not path:
+ return properties
+
+ base_location = self._get_base_location(path)
+ location_name = None
+ config_prefix = GVFSConfig.FS_GRAVITINO_PATH_CONFIG_PREFIX
+
+ # First pass: find the location name by matching baseLocation
+ # Look for entries like "fs_path_config_<name> = <base_location>"
+ # The key format should be exactly "fs_path_config_<name>" (no
underscore after name)
+ if self._options:
+ for key, value in self._options.items():
+ if not key.startswith(config_prefix):
+ continue
+
+ suffix = key[len(config_prefix) :]
+ # Check if this is a location definition (no underscore after
the name)
+ # Format: "fs_path_config_<name>" (not
"fs_path_config_<name>_<property>")
+ if "_" not in suffix and suffix and value:
+ # This is a location definition: "fs_path_config_<name>"
+ # Extract baseLocation from the value and compare with the
path's baseLocation
+ if base_location == value:
Review Comment:
Bug: Base location matching fails when the config value has a trailing
slash.
`_get_base_location("hdfs://cluster1/path/to/file")` returns
`"hdfs://cluster1"` (no trailing
slash), but the documentation example explicitly shows trailing slashes:
"fs_path_config_cluster1": "hdfs://cluster1/"
So `"hdfs://cluster1" == "hdfs://cluster1/"` → False, and the
user-defined config is silently
ignored. The Java implementation (FilesetUtil) correctly normalizes both
sides by parsing the
config value as a URI first.
Suggested fix — normalize the config value before comparing:
config_base_location = self._get_base_location(value)
if base_location == config_base_location:
location_name = suffix
break
##########
clients/client-python/gravitino/filesystem/gvfs_base_operations.py:
##########
@@ -629,8 +662,7 @@ def _get_filesystem(
the same configuration will share the same filesystem instance.
:param credentials: The credentials for accessing the filesystem
- :param catalog_props: The catalog properties
- :param options: The GVFS options
+ :param fileset_props: The catalog properties
Review Comment:
Nit: The docstring still says `:param fileset_props: The catalog
properties`, but it should say
`:param fileset_props: The merged fileset properties` since the semantics
have changed.
##########
clients/client-python/gravitino/filesystem/gvfs_base_operations.py:
##########
@@ -645,7 +677,7 @@ def _get_filesystem(
# This allows multiple filesets pointing to the same storage to share
# the same filesystem instance
cache_key = FileSystemCacheKey(
- scheme, authority, credentials, catalog_props, options, kwargs
+ scheme, authority, credentials, fileset_props, self._options,
kwargs
Review Comment:
The `FileSystemCacheKey` now includes `fileset_props` (which contains
fileset-specific metadata
from `fileset.properties()`) instead of just `catalog_props`. This means
two different filesets
pointing to the same storage location (same bucket, same endpoint, same
credentials) but having
different fileset-level metadata will produce different cache keys and
thus different filesystem
instances.
This contradicts the comment right above: "This allows multiple filesets
pointing to the same
storage to share the same filesystem instance."
Additionally, `self._options` is included in the cache key, but it is
already merged into
`fileset_props` by `_merge_fileset_properties`, making it redundant.
##########
clients/client-python/gravitino/filesystem/gvfs_base_operations.py:
##########
@@ -489,6 +493,30 @@ def _get_actual_filesystem(
fileset_ident, location_name
)
+ def _merge_fileset_properties(
+ self,
+ catalog: FilesetCatalog,
+ schema: Schema,
+ fileset: Fileset,
+ actual_location: str,
+ ) -> Dict[str, str]:
+ """Merge properties from catalog, schema, fileset, options, and
user-defined configs.
+ :param catalog: The fileset catalog
+ :param schema: The schema
+ :param fileset: The fileset
+ :param actual_location: The actual storage location
+ :return: Merged properties dictionary
+ """
+ fileset_props = dict(catalog.properties())
+ fileset_props.update(schema.properties())
+ fileset_props.update(fileset.properties())
+ # Get user-defined configurations for the actual location
+ user_defined_configs = self._get_user_defined_configs(actual_location)
+ fileset_props.update(user_defined_configs)
+ if self._options:
+ fileset_props.update(self._options)
Review Comment:
Bug: Global `self._options` is applied last, overwriting path-specific
`user_defined_configs`.
Current order:
```python
fileset_props.update(user_defined_configs) # path-specific override
fileset_props.update(self._options) # global options —
overwrites above!
```
If global options contain a key like `s3_access_key_id`, it will clobber
the per-cluster value
extracted from `fs_path_config_<name>_s3_access_key_id`, defeating the
purpose of multi-cluster
configuration.
Suggested fix — apply global options *before* user-defined configs so
that path-specific configs
take precedence:
if self._options:
fileset_props.update(self._options)
user_defined_configs = self._get_user_defined_configs(actual_location)
fileset_props.update(user_defined_configs)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]