adnanhemani commented on code in PR #1474: URL: https://github.com/apache/polaris/pull/1474#discussion_r2064827447
########## getting-started/assets/polaris/sample-setup-config.yaml: ########## @@ -0,0 +1,174 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +catalogs: + - name: "quickstart_catalog" + type: internal + storage_type: "file" + default_base_location: "file:///var/tmp/quickstart_catalog/" + allowed_locations: + - "file:///var/tmp/quickstart_catalog/" + properties: + catalog: quickstart_catalog + - name: "quickstart_catalog_external" + type: external + storage_type: "file" + default_base_location: "file:///var/tmp/quickstart_catalog_external/" + allowed_locations: + - "file:///var/tmp/quickstart_catalog_external/" + remote_url: "https://example.com/quickstart_catalog_external" + properties: + catalog: quickstart_catalog_external + - name: "quickstart_catalog_s3" + type: internal + storage_type: "s3" + default_base_location: "s3://quickstart-bucket/quickstart_catalog/" + allowed_locations: + - "s3://quickstart-bucket/quickstart_catalog/" + role_arn: "arn:aws:iam::123456789012:role/QuickstartS3Role" + external_id: "12345678-1234-1234-1234-123456789abc" + user_arn: "arn:aws:iam::123456789012:user/QuickstartUser" + region: "us-west-2" + properties: + catalog: quickstart_catalog_s3 + - name: "quickstart_catalog_azure" + type: 
internal + storage_type: "azure" + default_base_location: "abfss://[email protected]/quickstart_catalog/" + allowed_locations: + - "abfss://[email protected]/quickstart_catalog/" + tenant_id: "12345678-1234-1234-1234-123456789abc" + multi_tenant_app_name: "QuickstartAzureApp" + consent_url: "https://login.microsoftonline.com/consent" + properties: + catalog: quickstart_catalog_azure + ## commented out as implicitly gcs auth is needed Review Comment: Can you elaborate on why this is the case for GCS specifically? IIRC, AWS and Azure also require users to set up their own authentication before those platforms can be used (most of these steps are listed in the "Getting Started - Deploy on xxx" page for each cloud provider), yet their sample catalogs are not commented out. ########## client/python/cli/command/setup.py: ########## @@ -0,0 +1,1035 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+# +import os +import logging +import yaml +from dataclasses import dataclass +from typing import Dict, Optional, List, Any, Set + +from cli.command import Command +from cli.constants import Subcommands, StorageType, UNIT_SEPARATOR, Actions, PrincipalType, CatalogType + +from polaris.management import PolarisDefaultApi +from cli.command.principals import PrincipalsCommand +from cli.command.principal_roles import PrincipalRolesCommand +from cli.command.catalogs import CatalogsCommand +from cli.command.catalog_roles import CatalogRolesCommand +from cli.command.namespaces import NamespacesCommand +from cli.command.privileges import PrivilegesCommand + +logging.basicConfig( + format='%(asctime)s %(levelname)s %(message)s', + level=logging.INFO +) +logger = logging.getLogger(__name__) + +@dataclass +class SetupCommand(Command): + setup_subcommand: str + setup_config: str + _config_cache: Optional[Dict[str, Any]] = None + + def _load_setup_config(self) -> Dict[str, Any]: + """ + Load and cache the setup configuration from a YAML file. + """ + if self._config_cache is not None: + return self._config_cache + + if not os.path.isfile(self.setup_config): + raise FileNotFoundError(f"Config file not found: {self.setup_config}") + + try: + with open(self.setup_config, 'r') as file: + self._config_cache = yaml.safe_load(file) + if not isinstance(self._config_cache, dict): + raise ValueError("Invalid configuration format: Expected a dictionary at the root level.") + return self._config_cache + except (yaml.YAMLError, ValueError) as e: + raise ValueError(f"Failed to parse YAML configuration: {e}") + + def _extract_principals(self) -> List[Dict[str, Any]]: + """ + Extract principal information from the setup configuration. 
+ """ + config = self._load_setup_config() + principals = config.get('principals', []) + extracted = [] + seen = set() + + # Get allowed principal types from PrincipalType + allowed_principal_types = {pt.value for pt in PrincipalType} + + for principal in principals: + name = principal.get('name') + principal_type = principal.get('type', PrincipalType.SERVICE.value).lower() # Default to 'service' and normalize to lowercase + properties = principal.get('properties', {}) + + # Validate required fields + missing_fields = [field for field, value in { + "name": name + }.items() if not value] + + if missing_fields: + logger.warning(f"Skipping principal due to missing required fields: {', '.join(missing_fields)}. Principal: {principal}") + continue + + # Validate principal type + if principal_type not in allowed_principal_types: + logger.warning(f"Skipping principal '{name}' due to invalid type '{principal_type}'. Allowed types: {', '.join(allowed_principal_types)}") + continue + + # Check for duplicate principals + if name in seen: + logger.warning(f"Skipping duplicate principal: '{name}'") + continue + + seen.add(name) + extracted.append({ + 'name': name, + 'type': principal_type.upper(), + 'properties': properties, + }) + + if not extracted: + logger.warning("No valid principals found in the configuration.") + + return extracted + + def _get_current_principals(self, api: PolarisDefaultApi) -> Set[str]: + """ + Get the current principals from the API. + Returns a set of principal names. + """ + try: + # Fetch principals from the API + principals = api.list_principals().principals + return {principal.name for principal in principals} + except Exception as e: + logger.error(f"Failed to fetch existing principals: {e}") + return set() + + def _create_principals(self, api: PolarisDefaultApi): + """ + Create principals based on the setup configuration. 
+ """ + try: + # Fetch existing principals from the API + existed_principals = self._get_current_principals(api) + + # Extract principals from the configuration + extracted_principals = self._extract_principals() + + # Separate new and skipped principals + new_principals = [ + principal for principal in extracted_principals + if principal['name'] not in existed_principals + ] + skipped_principals = [ + principal['name'] for principal in extracted_principals + if principal['name'] in existed_principals + ] + + # Log skipped principals + if skipped_principals: + logger.warning(f"Skipping creation for already existing principals: {', '.join(skipped_principals)}") + + # Create new principals + for principal in new_principals: + try: + logger.info(f"Creating principal: {principal['name']}") + principal_command = PrincipalsCommand( + principals_subcommand=Subcommands.CREATE, + type=principal['type'].upper(), + principal_name=principal['name'], + client_id='', # Default value + principal_role='', # Default value + properties=principal['properties'], + set_properties={}, # Default value + remove_properties=[] # Default value + ) + principal_command.execute(api) + logger.info(f"Principal '{principal['name']}' created successfully.") + except Exception as e: + logger.error(f"Failed to create principal '{principal['name']}': {e}") + + except Exception as e: + logger.error(f"Failed to create principals: {e}") + + def _extract_principals_role(self) -> List[Dict[str, Any]]: + """ + Extract principal role information from the setup configuration. 
+ """ + config = self._load_setup_config() + principal_roles = config.get('principal_roles', []) + extracted = [] + seen = set() + + for role in principal_roles: + name = role.get('name') + properties = role.get('properties', {}) + + # Validate required fields + missing_fields = [field for field, value in { + "name": name + }.items() if not value] + + if missing_fields: + logger.warning(f"Skipping principal role due to missing required fields: {', '.join(missing_fields)}. Role: {role}") + continue + + # Check for duplicate principal roles + if name in seen: + logger.warning(f"Skipping duplicate principal role: '{name}'") + continue + + seen.add(name) + extracted.append({ + 'name': name, + 'properties': properties, + }) + + if not extracted: + logger.warning("No valid principal roles found in the configuration.") + + return extracted + + def _get_current_principal_roles(self, api: PolarisDefaultApi) -> Set[str]: + """ + Get the current principal roles from the API. + Returns a set of principal role names. + """ + try: + # Fetch principal roles from the API + principal_roles = api.list_principal_roles().roles + return {role.name for role in principal_roles} + except Exception as e: + logger.error(f"Failed to fetch existing principal roles: {e}") + return set() + + def _create_principal_roles(self, api: PolarisDefaultApi): + """ + Create principal roles based on the setup configuration. 
+ """ + try: + # Fetch existing principal roles from the API + existed_principal_roles = self._get_current_principal_roles(api) + + # Extract principal roles from the configuration + extracted_principal_roles = self._extract_principals_role() + + # Separate new and existing principal roles + new_principal_roles = [ + role for role in extracted_principal_roles + if role['name'] not in existed_principal_roles + ] + skipped_principal_roles = [ + role['name'] for role in extracted_principal_roles + if role['name'] in existed_principal_roles + ] + + # Log skipped principal roles + if skipped_principal_roles: + logger.warning(f"Skipping creation for already existing principal roles: {', '.join(skipped_principal_roles)}") + + # Create new principal roles + for role in new_principal_roles: + try: + logger.info(f"Creating principal role: {role['name']}") + principal_role_command = PrincipalRolesCommand( + principal_roles_subcommand=Subcommands.CREATE, + principal_role_name=role['name'], + principal_name='', # Default value + catalog_name='', # Default value + catalog_role_name='', # Default value + properties=role['properties'], + set_properties={}, # Default value + remove_properties=[] # Default value + ) + principal_role_command.execute(api) + logger.info(f"Principal role '{role['name']}' created successfully.") + except Exception as e: + logger.error(f"Failed to create principal role '{role['name']}': {e}") + except Exception as e: + logger.error(f"Failed to create principal roles: {e}") + + def _extract_principals_role_assignments(self) -> List[Dict[str, str]]: + """ + Extract principal-to-principal-role assignments from the setup configuration. 
+ """ + config = self._load_setup_config() + principals = config.get('principals', []) + assignments = [] + seen = set() + + for principal in principals: + name = principal.get('name') + if not name: + logger.warning(f"Skipping principal with missing 'name': {principal}") + continue + + for assignment in principal.get('assignments', []): + principal_role = assignment.get('principal_role') + + # Validate required fields + missing_fields = [field for field, value in { + "principal": name, + "principal_role": principal_role + }.items() if not value] + + if missing_fields: + logger.warning(f"Skipping assignment due to missing required fields: {', '.join(missing_fields)}. Assignment: {assignment}") + continue + + # Check for duplicate assignments + assignment_key = (name, principal_role) + if assignment_key in seen: + logger.warning(f"Skipping duplicate assignment of role '{principal_role}' to principal '{name}'") + continue + + seen.add(assignment_key) + assignments.append({ + 'principal': name, + 'principal_role': principal_role, + }) + + if not assignments: + logger.warning("No valid principal-to-principal-role assignments found in the configuration.") + + return assignments + + def _create_principal_role_assignments(self, api: PolarisDefaultApi): + """ + Create principal-to-principal-role assignments based on the setup configuration. 
+ """ + try: + # Fetch existing principal roles from the API + existed_principal_role_names = self._get_current_principal_roles(api) + + # Extract assignments from the configuration + extracted_assignments = self._extract_principals_role_assignments() + + # Process each assignment + for assignment in extracted_assignments: + principal_name = assignment['principal'] + principal_role = assignment['principal_role'] + + # Validate that the principal role exists + if principal_role not in existed_principal_role_names: + logger.warning(f"Skipping assignment for non-existing principal role '{principal_role}' for principal '{principal_name}'") + continue + + # Assign the principal role + try: + logger.info(f"Assigning principal '{principal_name}' to role '{principal_role}'") + principal_role_command = PrincipalRolesCommand( + principal_roles_subcommand=Subcommands.GRANT, + principal_name=principal_name, + principal_role_name=principal_role, + catalog_name='', # Default value + catalog_role_name='', # Default value + properties='', # Default value + set_properties={}, # Default value + remove_properties=[] # Default value + ) + principal_role_command.execute(api) + logger.info(f"Assigned principal '{principal_name}' to role '{principal_role}' successfully.") + except Exception as e: + logger.error(f"Failed to assign principal '{principal_name}' to role '{principal_role}': {e}") + except Exception as e: + logger.error(f"Failed to create principal role assignments: {e}") + + def _extract_catalogs(self) -> List[Dict[str, Any]]: + """ + Extract catalog information from the setup configuration. 
+ """ + config = self._load_setup_config() + catalogs = config.get('catalogs', []) + extracted = [] + seen = set() + + # Get allowed catalog types from CatalogType + allowed_catalog_types = {ct.value for ct in CatalogType} + + # Get allowed storage types from StorageType + allowed_storage_types = {st.value for st in StorageType} + + for catalog in catalogs: + name = catalog.get('name') + storage_type = catalog.get('storage_type').lower() # Normalize to lowercase + default_base_location = catalog.get('default_base_location') + role_arn = catalog.get('role_arn') + tenant_id = catalog.get('tenant_id') + service_account = catalog.get('service_account') + catalog_type = catalog.get('type', CatalogType.INTERNAL.value).lower() # Default to 'internal' and normalize to lowercase + remote_url = catalog.get('remote_url') + + # Validate required fields + missing_fields = [field for field, value in { + "name": name, + "storage_type": storage_type, + "default_base_location": default_base_location + }.items() if not value] + + if missing_fields: + logger.warning(f"Skipping catalog due to missing required fields: {', '.join(missing_fields)}. Catalog: {catalog}") + continue + + # Validate catalog type + if catalog_type not in allowed_catalog_types: + logger.warning(f"Skipping catalog '{name}' due to invalid type '{catalog_type}'. Allowed types: {', '.join(allowed_catalog_types)}") + continue + + # Validate storage type + if storage_type not in allowed_storage_types: + logger.warning(f"Skipping catalog '{name}' due to invalid storage type '{storage_type}'. 
Allowed types: {', '.join(allowed_storage_types)}") + continue + + # Additional validation for specific catalog/storage types + if catalog_type == CatalogType.EXTERNAL.value and not remote_url: + logger.warning(f"Skipping external catalog with missing 'remote_url': {catalog}") + continue + if storage_type == StorageType.S3.value and not role_arn: + logger.warning(f"Skipping S3 catalog with missing 'role_arn': {catalog}") + continue + if storage_type == StorageType.AZURE.value and not tenant_id: + logger.warning(f"Skipping Azure catalog with missing 'tenant_id': {catalog}") + continue + if storage_type == StorageType.GCS.value and not service_account: Review Comment: According to https://github.com/apache/polaris/blob/main/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpStorageConfigurationInfo.java#L33, the service account is not a required field of the GCS storage configuration, so this check should not reject GCS catalogs that omit `service_account`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
