mchades commented on code in PR #9570:
URL: https://github.com/apache/gravitino/pull/9570#discussion_r2785666506


##########
clients/client-python/gravitino/filesystem/gvfs_base_operations.py:
##########
@@ -754,3 +785,116 @@ def _get_fileset(self, fileset_ident: NameIdentifier):
             return fileset
         finally:
             write_lock.release()
+
+    def _get_fileset_schema(self, schema_ident: NameIdentifier):
+        """Get the schema by the schema identifier from the cache or load it 
from the server if the cache is disabled.
+        :param schema_ident: The schema identifier
+        :return: The schema
+        """
+        if not self._enable_fileset_metadata_cache:
+            catalog_ident: NameIdentifier = NameIdentifier.of(
+                schema_ident.namespace().level(0), 
schema_ident.namespace().level(1)
+            )
+            catalog: FilesetCatalog = self._get_fileset_catalog(catalog_ident)
+            return catalog.as_schemas().load_schema(schema_ident.name())
+
+        read_lock = self._schema_cache_lock.gen_rlock()
+        try:
+            read_lock.acquire()
+            cache_value: Schema = self._schema_cache.get(schema_ident)
+            if cache_value is not None:
+                return cache_value
+        finally:
+            read_lock.release()
+
+        write_lock = self._schema_cache_lock.gen_wlock()
+        try:
+            write_lock.acquire()
+            cache_value: Schema = self._schema_cache.get(schema_ident)
+            if cache_value is not None:
+                return cache_value
+
+            catalog_ident: NameIdentifier = NameIdentifier.of(
+                schema_ident.namespace().level(0), 
schema_ident.namespace().level(1)
+            )
+            catalog: FilesetCatalog = self._get_fileset_catalog(catalog_ident)
+            schema = catalog.as_schemas().load_schema(schema_ident.name())
+            self._schema_cache[schema_ident] = schema
+            return schema
+        finally:
+            write_lock.release()
+
+    def _get_base_location(self, actual_location: str) -> str:
+        """Get the base location (scheme + authority) from the actual location 
path.
+        :param actual_location: The actual location path (e.g., 
's3://bucket/path')
+        :return: The base location (e.g., 's3://bucket')
+        """
+        parsed_uri = urlparse(actual_location)
+        scheme = parsed_uri.scheme if parsed_uri.scheme else "file"
+        authority = parsed_uri.netloc if parsed_uri.netloc else ""
+        return f"{scheme}://{authority}"
+
+    def _get_user_defined_configs(self, path: str) -> Dict[str, str]:
+        """Get user defined configurations for a specific path based on the 
path's base location
+        (scheme://authority).
+
+        The logic:
+        1. Extract baseLocation (scheme://authority) from the given path
+        2. Find config entries like "fs.path.config.<name> = <base_location>" 
where the
+           base_location matches the extracted baseLocation
+        3. Extract the name from the matching entry
+        4. Then find all config entries with prefix "fs.path.config.<name>." 
and extract properties
+
+        Example:
+          fs.path.config.cluster1 = s3://bucket1
+          fs.path.config.cluster1.aws-access-key = XXX1
+          fs.path.config.cluster1.aws-secret-key = XXX2
+          If path is "s3://bucket1/path/fileset1", then baseLocation is 
"s3://bucket1",
+          cluster1 matches and we extract:
+          - aws-access-key = XXX1
+          - aws-secret-key = XXX2

Review Comment:
   Nit: The docstring examples use dot-separated config keys (e.g., 
`fs.path.config.cluster1`), but
      the actual implementation uses underscore-separated prefix 
`fs_path_config_`. This is confusing
      and could mislead users.
   
      Please update the docstring to use underscore-separated keys consistent 
with the implementation:
   
          Example:
            fs_path_config_cluster1 = s3://bucket1
            fs_path_config_cluster1_aws-access-key = XXX1
            fs_path_config_cluster1_aws-secret-key = XXX2



##########
clients/client-python/gravitino/filesystem/gvfs_base_operations.py:
##########
@@ -489,6 +493,30 @@ def _get_actual_filesystem(
             fileset_ident, location_name
         )
 
+    def _merge_fileset_properties(
+        self,
+        catalog: FilesetCatalog,
+        schema: Schema,
+        fileset: Fileset,
+        actual_location: str,
+    ) -> Dict[str, str]:
+        """Merge properties from catalog, schema, fileset, options, and 
user-defined configs.
+        :param catalog: The fileset catalog
+        :param schema: The schema
+        :param fileset: The fileset
+        :param actual_location: The actual storage location
+        :return: Merged properties dictionary
+        """
+        fileset_props = dict(catalog.properties())
+        fileset_props.update(schema.properties())
+        fileset_props.update(fileset.properties())

Review Comment:
   Why call `schema.properties()` unconditionally here? It can return None, and
   `dict.update(None)` raises a TypeError. Please guard against None, e.g.
   `fileset_props.update(schema.properties() or {})`.



##########
clients/client-python/gravitino/filesystem/gvfs_base_operations.py:
##########
@@ -754,3 +785,116 @@ def _get_fileset(self, fileset_ident: NameIdentifier):
             return fileset
         finally:
             write_lock.release()
+
+    def _get_fileset_schema(self, schema_ident: NameIdentifier):
+        """Get the schema by the schema identifier from the cache or load it 
from the server if the cache is disabled.
+        :param schema_ident: The schema identifier
+        :return: The schema
+        """
+        if not self._enable_fileset_metadata_cache:
+            catalog_ident: NameIdentifier = NameIdentifier.of(
+                schema_ident.namespace().level(0), 
schema_ident.namespace().level(1)
+            )
+            catalog: FilesetCatalog = self._get_fileset_catalog(catalog_ident)
+            return catalog.as_schemas().load_schema(schema_ident.name())
+
+        read_lock = self._schema_cache_lock.gen_rlock()
+        try:
+            read_lock.acquire()
+            cache_value: Schema = self._schema_cache.get(schema_ident)
+            if cache_value is not None:
+                return cache_value
+        finally:
+            read_lock.release()
+
+        write_lock = self._schema_cache_lock.gen_wlock()
+        try:
+            write_lock.acquire()
+            cache_value: Schema = self._schema_cache.get(schema_ident)
+            if cache_value is not None:
+                return cache_value
+
+            catalog_ident: NameIdentifier = NameIdentifier.of(
+                schema_ident.namespace().level(0), 
schema_ident.namespace().level(1)
+            )
+            catalog: FilesetCatalog = self._get_fileset_catalog(catalog_ident)
+            schema = catalog.as_schemas().load_schema(schema_ident.name())
+            self._schema_cache[schema_ident] = schema
+            return schema
+        finally:
+            write_lock.release()
+
+    def _get_base_location(self, actual_location: str) -> str:
+        """Get the base location (scheme + authority) from the actual location 
path.
+        :param actual_location: The actual location path (e.g., 
's3://bucket/path')
+        :return: The base location (e.g., 's3://bucket')
+        """
+        parsed_uri = urlparse(actual_location)
+        scheme = parsed_uri.scheme if parsed_uri.scheme else "file"
+        authority = parsed_uri.netloc if parsed_uri.netloc else ""
+        return f"{scheme}://{authority}"
+
+    def _get_user_defined_configs(self, path: str) -> Dict[str, str]:
+        """Get user defined configurations for a specific path based on the 
path's base location
+        (scheme://authority).
+
+        The logic:
+        1. Extract baseLocation (scheme://authority) from the given path
+        2. Find config entries like "fs.path.config.<name> = <base_location>" 
where the
+           base_location matches the extracted baseLocation
+        3. Extract the name from the matching entry
+        4. Then find all config entries with prefix "fs.path.config.<name>." 
and extract properties
+
+        Example:
+          fs.path.config.cluster1 = s3://bucket1
+          fs.path.config.cluster1.aws-access-key = XXX1
+          fs.path.config.cluster1.aws-secret-key = XXX2
+          If path is "s3://bucket1/path/fileset1", then baseLocation is 
"s3://bucket1",
+          cluster1 matches and we extract:
+          - aws-access-key = XXX1
+          - aws-secret-key = XXX2
+
+        :param path: The path to extract configurations for (e.g., 
's3://bucket/path/to/file')
+        :return: A map of configuration properties for the given path
+        """
+        properties: Dict[str, str] = {}
+        if not path:
+            return properties
+
+        base_location = self._get_base_location(path)
+        location_name = None
+        config_prefix = GVFSConfig.FS_GRAVITINO_PATH_CONFIG_PREFIX
+
+        # First pass: find the location name by matching baseLocation
+        # Look for entries like "fs_path_config_<name> = <base_location>"
+        # The key format should be exactly "fs_path_config_<name>" (no 
underscore after name)
+        if self._options:
+            for key, value in self._options.items():
+                if not key.startswith(config_prefix):
+                    continue
+
+                suffix = key[len(config_prefix) :]
+                # Check if this is a location definition (no underscore after 
the name)
+                # Format: "fs_path_config_<name>" (not 
"fs_path_config_<name>_<property>")
+                if "_" not in suffix and suffix and value:
+                    # This is a location definition: "fs_path_config_<name>"
+                    # Extract baseLocation from the value and compare with the 
path's baseLocation
+                    if base_location == value:

Review Comment:
    Bug: Base location matching fails when the config value has a trailing 
slash.
   
      `_get_base_location("hdfs://cluster1/path/to/file")` returns 
`"hdfs://cluster1"` (no trailing
      slash), but the documentation example explicitly shows trailing slashes:
   
          "fs_path_config_cluster1": "hdfs://cluster1/"
   
      So `"hdfs://cluster1" == "hdfs://cluster1/"` → False, and the 
user-defined config is silently
      ignored. The Java implementation (FilesetUtil) correctly normalizes both 
sides by parsing the
      config value as a URI first.
   
      Suggested fix — normalize the config value before comparing:
   
          config_base_location = self._get_base_location(value)
          if base_location == config_base_location:
              location_name = suffix
              break



##########
clients/client-python/gravitino/filesystem/gvfs_base_operations.py:
##########
@@ -629,8 +662,7 @@ def _get_filesystem(
         the same configuration will share the same filesystem instance.
 
         :param credentials: The credentials for accessing the filesystem
-        :param catalog_props: The catalog properties
-        :param options: The GVFS options
+        :param fileset_props: The catalog properties

Review Comment:
   Nit: The docstring still says `:param fileset_props: The catalog 
properties`, but it should say
      `:param fileset_props: The merged fileset properties` since the semantics 
have changed.



##########
clients/client-python/gravitino/filesystem/gvfs_base_operations.py:
##########
@@ -645,7 +677,7 @@ def _get_filesystem(
         # This allows multiple filesets pointing to the same storage to share
         # the same filesystem instance
         cache_key = FileSystemCacheKey(
-            scheme, authority, credentials, catalog_props, options, kwargs
+            scheme, authority, credentials, fileset_props, self._options, 
kwargs

Review Comment:
   The `FileSystemCacheKey` now includes `fileset_props` (which contains 
fileset-specific metadata
      from `fileset.properties()`) instead of just `catalog_props`. This means 
two different filesets
      pointing to the same storage location (same bucket, same endpoint, same 
credentials) but having
      different fileset-level metadata will produce different cache keys and 
thus different filesystem
      instances.
   
      This contradicts the comment right above: "This allows multiple filesets 
pointing to the same
      storage to share the same filesystem instance."
   
      Additionally, `self._options` is included in the cache key, but it is 
already merged into
      `fileset_props` by `_merge_fileset_properties`, making it redundant.
   



##########
clients/client-python/gravitino/filesystem/gvfs_base_operations.py:
##########
@@ -489,6 +493,30 @@ def _get_actual_filesystem(
             fileset_ident, location_name
         )
 
+    def _merge_fileset_properties(
+        self,
+        catalog: FilesetCatalog,
+        schema: Schema,
+        fileset: Fileset,
+        actual_location: str,
+    ) -> Dict[str, str]:
+        """Merge properties from catalog, schema, fileset, options, and 
user-defined configs.
+        :param catalog: The fileset catalog
+        :param schema: The schema
+        :param fileset: The fileset
+        :param actual_location: The actual storage location
+        :return: Merged properties dictionary
+        """
+        fileset_props = dict(catalog.properties())
+        fileset_props.update(schema.properties())
+        fileset_props.update(fileset.properties())
+        # Get user-defined configurations for the actual location
+        user_defined_configs = self._get_user_defined_configs(actual_location)
+        fileset_props.update(user_defined_configs)
+        if self._options:
+            fileset_props.update(self._options)

Review Comment:
   Bug: Global `self._options` is applied last, overwriting path-specific 
`user_defined_configs`.
   
      Current order:
   ```python
          fileset_props.update(user_defined_configs)   # path-specific override
          fileset_props.update(self._options)           # global options — 
overwrites above!
   ```
      If global options contain a key like `s3_access_key_id`, it will clobber 
the per-cluster value
      extracted from `fs_path_config_<name>_s3_access_key_id`, defeating the 
purpose of multi-cluster
      configuration.
   
      Suggested fix — apply global options *before* user-defined configs so 
that path-specific configs
      take precedence:
   
          if self._options:
              fileset_props.update(self._options)
          user_defined_configs = self._get_user_defined_configs(actual_location)
          fileset_props.update(user_defined_configs)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to