This is an automated email from the ASF dual-hosted git repository.
yasithdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airavata-portals.git
The following commit(s) were added to refs/heads/main by this push:
new 571f78eeb refactor(portal): make group-resource-profile serializers
proto-native (Track D) (#205)
571f78eeb is described below
commit 571f78eeba5879df07a6015dbaed642e4ab8e056
Author: Yasith Jayawardana <[email protected]>
AuthorDate: Tue Jun 9 05:13:37 2026 -0400
refactor(portal): make group-resource-profile serializers proto-native
(Track D) (#205)
Rewrite the GroupResourceProfile family — GroupResourceProfileSerializer,
GroupComputeResourcePreferenceSerializer (the SLURM/AWS union), and the
Reservation / SSH-provisioner-config / compute-resource-policy /
batch-queue-resource-policy nested serializers — to read the gRPC protobuf
directly, replacing the ~600 lines of Thrift-union manipulation with
proto-native
read + write.
- Read flattens the proto `specific_preferences` oneof byte-for-byte as
before:
`allocationProjectNumber` lifted to the top of each compute preference,
the
rendered prefs wrapped as `{'slurm': {...}, 'aws': null}`, the top-level
`reservations` empty (they live inside the SLURM member).
- Write rebuilds the proto GroupResourceProfile (incl. the SLURM/AWS union
with
reservations + SSH configs, and the compute/batch-queue policies)
directly from
the validated data; GroupResourceProfileViewSet's update computes the
removed
prefs/policies from the original vs saved proto and passes protobuf
straight to
the facade.
- This was the LAST thrift_utils-generated serializer: every DRF serializer
is now
proto-native. Removed grpc_requests.group_resource_profile + all its
reverse
helpers + the _proto_enum/_proto_enum_rev machinery; reduced
grpc_adapters to
only the user-storage FileMetadataResponse->dict helpers (its
Thrift-attribute
read adapters + enum bridges are all gone); dropped the now-unused Thrift
imports
across serializers.py / views.py / grpc_requests.py and serializers.py's
thrift_utils import.
- Also FIXES a read regression: PR #202 accidentally removed the
group_resource_profile read adapter the GRP viewset still referenced,
breaking
the GRP read endpoint with a NameError; the proto-native read path
restores it.
Validated byte-for-byte (full SLURM GRP w/ reservations + SSH configs + both
policy types + the union flatten) vs the original adapter+serializer path;
write
round-trips the full SLURM union. manage.py check green; api test failures
unchanged vs origin/main.
---
.../django_airavata/apps/api/grpc_adapters.py | 92 +-
.../django_airavata/apps/api/grpc_requests.py | 190 ----
.../django_airavata/apps/api/serializers.py | 1157 +++++---------------
.../django_airavata/apps/api/views.py | 215 +---
4 files changed, 332 insertions(+), 1322 deletions(-)
diff --git a/airavata-django-portal/django_airavata/apps/api/grpc_adapters.py
b/airavata-django-portal/django_airavata/apps/api/grpc_adapters.py
index ec7b3746f..817b8dea8 100644
--- a/airavata-django-portal/django_airavata/apps/api/grpc_adapters.py
+++ b/airavata-django-portal/django_airavata/apps/api/grpc_adapters.py
@@ -1,92 +1,12 @@
-"""Adapters from gRPC protobuf messages to the attribute shape the existing
-DRF serializers read.
+"""Map gRPC ``FileMetadataResponse`` messages to the plain-dict shape the user
+storage serializers read.
-Track D: while ``apps/api`` views are repointed from the Thrift API to the gRPC
-facade (``request.airavata``), the portal keeps its REST contract with the Vue
-frontend unchanged by reusing the existing serializers. Those serializers were
-generated from the Thrift models, so they read Thrift attribute names
-(``projectID``, ``creationTime``, ...). These adapters expose the corresponding
-protobuf fields (``project_id``, ``creation_time``, ...) under those Thrift
names,
-so serializer output is identical by construction. They are removed once the
-serializers are made protobuf-native.
+The user-storage file/directory serializers consume dicts keyed the way the
+legacy ``user_storage.listdir`` produced; these helpers convert the gRPC
+``FileMetadataResponse`` into that dict shape. (All the Thrift-attribute-shape
+read adapters that once lived here are gone — every serializer is
proto-native.)
"""
-from types import SimpleNamespace
-
-from airavata.model.appcatalog.computeresource.ttypes import (
- JobSubmissionProtocol as _ThriftJobSubmissionProtocol,
-)
-from airavata.model.data.movement.ttypes import (
- DataMovementProtocol as _ThriftDataMovementProtocol,
-)
-from airavata.model.user.ttypes import Status as _ThriftStatus
-
-
-def _thrift_enum(pb, field, thrift_enum):
- """Map a protobuf enum field to the Thrift enum value of the SAME name.
-
- proto and Thrift enums frequently assign different integers to the same
- member name (e.g. ``SummaryType.SSH`` is 1 in proto but 0 in Thrift), so
the
- bridge must go by NAME, never by raw integer — otherwise an adapter would
- silently mislabel one enum member as another.
- """
- enum_descriptor = pb.DESCRIPTOR.fields_by_name[field].enum_type
- name = enum_descriptor.values_by_number[getattr(pb, field)].name
- return getattr(thrift_enum, name)
-
-
-def _thrift_enum_prefixed(pb, field, thrift_enum, proto_prefix):
- """Bridge a proto enum field to a Thrift value by name after stripping a
- proto-only prefix.
-
- proto3 namespaces enum members that would otherwise collide in the file
- (``EXPERIMENT_STATE_CREATED``) where the Thrift enum uses the bare name
- (``CREATED``). Members absent from Thrift — notably the zero ``*_UNKNOWN``
- sentinel — map to None (the serializer renders these as nullable ints).
- """
- name = pb.DESCRIPTOR.fields_by_name[field].enum_type.values_by_number[
- getattr(pb, field)].name
- if name.startswith(proto_prefix):
- name = name[len(proto_prefix):]
- return getattr(thrift_enum, name, None)
-
-
-def _thrift_enum_mapped(pb, field, proto_name_to_thrift):
- """Bridge a proto enum field to a Thrift value via an EXPLICIT name map.
-
- Needed when the proto and Thrift enum member NAMES diverge — proto3
prefixes
- members whose bare name would collide in the file (e.g. proto
- ``DATA_MOVEMENT_PROTOCOL_LOCAL`` vs Thrift ``LOCAL``), and some members
exist
- on only one side (e.g. proto-only ``GRID_FTP``). Unmapped members —
including
- the zero ``*_UNKNOWN`` sentinel and proto-only values — return None (the
- serializer fields are nullable for these).
- """
- enum_descriptor = pb.DESCRIPTOR.fields_by_name[field].enum_type
- proto_name = enum_descriptor.values_by_number[getattr(pb, field)].name
- return proto_name_to_thrift.get(proto_name)
-
-
-# proto DataMovementProtocol member name -> Thrift DataMovementProtocol value.
-# Names diverge (proto prefixes LOCAL; proto-only GRID_FTP has no Thrift
value).
-_DATA_MOVEMENT_PROTOCOL = {
- 'DATA_MOVEMENT_PROTOCOL_LOCAL': _ThriftDataMovementProtocol.LOCAL,
- 'SCP': _ThriftDataMovementProtocol.SCP,
- 'SFTP': _ThriftDataMovementProtocol.SFTP,
- 'UNICORE_STORAGE_SERVICE':
_ThriftDataMovementProtocol.UNICORE_STORAGE_SERVICE,
-}
-
-
-# proto JobSubmissionProtocol member name -> Thrift JobSubmissionProtocol
value.
-# Mostly aligned, but proto JSP_CLOUD maps to Thrift CLOUD (name divergence).
-_JOB_SUBMISSION_PROTOCOL = {
- 'LOCAL': _ThriftJobSubmissionProtocol.LOCAL,
- 'SSH': _ThriftJobSubmissionProtocol.SSH,
- 'GLOBUS': _ThriftJobSubmissionProtocol.GLOBUS,
- 'UNICORE': _ThriftJobSubmissionProtocol.UNICORE,
- 'JSP_CLOUD': _ThriftJobSubmissionProtocol.CLOUD,
- 'SSH_FORK': _ThriftJobSubmissionProtocol.SSH_FORK,
- 'LOCAL_FORK': _ThriftJobSubmissionProtocol.LOCAL_FORK,
-}
# --- User storage file/directory listings -----------------------------------
# The storage serializers (UserStorageFileSerializer /
UserStorageDirectorySerializer)
diff --git a/airavata-django-portal/django_airavata/apps/api/grpc_requests.py
b/airavata-django-portal/django_airavata/apps/api/grpc_requests.py
index 0411a667f..6165b2286 100644
--- a/airavata-django-portal/django_airavata/apps/api/grpc_requests.py
+++ b/airavata-django-portal/django_airavata/apps/api/grpc_requests.py
@@ -14,16 +14,6 @@ proto3 scalar fields cannot hold ``None``, so optional
Thrift values that may be
import importlib
-from airavata.model.appcatalog.computeresource.ttypes import (
- JobSubmissionProtocol as _ThriftJobSubmissionProtocol,
-)
-from airavata.model.appcatalog.groupresourceprofile.ttypes import (
- ResourceType as _ThriftResourceType,
-)
-from airavata.model.data.movement.ttypes import (
- DataMovementProtocol as _ThriftDataMovementProtocol,
-)
-
_GEN = "airavata_sdk.generated.org.apache.airavata.model"
@@ -31,28 +21,6 @@ def _pb2(path):
return importlib.import_module(f"{_GEN}.{path}")
-def _proto_enum(proto_enum, thrift_enum, value, prefix=''):
- """Thrift enum value -> proto enum value, by name (mirror of the read-side
- ``grpc_adapters`` enum bridges).
-
- proto and Thrift enums assign different integers to the same member name,
so
- the bridge goes by NAME. ``prefix`` re-applies a proto-only member prefix
- (e.g. ``EXPERIMENT_STATE_``). ``None`` -> 0 (the proto default / zero
- sentinel).
- """
- if value is None:
- return 0
- name = thrift_enum(value).name
- # proto3 prefixes only members that would otherwise collide in the file —
- # in practice just the zero *_UNKNOWN sentinel — so real members usually
- # stay bare. Re-apply the prefix only when it yields a valid proto member
- # (the mirror of the read-side _thrift_enum_prefixed, which strips it only
- # when present).
- if prefix and (prefix + name) in proto_enum.keys():
- name = prefix + name
- return proto_enum.Value(name)
-
-
def password_credential(gateway_id, portal_user_name, login_user_name,
password, description):
"""Build a proto ``PasswordCredential`` from the create-password
request."""
@@ -65,164 +33,6 @@ def password_credential(gateway_id, portal_user_name,
login_user_name,
)
-def _proto_enum_rev(proto_enum, rev_map, value):
- """Thrift enum value -> proto enum value via an EXPLICIT inverse name map
- (for protocol enums whose proto/Thrift member names diverge). None/unmapped
- -> 0 (proto *_UNKNOWN)."""
- if value is None:
- return 0
- name = rev_map.get(value)
- return proto_enum.Value(name) if name is not None else 0
-
-
-# Thrift protocol value -> proto member name, preserving the divergent name
pairs
-# (Thrift CLOUD <-> proto JSP_CLOUD; Thrift LOCAL <-> proto
-# DATA_MOVEMENT_PROTOCOL_LOCAL). Used by the group-resource-profile write path.
-_JOB_SUBMISSION_PROTOCOL_REV = {
- _ThriftJobSubmissionProtocol.LOCAL: 'LOCAL',
- _ThriftJobSubmissionProtocol.SSH: 'SSH',
- _ThriftJobSubmissionProtocol.GLOBUS: 'GLOBUS',
- _ThriftJobSubmissionProtocol.UNICORE: 'UNICORE',
- _ThriftJobSubmissionProtocol.CLOUD: 'JSP_CLOUD',
- _ThriftJobSubmissionProtocol.SSH_FORK: 'SSH_FORK',
- _ThriftJobSubmissionProtocol.LOCAL_FORK: 'LOCAL_FORK',
-}
-_DATA_MOVEMENT_PROTOCOL_REV = {
- _ThriftDataMovementProtocol.LOCAL: 'DATA_MOVEMENT_PROTOCOL_LOCAL',
- _ThriftDataMovementProtocol.SCP: 'SCP',
- _ThriftDataMovementProtocol.SFTP: 'SFTP',
- _ThriftDataMovementProtocol.UNICORE_STORAGE_SERVICE:
'UNICORE_STORAGE_SERVICE',
-}
-
-
-# --- Group resource profile (write direction) ------------------------------
-# Reverse of grpc_adapters.group_resource_profile. Reuses the _proto_enum_rev
-# protocol maps defined above.
-
-
-def _grp_pb2():
- return _pb2("appcatalog.groupresourceprofile.group_resource_profile_pb2")
-
-
-def _reverse_compute_resource_reservation(t):
- return _grp_pb2().ComputeResourceReservation(
- reservation_id=t.reservationId or '',
- reservation_name=t.reservationName or '',
- queue_names=list(t.queueNames or []),
- start_time=t.startTime or 0,
- end_time=t.endTime or 0,
- )
-
-
-def _reverse_group_account_ssh_provisioner_config(t):
- return _grp_pb2().GroupAccountSSHProvisionerConfig(
- resource_id=t.resourceId or '',
- group_resource_profile_id=t.groupResourceProfileId or '',
- config_name=t.configName or '',
- config_value=t.configValue or '',
- )
-
-
-def _reverse_slurm_compute_resource_preference(t):
- return _grp_pb2().SlurmComputeResourcePreference(
- allocation_project_number=t.allocationProjectNumber or '',
- preferred_batch_queue=t.preferredBatchQueue or '',
- quality_of_service=t.qualityOfService or '',
- usage_reporting_gateway_id=t.usageReportingGatewayId or '',
- ssh_account_provisioner=t.sshAccountProvisioner or '',
- group_ssh_account_provisioner_configs=[
- _reverse_group_account_ssh_provisioner_config(c)
- for c in (t.groupSSHAccountProvisionerConfigs or [])],
- ssh_account_provisioner_additional_info=(
- t.sshAccountProvisionerAdditionalInfo or ''),
- reservations=[
- _reverse_compute_resource_reservation(r)
- for r in (t.reservations or [])],
- )
-
-
-def _reverse_aws_compute_resource_preference(t):
- return _grp_pb2().AwsComputeResourcePreference(
- region=t.region or '',
- preferred_ami_id=t.preferredAmiId or '',
- preferred_instance_type=t.preferredInstanceType or '',
- )
-
-
-def _reverse_group_compute_resource_preference(t):
- grp = _grp_pb2()
- cr = _pb2("appcatalog.computeresource.compute_resource_pb2")
- dm = _pb2("data.movement.data_movement_pb2")
- msg = grp.GroupComputeResourcePreference(
- compute_resource_id=t.computeResourceId or '',
- group_resource_profile_id=t.groupResourceProfileId or '',
- override_by_airavata=bool(t.overridebyAiravata),
- login_user_name=t.loginUserName or '',
- scratch_location=t.scratchLocation or '',
- preferred_job_submission_protocol=_proto_enum_rev(
- cr.JobSubmissionProtocol, _JOB_SUBMISSION_PROTOCOL_REV,
- t.preferredJobSubmissionProtocol),
- preferred_data_movement_protocol=_proto_enum_rev(
- dm.DataMovementProtocol, _DATA_MOVEMENT_PROTOCOL_REV,
- t.preferredDataMovementProtocol),
- resource_specific_credential_store_token=(
- t.resourceSpecificCredentialStoreToken or ''),
- resource_type=_proto_enum(
- grp.ResourceType, _ThriftResourceType, t.resourceType),
- )
- sp = getattr(t, 'specificPreferences', None)
- if sp is not None:
- if t.resourceType == _ThriftResourceType.AWS and getattr(sp, 'aws',
None):
-
msg.specific_preferences.CopyFrom(grp.EnvironmentSpecificPreferences(
- aws=_reverse_aws_compute_resource_preference(sp.aws)))
- elif getattr(sp, 'slurm', None):
-
msg.specific_preferences.CopyFrom(grp.EnvironmentSpecificPreferences(
- slurm=_reverse_slurm_compute_resource_preference(sp.slurm)))
- return msg
-
-
-def _reverse_compute_resource_policy(t):
- return _grp_pb2().ComputeResourcePolicy(
- resource_policy_id=t.resourcePolicyId or '',
- compute_resource_id=t.computeResourceId or '',
- group_resource_profile_id=t.groupResourceProfileId or '',
- allowed_batch_queues=list(t.allowedBatchQueues or []),
- )
-
-
-def _reverse_batch_queue_resource_policy(t):
- return _grp_pb2().BatchQueueResourcePolicy(
- resource_policy_id=t.resourcePolicyId or '',
- compute_resource_id=t.computeResourceId or '',
- group_resource_profile_id=t.groupResourceProfileId or '',
- queuename=t.queuename or '',
- max_allowed_nodes=t.maxAllowedNodes or 0,
- max_allowed_cores=t.maxAllowedCores or 0,
- max_allowed_walltime=t.maxAllowedWalltime or 0,
- )
-
-
-def group_resource_profile(t):
- """Thrift ``GroupResourceProfile`` -> proto message."""
- return _grp_pb2().GroupResourceProfile(
- gateway_id=t.gatewayId or '',
- group_resource_profile_id=t.groupResourceProfileId or '',
- group_resource_profile_name=t.groupResourceProfileName or '',
- compute_preferences=[
- _reverse_group_compute_resource_preference(p)
- for p in (t.computePreferences or [])],
- compute_resource_policies=[
- _reverse_compute_resource_policy(p)
- for p in (t.computeResourcePolicies or [])],
- batch_queue_resource_policies=[
- _reverse_batch_queue_resource_policy(p)
- for p in (t.batchQueueResourcePolicies or [])],
- creation_time=t.creationTime or 0,
- updated_time=t.updatedTime or 0,
- default_credential_store_token=t.defaultCredentialStoreToken or '',
- )
-
-
def data_product_for_upload(*, gateway_id, owner_name, product_name, file_path,
storage_resource_id, content_type=None,
product_size=0):
"""Build a proto ``DataProductModel`` to register for a freshly uploaded
file.
diff --git a/airavata-django-portal/django_airavata/apps/api/serializers.py
b/airavata-django-portal/django_airavata/apps/api/serializers.py
index f609d1308..8a2054825 100644
--- a/airavata-django-portal/django_airavata/apps/api/serializers.py
+++ b/airavata-django-portal/django_airavata/apps/api/serializers.py
@@ -1,24 +1,11 @@
-import copy
import datetime
import json
import logging
import os
from urllib.parse import quote
-from airavata.model.application.io.ttypes import DataType
-
-from airavata.model.appcatalog.groupresourceprofile.ttypes import (
- ComputeResourceReservation,
- GroupComputeResourcePreference,
- GroupResourceProfile,
- ResourceType,
- SlurmComputeResourcePreference,
- AwsComputeResourcePreference
-)
+
from airavata.model.appcatalog.parser.ttypes import IOType as _ThriftIOType
from airavata.model.group.ttypes import ResourcePermissionType
-from airavata.model.status.ttypes import (
- ExperimentState
-)
from airavata_django_portal_sdk import (
experiment_util
)
@@ -27,7 +14,7 @@ from django.contrib.auth import get_user_model
from django.urls import reverse
from rest_framework import serializers
-from . import models, thrift_utils, view_utils
+from . import models, view_utils
log = logging.getLogger(__name__)
@@ -2011,909 +1998,353 @@ class UserProfileSerializer(serializers.Serializer):
return None
-class ComputeResourceReservationSerializer(
- thrift_utils.create_serializer_class(ComputeResourceReservation)):
- startTime = UTCPosixTimestampDateTimeField(allow_null=True)
- endTime = UTCPosixTimestampDateTimeField(allow_null=True)
-
-
-class GroupComputeResourcePreferenceSerializer(
- thrift_utils.create_serializer_class(GroupComputeResourcePreference)):
- reservations = serializers.SerializerMethodField()
-
- # Check if the object (e.g. SLURM type) has the 'reservations' attribute
- def get_reservations(self, obj):
- if hasattr(obj, 'reservations'):
- reservations_data = getattr(obj, 'reservations')
- if reservations_data is not None:
- return ComputeResourceReservationSerializer(reservations_data,
many=True, context=self.context).data
-
- return []
-
- @staticmethod
- def _convert_nested_list_fields_to_thrift(slurm_pref):
- from collections import OrderedDict
- from airavata.model.appcatalog.groupresourceprofile.ttypes import (
- ComputeResourceReservation,
- GroupAccountSSHProvisionerConfig
- )
-
- if hasattr(slurm_pref, 'reservations') and slurm_pref.reservations:
- if isinstance(slurm_pref.reservations, list):
- converted_reservations = []
- for res in slurm_pref.reservations:
- if isinstance(res, (dict, OrderedDict)):
-
converted_reservations.append(ComputeResourceReservation(**res))
- else:
- converted_reservations.append(res)
- slurm_pref.reservations = converted_reservations
-
- if hasattr(slurm_pref, 'groupSSHAccountProvisionerConfigs') and
slurm_pref.groupSSHAccountProvisionerConfigs:
- if isinstance(slurm_pref.groupSSHAccountProvisionerConfigs, list):
- converted_configs = []
- for cfg in slurm_pref.groupSSHAccountProvisionerConfigs:
- if isinstance(cfg, (dict, OrderedDict)):
-
converted_configs.append(GroupAccountSSHProvisionerConfig(**cfg))
- else:
- converted_configs.append(cfg)
- slurm_pref.groupSSHAccountProvisionerConfigs =
converted_configs
+def _grp_pb2():
+ from
airavata_sdk.generated.org.apache.airavata.model.appcatalog.groupresourceprofile
import ( # noqa: E501
+ group_resource_profile_pb2,
+ )
+ return group_resource_profile_pb2
- @staticmethod
- def _convert_specific_preferences_dict_to_thrift(pref_instance,
resource_type):
- from collections import OrderedDict
- if not hasattr(pref_instance, 'specificPreferences'):
- return
+def _resource_type_field(**kwargs):
+ from airavata.model.appcatalog.groupresourceprofile.ttypes import (
+ ResourceType as _T,
+ )
+ return proto_enum_int_field(
+ _grp_pb2().ResourceType.DESCRIPTOR, _T, proto_prefix='RESOURCE_TYPE_',
+ **kwargs)
- if isinstance(pref_instance.specificPreferences, (dict, OrderedDict)):
- specific_prefs_dict = pref_instance.specificPreferences
- union_type_class = None
- try:
- from airavata.model.appcatalog.groupresourceprofile.ttypes
import (
- EnvironmentSpecificPreferences
- )
- union_type_class = EnvironmentSpecificPreferences
- log.debug(
- "GCPreference: Got union type class from import: %s",
- union_type_class.__name__,
- )
- except ImportError as e:
- log.error(
- "GCPreference: Failed to import
EnvironmentSpecificPreferences: %s",
- str(e),
- exc_info=True,
- )
+class ComputeResourceReservationSerializer(serializers.Serializer):
+ """Proto-native serializer for the gRPC ``ComputeResourceReservation``."""
- if union_type_class:
- pref_instance.specificPreferences = union_type_class()
-
- if resource_type == ResourceType.SLURM:
- if 'slurm' in specific_prefs_dict:
- slurm_data = specific_prefs_dict['slurm']
- else:
- slurm_data = specific_prefs_dict
-
- if slurm_data and isinstance(slurm_data, dict) and
len(slurm_data) > 0:
- try:
- from collections import OrderedDict
- from
airavata.model.appcatalog.groupresourceprofile.ttypes import (
- ComputeResourceReservation,
- GroupAccountSSHProvisionerConfig
- )
-
- if 'reservations' in slurm_data and
slurm_data['reservations']:
- reservations_list = slurm_data['reservations']
- if isinstance(reservations_list, list):
- converted_reservations = []
- for res in reservations_list:
- if isinstance(res, (dict,
OrderedDict)):
-
converted_reservations.append(ComputeResourceReservation(**res))
- else:
- converted_reservations.append(res)
- slurm_data['reservations'] =
converted_reservations
-
- if 'groupSSHAccountProvisionerConfigs' in
slurm_data and slurm_data['groupSSHAccountProvisionerConfigs']:
- configs_list =
slurm_data['groupSSHAccountProvisionerConfigs']
- if isinstance(configs_list, list):
- converted_configs = []
- for cfg in configs_list:
- if isinstance(cfg, (dict,
OrderedDict)):
-
converted_configs.append(GroupAccountSSHProvisionerConfig(**cfg))
- else:
- converted_configs.append(cfg)
-
slurm_data['groupSSHAccountProvisionerConfigs'] = converted_configs
-
- slurm_pref =
SlurmComputeResourcePreference(**slurm_data)
- pref_instance.specificPreferences.slurm =
slurm_pref
-
GroupComputeResourcePreferenceSerializer._convert_nested_list_fields_to_thrift(slurm_pref)
- log.info(
- "GCPreference: Converted specificPreferences
dict to SLURM Thrift union type, computeResourceId=%s",
- pref_instance.computeResourceId if
hasattr(pref_instance, 'computeResourceId') else 'unknown',
- )
- except Exception as e:
- log.error(
- "GCPreference: Failed to create
SlurmComputeResourcePreference from dict: %s, computeResourceId=%s",
- str(e),
- pref_instance.computeResourceId if
hasattr(pref_instance, 'computeResourceId') else 'unknown',
- exc_info=True,
- )
- else:
- log.info(
- "GCPreference: specificPreferences dict is empty,
created empty union type, computeResourceId=%s",
- pref_instance.computeResourceId if
hasattr(pref_instance, 'computeResourceId') else 'unknown',
- )
- elif resource_type == ResourceType.AWS:
- if 'aws' in specific_prefs_dict:
- aws_data = specific_prefs_dict['aws']
- else:
- aws_data = specific_prefs_dict
-
- if aws_data and isinstance(aws_data, dict) and
len(aws_data) > 0:
- try:
- aws_pref = AwsComputeResourcePreference(**aws_data)
- pref_instance.specificPreferences.aws = aws_pref
- log.info(
- "GCPreference: Converted specificPreferences
dict to AWS Thrift union type, computeResourceId=%s",
- pref_instance.computeResourceId if
hasattr(pref_instance, 'computeResourceId') else 'unknown',
- )
- except Exception as e:
- log.error(
- "GCPreference: Failed to create
AwsComputeResourcePreference from dict: %s, computeResourceId=%s",
- str(e),
- pref_instance.computeResourceId if
hasattr(pref_instance, 'computeResourceId') else 'unknown',
- exc_info=True,
- )
- else:
- log.info(
- "GCPreference: specificPreferences dict is empty,
created empty union type, computeResourceId=%s",
- pref_instance.computeResourceId if
hasattr(pref_instance, 'computeResourceId') else 'unknown',
- )
- else:
- log.error(
- "GCPreference: Could not get union type class to convert
specificPreferences dict, computeResourceId=%s",
- pref_instance.computeResourceId if hasattr(pref_instance,
'computeResourceId') else 'unknown',
- )
- union_type_created = False
- try:
- test_instance =
GroupComputeResourcePreference(resourceType=resource_type)
- if test_instance.specificPreferences is not None:
- pref_instance.specificPreferences =
type(test_instance.specificPreferences)()
- union_type_created = True
- log.info(
- "GCPreference: Created empty union type from test
instance, computeResourceId=%s",
- pref_instance.computeResourceId if
hasattr(pref_instance, 'computeResourceId') else 'unknown',
- )
- except Exception as e:
- log.warning(
- "GCPreference: Failed to create union type from test
instance: %s, trying direct construction",
- str(e),
- )
-
- if not union_type_created:
- if hasattr(pref_instance, 'resourceType') and
pref_instance.resourceType:
- try:
- temp =
GroupComputeResourcePreference(resourceType=pref_instance.resourceType)
- if temp.specificPreferences is not None:
- pref_instance.specificPreferences =
type(temp.specificPreferences)()
- union_type_created = True
- log.info(
- "GCPreference: Created empty union type
using pref_instance.resourceType, computeResourceId=%s",
- pref_instance.computeResourceId if
hasattr(pref_instance, 'computeResourceId') else 'unknown',
- )
- except Exception as e2:
- log.error(
- "GCPreference: All attempts to create union
type failed: %s, computeResourceId=%s",
- str(e2),
- pref_instance.computeResourceId if
hasattr(pref_instance, 'computeResourceId') else 'unknown',
- exc_info=True,
- )
-
- if not union_type_created:
- log.error(
- "GCPreference: Could not create union type at all!
specificPreferences will remain as dict, computeResourceId=%s",
- pref_instance.computeResourceId if
hasattr(pref_instance, 'computeResourceId') else 'unknown',
- )
- else:
- if hasattr(pref_instance.specificPreferences, 'slurm') and
pref_instance.specificPreferences.slurm:
-
GroupComputeResourcePreferenceSerializer._convert_nested_list_fields_to_thrift(
- pref_instance.specificPreferences.slurm
- )
+ reservationId = serializers.CharField(source='reservation_id',
allow_blank=True, allow_null=True, required=False)
+ reservationName = serializers.CharField(source='reservation_name',
allow_blank=True, allow_null=True, required=False)
+ queueNames = serializers.ListField(source='queue_names',
child=serializers.CharField(), required=False)
+ startTime = ProtoIntOrNoneField(source='start_time')
+ endTime = ProtoIntOrNoneField(source='end_time')
- def to_representation(self, instance):
- """
- Override to extract fields from specificPreferences union type.
- """
- ret = super().to_representation(instance)
+ def create(self, validated_data):
+ d = validated_data
+ return _grp_pb2().ComputeResourceReservation(
+ reservation_id=d.get('reservation_id', '') or '',
+ reservation_name=d.get('reservation_name', '') or '',
+ queue_names=list(d.get('queue_names', []) or []),
+ start_time=d.get('start_time', 0) or 0,
+ end_time=d.get('end_time', 0) or 0,
+ )
- if hasattr(instance, 'specificPreferences') and
instance.specificPreferences:
- if hasattr(instance.specificPreferences, 'slurm') and
instance.specificPreferences.slurm:
- slurm_pref = instance.specificPreferences.slurm
- if hasattr(slurm_pref, 'allocationProjectNumber'):
- ret['allocationProjectNumber'] =
slurm_pref.allocationProjectNumber
- if 'specificPreferences' in ret and
isinstance(ret['specificPreferences'], dict):
- if 'slurm' not in ret['specificPreferences']:
- ret['specificPreferences'] = {'slurm':
ret['specificPreferences']}
- elif hasattr(instance.specificPreferences, 'aws') and
instance.specificPreferences.aws:
- aws_pref = instance.specificPreferences.aws
- aws_fields = {
- 'region': getattr(aws_pref, 'region', None),
- 'preferredAmiId': getattr(aws_pref, 'preferredAmiId',
None),
- 'preferredInstanceType': getattr(aws_pref,
'preferredInstanceType', None),
- }
- ret['specificPreferences'] = aws_fields
- return ret
+class _GroupAccountSSHProvisionerConfigSerializer(serializers.Serializer):
+ resourceId = serializers.CharField(source='resource_id', allow_blank=True,
allow_null=True, required=False)
+ groupResourceProfileId =
serializers.CharField(source='group_resource_profile_id', allow_blank=True,
allow_null=True, required=False)
+ configName = serializers.CharField(source='config_name', allow_blank=True,
allow_null=True, required=False)
+ configValue = serializers.CharField(source='config_value',
allow_blank=True, allow_null=True, required=False)
def create(self, validated_data):
- """
- Override create() to properly handle resourceType and
specificPreferences union type.
- """
- if isinstance(validated_data, GroupComputeResourcePreference):
- resource_type = None
- if not hasattr(validated_data, 'resourceType') or
validated_data.resourceType is None:
- from collections import OrderedDict
- if hasattr(validated_data, 'specificPreferences') and
validated_data.specificPreferences:
- if isinstance(validated_data.specificPreferences, (dict,
OrderedDict)):
- specific_prefs_dict =
validated_data.specificPreferences
- if 'slurm' in specific_prefs_dict or
'allocationProjectNumber' in specific_prefs_dict:
- resource_type = ResourceType.SLURM
- elif 'aws' in specific_prefs_dict or 'region' in
specific_prefs_dict:
- resource_type = ResourceType.AWS
- else:
- resource_type = ResourceType.SLURM
- elif hasattr(validated_data.specificPreferences, 'slurm')
and validated_data.specificPreferences.slurm:
- resource_type = ResourceType.SLURM
- elif hasattr(validated_data.specificPreferences, 'aws')
and validated_data.specificPreferences.aws:
- resource_type = ResourceType.AWS
- else:
- resource_type = ResourceType.SLURM
- else:
- resource_type = ResourceType.SLURM
-
- if resource_type:
- validated_data.resourceType = resource_type
- else:
- resource_type = validated_data.resourceType
+ d = validated_data
+ return _grp_pb2().GroupAccountSSHProvisionerConfig(
+ resource_id=d.get('resource_id', '') or '',
+ group_resource_profile_id=d.get('group_resource_profile_id', '')
or '',
+ config_name=d.get('config_name', '') or '',
+ config_value=d.get('config_value', '') or '',
+ )
- if resource_type:
-
self._convert_specific_preferences_dict_to_thrift(validated_data, resource_type)
- return validated_data
+class GroupComputeResourcePreferenceSerializer(serializers.Serializer):
+ """Proto-native serializer for the gRPC ``GroupComputeResourcePreference``.
- data = copy.deepcopy(validated_data)
+ The proto ``specific_preferences`` is a oneof of a SLURM or AWS
sub-message;
+ the read output flattens ``allocationProjectNumber`` to the top level and
+ wraps the rendered preferences as ``{'slurm': {...}}`` / AWS fields,
matching
+ the historical Thrift-union serializer byte-for-byte.
+ """
- resource_type = data.get('resourceType')
- if resource_type is None:
- if 'allocationProjectNumber' in data or ('specificPreferences' in
data and isinstance(data.get('specificPreferences'), dict) and 'slurm' in
data.get('specificPreferences', {})):
- resource_type = ResourceType.SLURM
- elif 'specificPreferences' in data and
isinstance(data.get('specificPreferences'), dict) and 'aws' in
data.get('specificPreferences', {}):
- resource_type = ResourceType.AWS
- elif 'region' in data or 'preferredAmiId' in data or
'preferredInstanceType' in data:
- resource_type = ResourceType.AWS
- else:
- resource_type = ResourceType.SLURM
+ computeResourceId = serializers.CharField(source='compute_resource_id',
allow_blank=True, allow_null=True, required=False)
+ groupResourceProfileId =
serializers.CharField(source='group_resource_profile_id', allow_blank=True,
allow_null=True, required=False)
+ overridebyAiravata =
serializers.BooleanField(source='override_by_airavata', required=False,
default=False)
+ loginUserName = serializers.CharField(source='login_user_name',
allow_blank=True, allow_null=True, required=False)
+ scratchLocation = serializers.CharField(source='scratch_location',
allow_blank=True, allow_null=True, required=False)
+ preferredJobSubmissionProtocol =
job_submission_protocol_field(source='preferred_job_submission_protocol',
required=False, allow_null=True)
+ preferredDataMovementProtocol =
data_movement_protocol_field(source='preferred_data_movement_protocol',
required=False, allow_null=True)
+ resourceSpecificCredentialStoreToken =
serializers.CharField(source='resource_specific_credential_store_token',
allow_blank=True, allow_null=True, required=False)
+ resourceType = _resource_type_field(source='resource_type',
required=False, allow_null=True)
+ specificPreferences = serializers.SerializerMethodField()
+ reservations = serializers.SerializerMethodField()
- if isinstance(resource_type, str):
- try:
- resource_type = ResourceType[resource_type]
- except (KeyError, AttributeError):
- resource_type = ResourceType.SLURM
- elif isinstance(resource_type, int):
- try:
- resource_type = ResourceType(resource_type)
- except (ValueError, AttributeError):
- resource_type = ResourceType.SLURM
+ def get_specificPreferences(self, pref):
+ if not pref.HasField('specific_preferences'):
+ return None
+ sp = pref.specific_preferences
+ which = sp.WhichOneof('preferences')
+ if which == 'slurm':
+ # The old union serializer always rendered both members (the
inactive
+ # one as null); preserve that.
+ return {'slurm': self._slurm(sp.slurm), 'aws': None}
+ if which == 'aws':
+ return self._aws(sp.aws)
+ return None
- data['resourceType'] = resource_type
+ def get_reservations(self, pref):
+ # The top-level reservations field is always empty here: reservations
live
+ # inside the SLURM union member (matching the old serializer's
behaviour
+ # where the compute-pref object had no top-level ``reservations``
attr).
+ return []
- specific_prefs = data.pop('specificPreferences', None)
+ def to_representation(self, instance):
+ ret = super().to_representation(instance)
+ # Flatten allocationProjectNumber out of the SLURM union member, as the
+ # old union serializer did.
+ if (instance.HasField('specific_preferences') and
+ instance.specific_preferences.WhichOneof('preferences') ==
'slurm'):
+ ret['allocationProjectNumber'] = (
+ instance.specific_preferences.slurm.allocation_project_number)
+ return ret
- slurm_data = {}
- aws_data = {}
+ def to_internal_value(self, data):
+ # specificPreferences / allocationProjectNumber / reservations are
+ # read-flatten outputs (SerializerMethodField); carry the raw write
input
+ # through so create() can rebuild the proto union.
+ ret = super().to_internal_value(data)
+ if isinstance(data, dict):
+ for k in ('specificPreferences', 'allocationProjectNumber',
+ 'reservations'):
+ if k in data:
+ ret[k] = data[k]
+ return ret
- slurm_fields = ['allocationProjectNumber', 'preferredBatchQueue',
'qualityOfService',
- 'usageReportingGatewayId', 'sshAccountProvisioner',
- 'groupSSHAccountProvisionerConfigs',
'sshAccountProvisionerAdditionalInfo',
- 'reservations']
- aws_fields = ['region', 'preferredAmiId', 'preferredInstanceType']
+ @staticmethod
+ def _slurm(s):
+ return {
+ 'allocationProjectNumber': s.allocation_project_number,
+ 'preferredBatchQueue': s.preferred_batch_queue,
+ 'qualityOfService': s.quality_of_service,
+ 'usageReportingGatewayId': s.usage_reporting_gateway_id,
+ 'sshAccountProvisioner': s.ssh_account_provisioner,
+ 'groupSSHAccountProvisionerConfigs': [
+ {
+ 'resourceId': c.resource_id,
+ 'groupResourceProfileId': c.group_resource_profile_id,
+ 'configName': c.config_name,
+ 'configValue': c.config_value,
+ }
+ for c in s.group_ssh_account_provisioner_configs],
+ 'sshAccountProvisionerAdditionalInfo':
s.ssh_account_provisioner_additional_info,
+ 'reservations': [
+ {
+ 'reservationId': r.reservation_id,
+ 'reservationName': r.reservation_name,
+ 'queueNames': list(r.queue_names),
+ 'startTime': r.start_time or None,
+ 'endTime': r.end_time or None,
+ }
+ for r in s.reservations],
+ }
- for field in slurm_fields:
- if field in data:
- slurm_data[field] = data.pop(field)
- for field in aws_fields:
- if field in data:
- aws_data[field] = data.pop(field)
+ @staticmethod
+ def _aws(a):
+ return {
+ 'region': a.region,
+ 'preferredAmiId': a.preferred_ami_id,
+ 'preferredInstanceType': a.preferred_instance_type,
+ }
- thrift_spec = GroupComputeResourcePreference.thrift_spec
- for field_spec in thrift_spec:
- if field_spec:
- field_name = field_spec[2]
- default_value = field_spec[4]
- if default_value is not None:
- if field_name in data and data[field_name] is None:
- del data[field_name]
+ def create(self, validated_data):
+ return _build_group_compute_resource_preference(validated_data)
- instance = GroupComputeResourcePreference(**data)
- instance.resourceType = resource_type
+def _build_group_compute_resource_preference(d):
+ """Build a proto ``GroupComputeResourcePreference`` from validated write
data.
- union_type_class = None
- try:
- from airavata.model.appcatalog.groupresourceprofile.ttypes import (
- EnvironmentSpecificPreferences
+ The frontend sends ``specificPreferences`` (a ``{'slurm': {...}}`` / AWS
+ dict) plus a flattened ``allocationProjectNumber`` and a ``resourceType``;
+ map them into the proto oneof.
+ """
+ grp = _grp_pb2()
+ sp = d.get('specificPreferences')
+ resource_type = d.get('resource_type', 0) or 0
+ msg = grp.GroupComputeResourcePreference(
+ compute_resource_id=d.get('compute_resource_id', '') or '',
+ group_resource_profile_id=d.get('group_resource_profile_id', '') or '',
+ override_by_airavata=bool(d.get('override_by_airavata', False)),
+ login_user_name=d.get('login_user_name', '') or '',
+ scratch_location=d.get('scratch_location', '') or '',
+
preferred_job_submission_protocol=d.get('preferred_job_submission_protocol', 0)
or 0,
+
preferred_data_movement_protocol=d.get('preferred_data_movement_protocol', 0)
or 0,
+
resource_specific_credential_store_token=d.get('resource_specific_credential_store_token',
'') or '',
+ resource_type=resource_type,
+ )
+ # Resolve the union: prefer an explicit specificPreferences, else infer
from
+ # the flattened allocationProjectNumber / aws fields.
+ slurm_data = None
+ aws_data = None
+ if isinstance(sp, dict):
+ if 'slurm' in sp and isinstance(sp['slurm'], dict):
+ slurm_data = dict(sp['slurm'])
+ elif 'aws' in sp and isinstance(sp['aws'], dict):
+ aws_data = dict(sp['aws'])
+ elif 'region' in sp or 'preferredAmiId' in sp:
+ aws_data = dict(sp)
+ elif sp:
+ slurm_data = dict(sp)
+ if slurm_data is None and aws_data is None:
+ # Infer from the flattened fields the read output produced.
+ proto_resource_type = grp.ResourceType
+ if resource_type == proto_resource_type.AWS or 'region' in d:
+ aws_data = {}
+ elif 'allocationProjectNumber' in d or resource_type ==
proto_resource_type.SLURM:
+ slurm_data = {}
+ if 'allocationProjectNumber' in d and slurm_data is not None:
+ slurm_data.setdefault('allocationProjectNumber',
d['allocationProjectNumber'])
+ if slurm_data is not None:
+ msg.specific_preferences.CopyFrom(grp.EnvironmentSpecificPreferences(
+ slurm=_build_slurm_pref(slurm_data)))
+ elif aws_data is not None:
+ msg.specific_preferences.CopyFrom(grp.EnvironmentSpecificPreferences(
+ aws=_build_aws_pref(aws_data)))
+ return msg
+
+
+def _build_slurm_pref(s):
+ grp = _grp_pb2()
+ return grp.SlurmComputeResourcePreference(
+ allocation_project_number=s.get('allocationProjectNumber', '') or '',
+ preferred_batch_queue=s.get('preferredBatchQueue', '') or '',
+ quality_of_service=s.get('qualityOfService', '') or '',
+ usage_reporting_gateway_id=s.get('usageReportingGatewayId', '') or '',
+ ssh_account_provisioner=s.get('sshAccountProvisioner', '') or '',
+ group_ssh_account_provisioner_configs=[
+ grp.GroupAccountSSHProvisionerConfig(
+ resource_id=c.get('resourceId', '') or '',
+ group_resource_profile_id=c.get('groupResourceProfileId', '')
or '',
+ config_name=c.get('configName', '') or '',
+ config_value=c.get('configValue', '') or '',
)
- union_type_class = EnvironmentSpecificPreferences
- except ImportError as e:
- log.error(
- "GCPreference create: Failed to import
EnvironmentSpecificPreferences: %s",
- str(e),
- exc_info=True,
+ for c in (s.get('groupSSHAccountProvisionerConfigs', []) or [])],
+
ssh_account_provisioner_additional_info=s.get('sshAccountProvisionerAdditionalInfo',
'') or '',
+ reservations=[
+ grp.ComputeResourceReservation(
+ reservation_id=r.get('reservationId', '') or '',
+ reservation_name=r.get('reservationName', '') or '',
+ queue_names=list(r.get('queueNames', []) or []),
+ start_time=r.get('startTime', 0) or 0,
+ end_time=r.get('endTime', 0) or 0,
)
+ for r in (s.get('reservations', []) or [])],
+ )
- if specific_prefs is None:
- if resource_type == ResourceType.SLURM and slurm_data:
- from collections import OrderedDict
- from airavata.model.appcatalog.groupresourceprofile.ttypes
import (
- GroupAccountSSHProvisionerConfig
- )
- if 'reservations' in slurm_data and slurm_data['reservations']:
- reservations_list = slurm_data['reservations']
- if isinstance(reservations_list, list):
- converted_reservations = []
- for res in reservations_list:
- if isinstance(res, (dict, OrderedDict)):
-
converted_reservations.append(ComputeResourceReservation(**res))
- else:
- converted_reservations.append(res)
- slurm_data['reservations'] = converted_reservations
-
- if 'groupSSHAccountProvisionerConfigs' in slurm_data and
slurm_data['groupSSHAccountProvisionerConfigs']:
- configs_list =
slurm_data['groupSSHAccountProvisionerConfigs']
- if isinstance(configs_list, list):
- converted_configs = []
- for cfg in configs_list:
- if isinstance(cfg, (dict, OrderedDict)):
-
converted_configs.append(GroupAccountSSHProvisionerConfig(**cfg))
- else:
- converted_configs.append(cfg)
- slurm_data['groupSSHAccountProvisionerConfigs'] =
converted_configs
-
- slurm_pref = SlurmComputeResourcePreference(**slurm_data)
- if union_type_class:
- instance.specificPreferences = union_type_class()
- instance.specificPreferences.slurm = slurm_pref
- else:
- try:
- test_instance =
GroupComputeResourcePreference(resourceType=resource_type)
- if test_instance.specificPreferences is not None:
- instance.specificPreferences =
type(test_instance.specificPreferences)()
- instance.specificPreferences.slurm = slurm_pref
- else:
- log.warning(
- "GCPreference create: Could not create union
type, instance may be invalid"
- )
- except Exception as e:
- log.error(
- "GCPreference create: Failed to set
specificPreferences.slurm: %s",
- str(e),
- exc_info=True,
- )
- elif resource_type == ResourceType.AWS and aws_data:
- aws_pref = AwsComputeResourcePreference(**aws_data)
- if union_type_class:
- instance.specificPreferences = union_type_class()
- instance.specificPreferences.aws = aws_pref
- else:
- try:
- test_instance =
GroupComputeResourcePreference(resourceType=resource_type)
- if test_instance.specificPreferences is not None:
- instance.specificPreferences =
type(test_instance.specificPreferences)()
- instance.specificPreferences.aws = aws_pref
- except Exception as e:
- log.error(
- "GCPreference create: Failed to set
specificPreferences.aws: %s",
- str(e),
- exc_info=True,
- )
- elif isinstance(specific_prefs, dict):
- if 'slurm' in specific_prefs:
- slurm_dict = specific_prefs['slurm'].copy() if
isinstance(specific_prefs['slurm'], dict) else {}
- slurm_dict.update(slurm_data)
- from collections import OrderedDict
- from airavata.model.appcatalog.groupresourceprofile.ttypes
import (
- GroupAccountSSHProvisionerConfig
- )
+def _build_aws_pref(a):
+ return _grp_pb2().AwsComputeResourcePreference(
+ region=a.get('region', '') or '',
+ preferred_ami_id=a.get('preferredAmiId', '') or '',
+ preferred_instance_type=a.get('preferredInstanceType', '') or '',
+ )
- if 'reservations' in slurm_dict and slurm_dict['reservations']:
- reservations_list = slurm_dict['reservations']
- if isinstance(reservations_list, list):
- converted_reservations = []
- for res in reservations_list:
- if isinstance(res, (dict, OrderedDict)):
-
converted_reservations.append(ComputeResourceReservation(**res))
- else:
- converted_reservations.append(res)
- slurm_dict['reservations'] = converted_reservations
-
- if 'groupSSHAccountProvisionerConfigs' in slurm_dict and
slurm_dict['groupSSHAccountProvisionerConfigs']:
- configs_list =
slurm_dict['groupSSHAccountProvisionerConfigs']
- if isinstance(configs_list, list):
- converted_configs = []
- for cfg in configs_list:
- if isinstance(cfg, (dict, OrderedDict)):
-
converted_configs.append(GroupAccountSSHProvisionerConfig(**cfg))
- else:
- converted_configs.append(cfg)
- slurm_dict['groupSSHAccountProvisionerConfigs'] =
converted_configs
-
- slurm_pref = SlurmComputeResourcePreference(**slurm_dict)
- if union_type_class:
- instance.specificPreferences = union_type_class()
- instance.specificPreferences.slurm = slurm_pref
- elif 'aws' in specific_prefs:
- aws_pref =
AwsComputeResourcePreference(**specific_prefs['aws'])
- if union_type_class:
- instance.specificPreferences = union_type_class()
- instance.specificPreferences.aws = aws_pref
- elif slurm_data:
- from collections import OrderedDict
- from airavata.model.appcatalog.groupresourceprofile.ttypes
import (
- GroupAccountSSHProvisionerConfig
- )
- if 'reservations' in slurm_data and slurm_data['reservations']:
- reservations_list = slurm_data['reservations']
- if isinstance(reservations_list, list):
- converted_reservations = []
- for res in reservations_list:
- if isinstance(res, (dict, OrderedDict)):
-
converted_reservations.append(ComputeResourceReservation(**res))
- else:
- converted_reservations.append(res)
- slurm_data['reservations'] = converted_reservations
-
- if 'groupSSHAccountProvisionerConfigs' in slurm_data and
slurm_data['groupSSHAccountProvisionerConfigs']:
- configs_list =
slurm_data['groupSSHAccountProvisionerConfigs']
- if isinstance(configs_list, list):
- converted_configs = []
- for cfg in configs_list:
- if isinstance(cfg, (dict, OrderedDict)):
-
converted_configs.append(GroupAccountSSHProvisionerConfig(**cfg))
- else:
- converted_configs.append(cfg)
- slurm_data['groupSSHAccountProvisionerConfigs'] =
converted_configs
-
- slurm_pref = SlurmComputeResourcePreference(**slurm_data)
- if union_type_class:
- instance.specificPreferences = union_type_class()
- instance.specificPreferences.slurm = slurm_pref
-
- if not hasattr(instance, 'resourceType') or instance.resourceType is
None:
- if hasattr(instance, 'specificPreferences') and
instance.specificPreferences:
- from collections import OrderedDict
- if isinstance(instance.specificPreferences, (dict,
OrderedDict)):
- if 'slurm' in instance.specificPreferences or
'allocationProjectNumber' in instance.specificPreferences:
- instance.resourceType = ResourceType.SLURM
- elif 'aws' in instance.specificPreferences or 'region' in
instance.specificPreferences:
- instance.resourceType = ResourceType.AWS
- else:
- instance.resourceType = ResourceType.SLURM
- elif hasattr(instance.specificPreferences, 'slurm') and
instance.specificPreferences.slurm:
- instance.resourceType = ResourceType.SLURM
- elif hasattr(instance.specificPreferences, 'aws') and
instance.specificPreferences.aws:
- instance.resourceType = ResourceType.AWS
- else:
- instance.resourceType = ResourceType.SLURM
- else:
- instance.resourceType = ResourceType.SLURM
- log.warning(
- "GCPreference create: Had to set resourceType=%s at end of
create(), computeResourceId=%s",
- instance.resourceType.name if hasattr(instance.resourceType,
'name') else instance.resourceType,
- instance.computeResourceId if hasattr(instance,
'computeResourceId') else 'unknown',
- )
+class _ComputeResourcePolicySerializer(serializers.Serializer):
+ resourcePolicyId = serializers.CharField(source='resource_policy_id',
allow_blank=True, allow_null=True, required=False)
+ computeResourceId = serializers.CharField(source='compute_resource_id',
allow_blank=True, allow_null=True, required=False)
+ groupResourceProfileId =
serializers.CharField(source='group_resource_profile_id', allow_blank=True,
allow_null=True, required=False)
+ allowedBatchQueues = serializers.ListField(source='allowed_batch_queues',
child=serializers.CharField(), required=False)
- if hasattr(instance, 'resourceType') and instance.resourceType:
- if instance.specificPreferences is None:
- try:
- from airavata.model.appcatalog.groupresourceprofile.ttypes
import (
- EnvironmentSpecificPreferences
- )
- instance.specificPreferences =
EnvironmentSpecificPreferences()
- log.debug(
- "GCPreference create: Initialized empty
specificPreferences union type, computeResourceId=%s",
- instance.computeResourceId if hasattr(instance,
'computeResourceId') else 'unknown',
- )
- except ImportError as e:
- log.warning(
- "GCPreference create: Could not initialize empty
specificPreferences: %s",
- str(e),
- )
- self._convert_specific_preferences_dict_to_thrift(instance,
instance.resourceType)
+ def create(self, validated_data):
+ d = validated_data
+ return _grp_pb2().ComputeResourcePolicy(
+ resource_policy_id=d.get('resource_policy_id', '') or '',
+ compute_resource_id=d.get('compute_resource_id', '') or '',
+ group_resource_profile_id=d.get('group_resource_profile_id', '')
or '',
+ allowed_batch_queues=list(d.get('allowed_batch_queues', []) or []),
+ )
+
+
+class _BatchQueueResourcePolicySerializer(serializers.Serializer):
+ resourcePolicyId = serializers.CharField(source='resource_policy_id',
allow_blank=True, allow_null=True, required=False)
+ computeResourceId = serializers.CharField(source='compute_resource_id',
allow_blank=True, allow_null=True, required=False)
+ groupResourceProfileId =
serializers.CharField(source='group_resource_profile_id', allow_blank=True,
allow_null=True, required=False)
+ queuename = serializers.CharField(allow_blank=True, allow_null=True,
required=False)
+ maxAllowedNodes = serializers.IntegerField(source='max_allowed_nodes',
allow_null=True, required=False)
+ maxAllowedCores = serializers.IntegerField(source='max_allowed_cores',
allow_null=True, required=False)
+ maxAllowedWalltime =
serializers.IntegerField(source='max_allowed_walltime', allow_null=True,
required=False)
+
+ def create(self, validated_data):
+ d = validated_data
+ return _grp_pb2().BatchQueueResourcePolicy(
+ resource_policy_id=d.get('resource_policy_id', '') or '',
+ compute_resource_id=d.get('compute_resource_id', '') or '',
+ group_resource_profile_id=d.get('group_resource_profile_id', '')
or '',
+ queuename=d.get('queuename', '') or '',
+ max_allowed_nodes=d.get('max_allowed_nodes', 0) or 0,
+ max_allowed_cores=d.get('max_allowed_cores', 0) or 0,
+ max_allowed_walltime=d.get('max_allowed_walltime', 0) or 0,
+ )
- return instance
+class GroupResourceProfileSerializer(serializers.Serializer):
+ """Proto-native serializer for the gRPC ``GroupResourceProfile``
message."""
-class GroupResourceProfileSerializer(
- thrift_utils.create_serializer_class(GroupResourceProfile)):
+ gatewayId = serializers.CharField(source='gateway_id', allow_blank=True,
allow_null=True, required=False)
+ groupResourceProfileId =
serializers.CharField(source='group_resource_profile_id', allow_blank=True,
allow_null=True, required=False)
+ groupResourceProfileName =
serializers.CharField(source='group_resource_profile_name')
+ computePreferences =
GroupComputeResourcePreferenceSerializer(source='compute_preferences',
many=True, required=False)
+ computeResourcePolicies =
_ComputeResourcePolicySerializer(source='compute_resource_policies', many=True,
required=False)
+ batchQueueResourcePolicies =
_BatchQueueResourcePolicySerializer(source='batch_queue_resource_policies',
many=True, required=False)
+ creationTime = ProtoTimestampField(source='creation_time',
null_if_zero=True, read_only=True)
+ updatedTime = ProtoTimestampField(source='updated_time',
null_if_zero=True, read_only=True)
+ defaultCredentialStoreToken =
serializers.CharField(source='default_credential_store_token',
allow_blank=True, allow_null=True, required=False)
url = FullyEncodedHyperlinkedIdentityField(
view_name='django_airavata_api:group-resource-profile-detail',
- lookup_field='groupResourceProfileId',
+ lookup_field='group_resource_profile_id',
lookup_url_kwarg='group_resource_profile_id')
- creationTime = UTCPosixTimestampDateTimeField(allow_null=True)
- updatedTime = UTCPosixTimestampDateTimeField(allow_null=True)
userHasWriteAccess = serializers.SerializerMethodField()
- computePreferences = GroupComputeResourcePreferenceSerializer(many=True)
- class Meta:
- required = ('groupResourceProfileName',)
+ def to_representation(self, instance):
+ ret = super().to_representation(instance)
+ if ret.get('defaultCredentialStoreToken') == '':
+ ret['defaultCredentialStoreToken'] = None
+ return ret
def create(self, validated_data):
- """
- Override create() to preserve Thrift instances in computePreferences.
- """
- compute_prefs = validated_data.get('computePreferences')
- if compute_prefs:
- all_thrift_instances = all(
- isinstance(item, GroupComputeResourcePreference)
- for item in compute_prefs
- )
- if all_thrift_instances:
- validated_data_copy = copy.deepcopy(validated_data)
- validated_data_copy['computePreferences'] = []
-
- params = self.process_nested_fields(validated_data_copy)
- params['computePreferences'] = [copy.deepcopy(pref) for pref
in compute_prefs]
-
- thrift_spec = GroupResourceProfile.thrift_spec
- for field_spec in thrift_spec:
- if field_spec:
- field_name = field_spec[2]
- default_value = field_spec[4]
- if default_value is not None:
- if field_name in params and params[field_name] is
None:
- del params[field_name]
-
- return GroupResourceProfile(**params)
-
- params = self.process_nested_fields(validated_data)
-
- if 'computePreferences' in params and params['computePreferences']:
- compute_prefs = params['computePreferences']
- processed_compute_prefs = []
- for pref in compute_prefs:
- if isinstance(pref, GroupComputeResourcePreference):
- processed_compute_prefs.append(pref)
- elif isinstance(pref, dict):
- serializer = GroupComputeResourcePreferenceSerializer()
- try:
- log.debug(
- "GCPreference create: Converting dict to Thrift
instance, computeResourceId=%s",
- pref.get('computeResourceId', 'unknown'),
- )
- thrift_pref = serializer.create(pref)
- if isinstance(thrift_pref,
GroupComputeResourcePreference):
- log.debug(
- "GCPreference create: Successfully created
Thrift instance, computeResourceId=%s resourceType=%s",
- thrift_pref.computeResourceId if
hasattr(thrift_pref, 'computeResourceId') else 'unknown',
- getattr(thrift_pref, 'resourceType', None),
- )
- processed_compute_prefs.append(thrift_pref)
- else:
- log.warning(
- "GCPreference create: serializer.create()
returned non-Thrift instance: %s, type=%s",
- pref.get('computeResourceId', 'unknown'),
- type(thrift_pref).__name__,
- )
- raise ValueError(f"serializer.create() returned
{type(thrift_pref).__name__}, expected GroupComputeResourcePreference")
- except Exception as e:
- log.warning(
- "GCPreference create: Failed to convert dict to
Thrift instance using serializer.create(): %s, trying direct construction",
- str(e),
- exc_info=True,
- )
- pref_copy = copy.deepcopy(pref)
- if 'resourceType' not in pref_copy or
pref_copy.get('resourceType') is None:
- if 'allocationProjectNumber' in pref_copy:
- pref_copy['resourceType'] = ResourceType.SLURM
- elif 'region' in pref_copy or 'preferredAmiId' in
pref_copy:
- pref_copy['resourceType'] = ResourceType.AWS
- else:
- pref_copy['resourceType'] = ResourceType.SLURM
# Default
- try:
-
processed_compute_prefs.append(GroupComputeResourcePreference(**pref_copy))
- except Exception as e2:
- log.error(
- "GCPreference create: Failed to create Thrift
instance directly: %s",
- str(e2),
- exc_info=True,
- )
- processed_compute_prefs.append(pref)
- else:
- processed_compute_prefs.append(pref)
- params['computePreferences'] = processed_compute_prefs
-
- thrift_spec = GroupResourceProfile.thrift_spec
- for field_spec in thrift_spec:
- if field_spec:
- field_name = field_spec[2]
- default_value = field_spec[4]
- if default_value is not None:
- if field_name in params and params[field_name] is None:
- del params[field_name]
-
- return GroupResourceProfile(**params)
-
- def process_nested_fields(self, validated_data):
- compute_prefs = validated_data.get('computePreferences')
- if compute_prefs is None or compute_prefs == []:
- validated_data_copy = copy.deepcopy(validated_data)
- validated_data_copy.pop('computePreferences', None)
- params = self._process_nested_fields_base(validated_data_copy)
- params['computePreferences'] = compute_prefs if compute_prefs is
not None else []
- return params
-
- if compute_prefs and all(isinstance(item,
GroupComputeResourcePreference) for item in compute_prefs):
- validated_data_copy = copy.deepcopy(validated_data)
- validated_data_copy.pop('computePreferences', None)
- params = self._process_nested_fields_base(validated_data_copy)
- params['computePreferences'] = compute_prefs
- return params
-
- return self._process_nested_fields_base(validated_data)
-
- def _process_nested_fields_base(self, validated_data):
- from rest_framework.serializers import ListField, ListSerializer,
Serializer
- if not isinstance(validated_data, dict):
- return validated_data
-
- params = copy.deepcopy(validated_data)
- fields = self.fields
-
- for field_name, serializer in fields.items():
- if (isinstance(serializer, ListField) or isinstance(serializer,
ListSerializer)):
- if (params.get(field_name, None) is not None or not
serializer.allow_null):
- if isinstance(serializer.child, Serializer):
- items = params[field_name]
- if items and all(not isinstance(item, dict) for item
in items):
- continue
-
- if field_name == 'experimentInputs' and 'type' in
serializer.child.fields:
- for item in params[field_name]:
- if isinstance(item, dict) and 'type' in item
and isinstance(item['type'], int):
- item['type'] = DataType(item['type'])
- elif field_name == 'experimentOutputs' and 'type' in
serializer.child.fields:
- for item in params[field_name]:
- if isinstance(item, dict) and 'type' in item
and isinstance(item['type'], int):
- item['type'] = DataType(item['type'])
- elif field_name == 'experimentStatus' and 'state' in
serializer.child.fields:
- for item in params[field_name]:
- if isinstance(item, dict) and 'state' in item
and isinstance(item['state'], int):
- item['state'] =
ExperimentState(item['state'])
-
- processed_items = []
- for item in params[field_name]:
- if isinstance(item, dict):
- if hasattr(serializer.child, 'create'):
- try:
-
processed_items.append(serializer.child.create(item))
- except NotImplementedError:
- processed_items.append(item)
- else:
- processed_items.append(item)
- else:
- processed_items.append(item)
- params[field_name] = processed_items
- else:
- params[field_name] =
serializer.to_representation(params[field_name])
- elif isinstance(serializer, Serializer):
- if field_name in params and params[field_name] is not None:
- if not isinstance(params[field_name], dict):
- continue
- if hasattr(serializer, 'create'):
- try:
- params[field_name] =
serializer.create(params[field_name])
- except NotImplementedError:
- pass
-
- return params
+ return _build_group_resource_profile(validated_data)
def update(self, instance, validated_data):
- # Merge existing computePreferences with incoming data to preserve
fields that aren't being updated
- if 'computePreferences' in validated_data and
instance.computePreferences:
- existing_prefs_by_id = {
- pref.computeResourceId: pref
- for pref in instance.computePreferences
- if hasattr(pref, 'computeResourceId')
- }
-
- for incoming_pref in validated_data['computePreferences']:
- if isinstance(incoming_pref, dict):
- compute_resource_id =
incoming_pref.get('computeResourceId')
- if compute_resource_id and compute_resource_id in
existing_prefs_by_id:
- existing_pref =
existing_prefs_by_id[compute_resource_id]
- if 'specificPreferences' in incoming_pref and
isinstance(incoming_pref['specificPreferences'], dict):
- if 'slurm' in incoming_pref['specificPreferences']:
- incoming_slurm =
incoming_pref['specificPreferences']['slurm']
- if isinstance(incoming_slurm, dict):
- existing_slurm = None
- if (hasattr(existing_pref,
'specificPreferences') and
- existing_pref.specificPreferences and
-
hasattr(existing_pref.specificPreferences, 'slurm') and
-
existing_pref.specificPreferences.slurm):
- existing_slurm =
existing_pref.specificPreferences.slurm
-
- simple_string_fields = [
- 'preferredBatchQueue',
- 'qualityOfService',
- 'usageReportingGatewayId',
- 'sshAccountProvisioner',
- 'sshAccountProvisionerAdditionalInfo',
- ]
- for field in simple_string_fields:
- if incoming_slurm.get(field) is None
and existing_slurm:
- existing_value =
getattr(existing_slurm, field, None)
- if existing_value is not None:
- incoming_slurm[field] =
existing_value
- log.debug(
- "GCPreference update:
Preserved existing %s=%s for computeResourceId=%s",
- field,
- existing_value,
- compute_resource_id,
- )
-
- list_fields = [
- 'groupSSHAccountProvisionerConfigs',
- 'reservations',
- ]
- for field in list_fields:
- if incoming_slurm.get(field) is None
and existing_slurm:
- existing_value =
getattr(existing_slurm, field, None)
- if existing_value is not None and
isinstance(existing_value, list) and len(existing_value) > 0:
- converted_list = []
- for item in existing_value:
- if hasattr(item,
'__dict__'):
- if field ==
'reservations':
- try:
- serializer =
ComputeResourceReservationSerializer()
-
converted_list.append(serializer.to_representation(item))
- except Exception
as e:
- log.warning(
-
"GCPreference update: Could not serialize reservation item: %s",
- str(e),
- )
- item_dict =
{k: v for k, v in item.__dict__.items() if not k.startswith('_')}
-
converted_list.append(item_dict)
- else:
- item_dict = {k: v
for k, v in item.__dict__.items() if not k.startswith('_')}
-
converted_list.append(item_dict)
- else:
-
converted_list.append(item)
- incoming_slurm[field] =
converted_list
- log.debug(
- "GCPreference update:
Preserved existing %s (list with %d items) for computeResourceId=%s",
- field,
- len(converted_list),
- compute_resource_id,
- )
-
- if 'computeResourcePolicies' in validated_data and
instance.computeResourcePolicies:
- existing_policies_by_resource_id = {
- pol.computeResourceId: pol
- for pol in instance.computeResourcePolicies
- if hasattr(pol, 'computeResourceId') and hasattr(pol,
'resourcePolicyId')
- }
-
- for incoming_policy in validated_data['computeResourcePolicies']:
- if isinstance(incoming_policy, dict):
- compute_resource_id =
incoming_policy.get('computeResourceId')
- if compute_resource_id and compute_resource_id in
existing_policies_by_resource_id:
- existing_policy =
existing_policies_by_resource_id[compute_resource_id]
- if 'resourcePolicyId' not in incoming_policy or
incoming_policy.get('resourcePolicyId') is None:
- incoming_policy['resourcePolicyId'] =
existing_policy.resourcePolicyId
- log.debug(
- "GCPreference update: Preserved existing
resourcePolicyId=%s for computeResourceId=%s",
- existing_policy.resourcePolicyId,
- compute_resource_id,
- )
-
- if 'batchQueueResourcePolicies' in validated_data and
instance.batchQueueResourcePolicies:
- existing_bq_policies_by_key = {}
- for pol in instance.batchQueueResourcePolicies:
- if hasattr(pol, 'computeResourceId') and hasattr(pol,
'queuename') and hasattr(pol, 'resourcePolicyId'):
- key = (pol.computeResourceId, pol.queuename)
- existing_bq_policies_by_key[key] = pol
-
- for incoming_bq_policy in
validated_data['batchQueueResourcePolicies']:
- if isinstance(incoming_bq_policy, dict):
- compute_resource_id =
incoming_bq_policy.get('computeResourceId')
- queuename = incoming_bq_policy.get('queuename')
- if compute_resource_id and queuename:
- key = (compute_resource_id, queuename)
- if key in existing_bq_policies_by_key:
- existing_bq_policy =
existing_bq_policies_by_key[key]
- if 'resourcePolicyId' not in incoming_bq_policy or
incoming_bq_policy.get('resourcePolicyId') is None:
- incoming_bq_policy['resourcePolicyId'] =
existing_bq_policy.resourcePolicyId
- log.debug(
- "GCPreference update: Preserved existing
resourcePolicyId=%s for batchQueueResourcePolicy computeResourceId=%s
queuename=%s",
- existing_bq_policy.resourcePolicyId,
- compute_resource_id,
- queuename,
- )
-
- result = self.create(validated_data)
- result._removed_compute_resource_preferences = []
- result._removed_compute_resource_policies = []
- result._removed_batch_queue_resource_policies = []
- # Find all compute resource preferences that were removed
- for compute_resource_preference in instance.computePreferences:
- existing_compute_resource_preference = None
- for pref in result.computePreferences:
- if isinstance(pref, GroupComputeResourcePreference):
- pref_id = pref.computeResourceId
- elif isinstance(pref, dict):
- pref_id = pref.get('computeResourceId')
- else:
- continue
- if pref_id == compute_resource_preference.computeResourceId:
- existing_compute_resource_preference = pref
- break
- if not existing_compute_resource_preference:
- result._removed_compute_resource_preferences.append(
- compute_resource_preference)
- # Find all compute resource policies that were removed
- for compute_resource_policy in instance.computeResourcePolicies:
- existing_compute_resource_policy = None
- for pol in result.computeResourcePolicies:
- if hasattr(pol, 'resourcePolicyId'):
- pol_id = pol.resourcePolicyId
- elif isinstance(pol, dict):
- pol_id = pol.get('resourcePolicyId')
- else:
- continue
- if pol_id == compute_resource_policy.resourcePolicyId:
- existing_compute_resource_policy = pol
- break
- if not existing_compute_resource_policy:
- result._removed_compute_resource_policies.append(
- compute_resource_policy)
- # Find all batch queue resource policies that were removed
- for batch_queue_resource_policy in instance.batchQueueResourcePolicies:
- existing_batch_queue_resource_policy_for_update = None
- for bq in result.batchQueueResourcePolicies:
- if hasattr(bq, 'resourcePolicyId'):
- bq_id = bq.resourcePolicyId
- elif isinstance(bq, dict):
- bq_id = bq.get('resourcePolicyId')
- else:
- continue
- if bq_id == batch_queue_resource_policy.resourcePolicyId:
- existing_batch_queue_resource_policy_for_update = bq
- break
- if not existing_batch_queue_resource_policy_for_update:
- result._removed_batch_queue_resource_policies.append(
- batch_queue_resource_policy)
- return result
+ return _build_group_resource_profile(validated_data)
def get_userHasWriteAccess(self, groupResourceProfile):
request = self.context['request']
write_access = user_has_access(
- request, groupResourceProfile.groupResourceProfileId, "WRITE")
+ request, groupResourceProfile.group_resource_profile_id, "WRITE")
if not write_access:
return False
- # Check that user has READ access to all tokens in this
- # GroupResourceProfile
- tokens = set([groupResourceProfile.defaultCredentialStoreToken] +
- [cp.resourceSpecificCredentialStoreToken
- for cp in groupResourceProfile.computePreferences])
+ # Check that the user has READ access to all credential tokens.
+ tokens = set([groupResourceProfile.default_credential_store_token] +
+ [cp.resource_specific_credential_store_token
+ for cp in groupResourceProfile.compute_preferences])
def check_token(token):
- return token is None or user_has_access(request, token, "READ")
+ return not token or user_has_access(request, token, "READ")
return all(map(check_token, tokens))
+def _build_group_resource_profile(d):
+ grp = _grp_pb2()
+ return grp.GroupResourceProfile(
+ gateway_id=d.get('gateway_id', '') or '',
+ group_resource_profile_id=d.get('group_resource_profile_id', '') or '',
+ group_resource_profile_name=d.get('group_resource_profile_name', '')
or '',
+ compute_preferences=[
+ _build_group_compute_resource_preference(p)
+ for p in d.get('compute_preferences', []) or []],
+ compute_resource_policies=[
+ _ComputeResourcePolicySerializer().create(p)
+ for p in d.get('compute_resource_policies', []) or []],
+ batch_queue_resource_policies=[
+ _BatchQueueResourcePolicySerializer().create(p)
+ for p in d.get('batch_queue_resource_policies', []) or []],
+ default_credential_store_token=d.get('default_credential_store_token',
'') or '',
+ )
+
+
class UserPermissionSerializer(serializers.Serializer):
user = UserProfileSerializer()
permissionType = serializers.IntegerField()
diff --git a/airavata-django-portal/django_airavata/apps/api/views.py
b/airavata-django-portal/django_airavata/apps/api/views.py
index 4c24a4877..2ba84c324 100644
--- a/airavata-django-portal/django_airavata/apps/api/views.py
+++ b/airavata-django-portal/django_airavata/apps/api/views.py
@@ -10,10 +10,6 @@ from urllib.parse import quote
from airavata.model.experiment.ttypes import (
ExperimentSearchFields
)
-from airavata.model.appcatalog.groupresourceprofile.ttypes import (
- GroupComputeResourcePreference,
- ResourceType
-)
from airavata.model.group.ttypes import ResourcePermissionType
from airavata_django_portal_sdk import (
experiment_util,
@@ -963,196 +959,49 @@ class GroupResourceProfileViewSet(APIBackedViewSet):
lookup_field = 'group_resource_profile_id'
def get_list(self):
- return [
- grpc_adapters.group_resource_profile(p)
- for p in self.request.airavata.compute.get_group_resource_list()
- ]
+ return list(self.request.airavata.compute.get_group_resource_list())
def get_instance(self, lookup_value):
- return grpc_adapters.group_resource_profile(
-
self.request.airavata.compute.get_group_resource_profile(lookup_value))
+ return self.request.airavata.compute.get_group_resource_profile(
+ lookup_value)
def perform_create(self, serializer):
- group_resource_profile = serializer.save()
- group_resource_profile.gatewayId = self.gateway_id
+ group_resource_profile = serializer.save(gateway_id=self.gateway_id)
created = self.request.airavata.compute.create_group_resource_profile(
- grpc_requests.group_resource_profile(group_resource_profile))
- group_resource_profile.groupResourceProfileId =
created.group_resource_profile_id
- group_resource_profile.creationTime = created.creation_time
+ group_resource_profile)
+ group_resource_profile.group_resource_profile_id = (
+ created.group_resource_profile_id)
+ group_resource_profile.creation_time = created.creation_time
def perform_update(self, serializer):
- original_instance = serializer.instance
-
+ original = serializer.instance
grp = serializer.save()
- for removed_compute_resource_preference \
- in grp._removed_compute_resource_preferences:
- self.request.airavata.compute.remove_group_compute_prefs(
- removed_compute_resource_preference.groupResourceProfileId,
- removed_compute_resource_preference.computeResourceId)
- for removed_compute_resource_policy \
- in grp._removed_compute_resource_policies:
- self.request.airavata.compute.remove_group_compute_resource_policy(
- removed_compute_resource_policy.resourcePolicyId)
- for removed_batch_queue_resource_policy \
- in grp._removed_batch_queue_resource_policies:
-
self.request.airavata.compute.remove_group_batch_queue_resource_policy(
- removed_batch_queue_resource_policy.resourcePolicyId)
- if hasattr(grp, 'computePreferences') and grp.computePreferences:
- from collections import OrderedDict
- from django_airavata.apps.api.serializers import
GroupComputeResourcePreferenceSerializer
-
- for pref in grp.computePreferences:
- if isinstance(pref, GroupComputeResourcePreference):
- if not hasattr(pref, 'resourceType') or pref.resourceType
is None:
- resource_type = None
- if hasattr(pref, 'specificPreferences') and
pref.specificPreferences:
- if isinstance(pref.specificPreferences, (dict,
OrderedDict)):
- specific_prefs_dict = pref.specificPreferences
- if 'slurm' in specific_prefs_dict or
'allocationProjectNumber' in specific_prefs_dict:
- resource_type = ResourceType.SLURM
- elif 'aws' in specific_prefs_dict or 'region'
in specific_prefs_dict:
- resource_type = ResourceType.AWS
- else:
- resource_type = ResourceType.SLURM
- elif hasattr(pref.specificPreferences, 'slurm')
and pref.specificPreferences.slurm:
- resource_type = ResourceType.SLURM
- elif hasattr(pref.specificPreferences, 'aws') and
pref.specificPreferences.aws:
- resource_type = ResourceType.AWS
- else:
- resource_type = ResourceType.SLURM
- else:
- resource_type = ResourceType.SLURM
- pref.resourceType = resource_type
-
- resource_type = pref.resourceType if hasattr(pref,
'resourceType') and pref.resourceType else None
- if resource_type:
- if hasattr(pref, 'specificPreferences') and
isinstance(pref.specificPreferences, (dict, OrderedDict)):
-
GroupComputeResourcePreferenceSerializer._convert_specific_preferences_dict_to_thrift(
- pref, resource_type
- )
- elif hasattr(pref, 'specificPreferences') and
pref.specificPreferences:
-
GroupComputeResourcePreferenceSerializer._convert_specific_preferences_dict_to_thrift(
- pref, resource_type
- )
-
- from collections import OrderedDict
- from airavata.model.appcatalog.groupresourceprofile.ttypes import (
- ComputeResourcePolicy,
- BatchQueueResourcePolicy
- )
-
- if hasattr(grp, 'computeResourcePolicies') and
grp.computeResourcePolicies:
- existing_policies_by_resource_id = {}
- if original_instance and hasattr(original_instance,
'computeResourcePolicies'):
- for existing_policy in
original_instance.computeResourcePolicies:
- if hasattr(existing_policy, 'computeResourceId') and
hasattr(existing_policy, 'resourcePolicyId'):
-
existing_policies_by_resource_id[existing_policy.computeResourceId] =
existing_policy
-
- indices_to_remove = []
- for idx, policy in enumerate(grp.computeResourcePolicies):
- if isinstance(policy, (dict, OrderedDict)):
- try:
- if isinstance(policy, OrderedDict):
- policy = dict(policy)
-
- compute_resource_id = policy.get('computeResourceId')
- current_resource_policy_id =
policy.get('resourcePolicyId')
-
- if not current_resource_policy_id:
- if compute_resource_id and compute_resource_id in
existing_policies_by_resource_id:
- existing_policy =
existing_policies_by_resource_id[compute_resource_id]
- policy['resourcePolicyId'] =
existing_policy.resourcePolicyId
- elif original_instance and
hasattr(original_instance, 'computeResourcePolicies') and idx <
len(original_instance.computeResourcePolicies):
- existing_policy_by_idx =
original_instance.computeResourcePolicies[idx]
- if hasattr(existing_policy_by_idx,
'resourcePolicyId') and existing_policy_by_idx.resourcePolicyId:
- policy['resourcePolicyId'] =
existing_policy_by_idx.resourcePolicyId
-
- if not policy.get('resourcePolicyId'):
- indices_to_remove.append(idx)
- continue
-
- grp.computeResourcePolicies[idx] =
ComputeResourcePolicy(**policy)
- except Exception as e:
- log.error(
- "GCPreference perform_update: Failed to convert
computeResourcePolicies[%d] OrderedDict to Thrift: %s, policy keys: %s",
- idx,
- str(e),
- list(policy.keys()) if isinstance(policy, dict)
else list(policy.keys()),
- exc_info=True,
- )
- raise
-
- for idx in reversed(indices_to_remove):
- grp.computeResourcePolicies.pop(idx)
-
- if hasattr(grp, 'batchQueueResourcePolicies') and
grp.batchQueueResourcePolicies:
- existing_bq_policies_by_key = {}
- if original_instance and hasattr(original_instance,
'batchQueueResourcePolicies'):
- for existing_bq_policy in
original_instance.batchQueueResourcePolicies:
- if (hasattr(existing_bq_policy, 'computeResourceId') and
- hasattr(existing_bq_policy, 'queuename') and
- hasattr(existing_bq_policy, 'resourcePolicyId')):
- key = (existing_bq_policy.computeResourceId,
existing_bq_policy.queuename)
- existing_bq_policies_by_key[key] = existing_bq_policy
-
- for idx, policy in enumerate(grp.batchQueueResourcePolicies):
- if isinstance(policy, (dict, OrderedDict)):
- try:
- compute_resource_id = policy.get('computeResourceId')
- queuename = policy.get('queuename')
- if compute_resource_id and queuename:
- key = (compute_resource_id, queuename)
- if key in existing_bq_policies_by_key:
- existing_bq_policy =
existing_bq_policies_by_key[key]
- if 'resourcePolicyId' not in policy or
policy.get('resourcePolicyId') is None:
- policy['resourcePolicyId'] =
existing_bq_policy.resourcePolicyId
- grp.batchQueueResourcePolicies[idx] =
BatchQueueResourcePolicy(**policy)
- except Exception as e:
- log.error(
- "GCPreference perform_update: Failed to convert
batchQueueResourcePolicies[%d] OrderedDict to Thrift: %s",
- idx,
- str(e),
- exc_info=True,
- )
-
- if hasattr(grp, 'computePreferences') and grp.computePreferences:
- for idx, pref in enumerate(grp.computePreferences):
- if isinstance(pref, (dict, OrderedDict)):
- from django_airavata.apps.api.serializers import
GroupComputeResourcePreferenceSerializer
- serializer = GroupComputeResourcePreferenceSerializer()
- try:
- pref = serializer.create(pref)
- grp.computePreferences[idx] = pref
- except Exception as e:
- log.error(
- "GCPreference perform_update: Failed to convert
OrderedDict to Thrift: %s",
- str(e),
- exc_info=True,
- )
-
- if isinstance(pref, GroupComputeResourcePreference):
- if hasattr(pref, 'specificPreferences') and
pref.specificPreferences:
- if hasattr(pref.specificPreferences, 'slurm') and
pref.specificPreferences.slurm:
-
GroupComputeResourcePreferenceSerializer._convert_nested_list_fields_to_thrift(
- pref.specificPreferences.slurm
- )
- if hasattr(pref.specificPreferences.slurm,
'reservations') and pref.specificPreferences.slurm.reservations:
- for res_idx, res in
enumerate(pref.specificPreferences.slurm.reservations):
- if isinstance(res, (dict, OrderedDict)):
- from
airavata.model.appcatalog.groupresourceprofile.ttypes import
ComputeResourceReservation
-
pref.specificPreferences.slurm.reservations[res_idx] =
ComputeResourceReservation(**res)
- if hasattr(pref.specificPreferences.slurm,
'groupSSHAccountProvisionerConfigs') and
pref.specificPreferences.slurm.groupSSHAccountProvisionerConfigs:
- for cfg_idx, cfg in
enumerate(pref.specificPreferences.slurm.groupSSHAccountProvisionerConfigs):
- if isinstance(cfg, (dict, OrderedDict)):
- from
airavata.model.appcatalog.groupresourceprofile.ttypes import
GroupAccountSSHProvisionerConfig
-
pref.specificPreferences.slurm.groupSSHAccountProvisionerConfigs[cfg_idx] =
GroupAccountSSHProvisionerConfig(**cfg)
-
- self.request.airavata.compute.update_group_resource_profile(
- grp.groupResourceProfileId,
grpc_requests.group_resource_profile(grp))
+ compute = self.request.airavata.compute
+ # Remove compute prefs / policies that are no longer present.
+ new_pref_ids = {
+ cp.compute_resource_id for cp in grp.compute_preferences}
+ for cp in original.compute_preferences:
+ if cp.compute_resource_id not in new_pref_ids:
+ compute.remove_group_compute_prefs(
+ cp.group_resource_profile_id, cp.compute_resource_id)
+ new_policy_ids = {
+ p.resource_policy_id for p in grp.compute_resource_policies}
+ for p in original.compute_resource_policies:
+ if p.resource_policy_id and p.resource_policy_id not in
new_policy_ids:
+ compute.remove_group_compute_resource_policy(
+ p.resource_policy_id)
+ new_bq_ids = {
+ p.resource_policy_id for p in grp.batch_queue_resource_policies}
+ for p in original.batch_queue_resource_policies:
+ if p.resource_policy_id and p.resource_policy_id not in new_bq_ids:
+ compute.remove_group_batch_queue_resource_policy(
+ p.resource_policy_id)
+ compute.update_group_resource_profile(
+ grp.group_resource_profile_id, grp)
def perform_destroy(self, instance):
self.request.airavata.compute.remove_group_resource_profile(
- instance.groupResourceProfileId)
+ instance.group_resource_profile_id)
class SharedEntityViewSet(mixins.RetrieveModelMixin,