This is an automated email from the ASF dual-hosted git repository.
yasithdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airavata-portals.git
The following commit(s) were added to refs/heads/main by this push:
new 114a9e078 refactor(portal): make compute + storage resource
serializers proto-native (Track D) (#199)
114a9e078 is described below
commit 114a9e0785e3ed9e86c6a48ec5ed1f75e06ecd50
Author: Yasith Jayawardana <[email protected]>
AuthorDate: Tue Jun 9 04:04:26 2026 -0400
refactor(portal): make compute + storage resource serializers proto-native
(Track D) (#199)
Rewrite ComputeResourceDescriptionSerializer, StorageResourceSerializer,
BatchQueueSerializer, and the nested
JobSubmissionInterface/DataMovementInterface
serializers to read the gRPC protobuf directly, emitting the same
Thrift-named
JSON keys.
- New ProtoFileSystemsMapField bridges the proto file_systems
map<int32,string>
(key = proto FileSystems value) to the {Thrift FileSystems int (as
string):
path} dict the old i32-keyed Thrift map produced (proto HOME=1 -> Thrift
0).
- Protocol enums (job-submission / data-movement) render as Thrift ints via
the
shared protocol-field helpers (divergent JSP_CLOUD/LOCAL handled by
name_map).
- Repoint ComputeResourceViewSet (instance/queues), StorageResourceViewSet
(instance), ApplicationDeploymentViewSet.queues (now mutates proto
batch-queue
fields), and FullExperimentViewSet's nested compute resource to pass
protobuf
through directly, dropping
grpc_adapters.{compute_resource,storage_resource,
_batch_queue,_job_submission_interface,_data_movement_interface,_file_systems}
and the Thrift
ComputeResourceDescription/BatchQueue/StorageResourceDescription
imports.
Validated byte-for-byte (storage resource w/ data-movement interface, the
deep
compute resource w/ batch queues + fileSystems int-keyed map + both
interface
lists, batch queue) vs the old adapter+serializer path. manage.py check
green;
api test failures unchanged vs origin/main.
---
.../django_airavata/apps/api/grpc_adapters.py | 120 --------------
.../django_airavata/apps/api/serializers.py | 172 +++++++++++++++++++--
.../django_airavata/apps/api/views.py | 38 ++---
3 files changed, 174 insertions(+), 156 deletions(-)
diff --git a/airavata-django-portal/django_airavata/apps/api/grpc_adapters.py
b/airavata-django-portal/django_airavata/apps/api/grpc_adapters.py
index b74747554..8c220afec 100644
--- a/airavata-django-portal/django_airavata/apps/api/grpc_adapters.py
+++ b/airavata-django-portal/django_airavata/apps/api/grpc_adapters.py
@@ -103,35 +103,6 @@ _DATA_MOVEMENT_PROTOCOL = {
}
-def _data_movement_interface(pb):
- """gRPC ``DataMovementInterface`` -> auto-generated serializer shape."""
- return SimpleNamespace(
- dataMovementInterfaceId=pb.data_movement_interface_id,
- dataMovementProtocol=_thrift_enum_mapped(
- pb, 'data_movement_protocol', _DATA_MOVEMENT_PROTOCOL),
- priorityOrder=pb.priority_order,
- creationTime=pb.creation_time or None,
- updateTime=pb.update_time or None,
- storageResourceId=pb.storage_resource_id,
- )
-
-
-def storage_resource(pb):
- """gRPC ``StorageResourceDescription`` -> ``StorageResourceSerializer``
shape."""
- return SimpleNamespace(
- storageResourceId=pb.storage_resource_id,
- hostName=pb.host_name,
- storageResourceDescription=pb.storage_resource_description,
- enabled=pb.enabled,
- dataMovementInterfaces=[
- _data_movement_interface(d) for d in pb.data_movement_interfaces],
- # top-level creation/update use UTCPosixTimestampDateTimeField (not
- # nullable, divides by 1000) -> keep the int.
- creationTime=pb.creation_time,
- updateTime=pb.update_time,
- )
-
-
# proto JobSubmissionProtocol member name -> Thrift JobSubmissionProtocol
value.
# Mostly aligned, but proto JSP_CLOUD maps to Thrift CLOUD (name divergence).
_JOB_SUBMISSION_PROTOCOL = {
@@ -144,97 +115,6 @@ _JOB_SUBMISSION_PROTOCOL = {
'LOCAL_FORK': _ThriftJobSubmissionProtocol.LOCAL_FORK,
}
-# proto FileSystems int value -> Thrift FileSystems int value (built lazily by
-# name; the proto map key is a bare int32 with no enum descriptor to read).
-_file_systems_proto_to_thrift = None
-
-
-def _file_systems(pb_map):
- """proto ``file_systems`` map<int32, string> -> {Thrift FileSystems int:
path}.
-
- The proto map key is a bare int32 holding a proto ``FileSystems`` value;
- convert each to the Thrift ``FileSystems`` int (by name — proto HOME=1 vs
- Thrift HOME=0) so the serializer's ``DictField`` renders the same '0'..'4'
- keys the Thrift i32-keyed map produced. Keys stay plain ints (not IntEnum)
- so ``DictField``'s ``str(key)`` yields the digit, as Thrift's map did.
- Unknown keys (e.g. the proto-only zero sentinel) are dropped.
- """
- global _file_systems_proto_to_thrift
- if _file_systems_proto_to_thrift is None:
- from airavata.model.appcatalog.computeresource.ttypes import
FileSystems
- from
airavata_sdk.generated.org.apache.airavata.model.appcatalog.computeresource
import ( # noqa: E501
- compute_resource_pb2,
- )
- proto_fs = compute_resource_pb2.FileSystems
- _file_systems_proto_to_thrift = {
- proto_fs.Value(name): int(getattr(FileSystems, name))
- for name in proto_fs.keys() if hasattr(FileSystems, name)
- }
- return {
- _file_systems_proto_to_thrift[k]: v
- for k, v in pb_map.items() if k in _file_systems_proto_to_thrift
- }
-
-
-def _batch_queue(pb):
- """gRPC ``BatchQueue`` -> ``BatchQueueSerializer`` shape (all scalars)."""
- return SimpleNamespace(
- queueName=pb.queue_name,
- queueDescription=pb.queue_description,
- maxRunTime=pb.max_run_time,
- maxNodes=pb.max_nodes,
- maxProcessors=pb.max_processors,
- maxJobsInQueue=pb.max_jobs_in_queue,
- maxMemory=pb.max_memory,
- cpuPerNode=pb.cpu_per_node,
- defaultNodeCount=pb.default_node_count,
- defaultCPUCount=pb.default_cpu_count,
- defaultWalltime=pb.default_walltime,
- queueSpecificMacros=pb.queue_specific_macros,
- isDefaultQueue=pb.is_default_queue,
- )
-
-
-def _job_submission_interface(pb):
- """gRPC ``JobSubmissionInterface`` -> auto-generated serializer shape."""
- return SimpleNamespace(
- jobSubmissionInterfaceId=pb.job_submission_interface_id,
- jobSubmissionProtocol=_thrift_enum_mapped(
- pb, 'job_submission_protocol', _JOB_SUBMISSION_PROTOCOL),
- priorityOrder=pb.priority_order,
- )
-
-
-def compute_resource(pb):
- """gRPC ``ComputeResourceDescription`` ->
``ComputeResourceDescriptionSerializer`` shape.
-
- The deepest read model: recursively adapts batch queues, the file-systems
- map, and the job-submission and data-movement interface lists.
- """
- return SimpleNamespace(
- computeResourceId=pb.compute_resource_id,
- hostName=pb.host_name,
- hostAliases=list(pb.host_aliases),
- ipAddresses=list(pb.ip_addresses),
- resourceDescription=pb.resource_description,
- enabled=pb.enabled,
- batchQueues=[_batch_queue(q) for q in pb.batch_queues],
- fileSystems=_file_systems(pb.file_systems),
- jobSubmissionInterfaces=[
- _job_submission_interface(j) for j in
pb.job_submission_interfaces],
- dataMovementInterfaces=[
- _data_movement_interface(d) for d in pb.data_movement_interfaces],
- maxMemoryPerNode=pb.max_memory_per_node,
- gatewayUsageReporting=pb.gateway_usage_reporting,
- gatewayUsageModuleLoadCommand=pb.gateway_usage_module_load_command,
- gatewayUsageExecutable=pb.gateway_usage_executable,
- cpusPerNode=pb.cpus_per_node,
- defaultNodeCount=pb.default_node_count,
- defaultCPUCount=pb.default_cpu_count,
- defaultWalltime=pb.default_walltime,
- )
-
-
def _input_data_object(pb):
"""gRPC ``InputDataObjectType`` -> ``InputDataObjectTypeSerializer``
shape."""
return SimpleNamespace(
diff --git a/airavata-django-portal/django_airavata/apps/api/serializers.py
b/airavata-django-portal/django_airavata/apps/api/serializers.py
index 68d3eefd5..0f260c660 100644
--- a/airavata-django-portal/django_airavata/apps/api/serializers.py
+++ b/airavata-django-portal/django_airavata/apps/api/serializers.py
@@ -15,10 +15,6 @@ from airavata.model.appcatalog.appdeployment.ttypes import (
from airavata.model.appcatalog.appinterface.ttypes import (
ApplicationInterfaceDescription
)
-from airavata.model.appcatalog.computeresource.ttypes import (
- BatchQueue,
- ComputeResourceDescription
-)
from airavata.model.appcatalog.groupresourceprofile.ttypes import (
ComputeResourceReservation,
GroupComputeResourcePreference,
@@ -28,9 +24,6 @@ from airavata.model.appcatalog.groupresourceprofile.ttypes
import (
AwsComputeResourcePreference
)
from airavata.model.appcatalog.parser.ttypes import IOType as _ThriftIOType
-from airavata.model.appcatalog.storageresource.ttypes import (
- StorageResourceDescription
-)
from airavata.model.application.io.ttypes import (
InputDataObjectType,
OutputDataObjectType
@@ -316,6 +309,42 @@ def data_movement_protocol_field(**kwargs):
name_map=_DATA_MOVEMENT_PROTOCOL_NAME_MAP, **kwargs)
+class ProtoFileSystemsMapField(serializers.Field):
+ """Renders a proto ``map<int32, string>`` whose int key holds a proto
+ ``FileSystems`` value as the ``{Thrift FileSystems int (as a string):
path}``
+ dict the old i32-keyed Thrift map produced.
+
+ proto and Thrift assign different integers to the same member (proto HOME=1
+ vs Thrift HOME=0), so the key is bridged by NAME; the JSON key is the
Thrift
+ integer as a string (DRF ``DictField`` rendered ``str(key)`` for the i32
map).
+ """
+
+ def __init__(self, **kwargs):
+ kwargs.setdefault('read_only', True)
+ super().__init__(**kwargs)
+ self._proto_to_thrift = None
+
+ def _key_map(self):
+ if self._proto_to_thrift is None:
+ from airavata.model.appcatalog.computeresource.ttypes import (
+ FileSystems,
+ )
+ from
airavata_sdk.generated.org.apache.airavata.model.appcatalog.computeresource
import ( # noqa: E501
+ compute_resource_pb2,
+ )
+ proto_fs = compute_resource_pb2.FileSystems
+ self._proto_to_thrift = {
+ proto_fs.Value(name): int(getattr(FileSystems, name))
+ for name in proto_fs.keys() if hasattr(FileSystems, name)
+ }
+ return self._proto_to_thrift
+
+ def to_representation(self, value):
+ key_map = self._key_map()
+ return {
+ str(key_map[k]): v for k, v in value.items() if k in key_map}
+
+
class StoredJSONField(serializers.JSONField):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -752,13 +781,109 @@ class ApplicationDeploymentDescriptionSerializer(
self.context['request'], appDeployment.appDeploymentId)
-class ComputeResourceDescriptionSerializer(
- thrift_utils.create_serializer_class(ComputeResourceDescription)):
- pass
+class BatchQueueSerializer(serializers.Serializer):
+ """Proto-native serializer for the gRPC ``BatchQueue`` message."""
+ queueName = serializers.CharField(
+ source='queue_name', allow_blank=True, allow_null=True, required=False)
+ queueDescription = serializers.CharField(
+ source='queue_description', allow_blank=True, allow_null=True,
+ required=False)
+ maxRunTime = serializers.IntegerField(
+ source='max_run_time', allow_null=True, required=False)
+ maxNodes = serializers.IntegerField(
+ source='max_nodes', allow_null=True, required=False)
+ maxProcessors = serializers.IntegerField(
+ source='max_processors', allow_null=True, required=False)
+ maxJobsInQueue = serializers.IntegerField(
+ source='max_jobs_in_queue', allow_null=True, required=False)
+ maxMemory = serializers.IntegerField(
+ source='max_memory', allow_null=True, required=False)
+ cpuPerNode = serializers.IntegerField(
+ source='cpu_per_node', allow_null=True, required=False)
+ defaultNodeCount = serializers.IntegerField(
+ source='default_node_count', allow_null=True, required=False)
+ defaultCPUCount = serializers.IntegerField(
+ source='default_cpu_count', allow_null=True, required=False)
+ defaultWalltime = serializers.IntegerField(
+ source='default_walltime', allow_null=True, required=False)
+ queueSpecificMacros = serializers.CharField(
+ source='queue_specific_macros', allow_blank=True, allow_null=True,
+ required=False)
+ isDefaultQueue = serializers.BooleanField(
+ source='is_default_queue', required=False, default=False)
+
+
+class JobSubmissionInterfaceSerializer(serializers.Serializer):
+ """Proto-native serializer for the gRPC ``JobSubmissionInterface``
message."""
+
+ jobSubmissionInterfaceId = serializers.CharField(
+ source='job_submission_interface_id', allow_blank=True,
allow_null=True,
+ required=False)
+ jobSubmissionProtocol = job_submission_protocol_field(
+ source='job_submission_protocol', required=False, allow_null=True)
+ priorityOrder = serializers.IntegerField(
+ source='priority_order', allow_null=True, required=False)
-class BatchQueueSerializer(thrift_utils.create_serializer_class(BatchQueue)):
- pass
+
+class DataMovementInterfaceSerializer(serializers.Serializer):
+ """Proto-native serializer for the gRPC ``DataMovementInterface``
message."""
+
+ dataMovementInterfaceId = serializers.CharField(
+ source='data_movement_interface_id', allow_blank=True, allow_null=True,
+ required=False)
+ dataMovementProtocol = data_movement_protocol_field(
+ source='data_movement_protocol', required=False, allow_null=True)
+ priorityOrder = serializers.IntegerField(
+ source='priority_order', allow_null=True, required=False)
+ creationTime = ProtoIntOrNoneField(source='creation_time')
+ updateTime = ProtoIntOrNoneField(source='update_time')
+ storageResourceId = serializers.CharField(
+ source='storage_resource_id', allow_blank=True, allow_null=True,
+ required=False)
+
+
+class ComputeResourceDescriptionSerializer(serializers.Serializer):
+ """Proto-native serializer for the gRPC ``ComputeResourceDescription``
message."""
+
+ computeResourceId = serializers.CharField(
+ source='compute_resource_id', allow_blank=True, allow_null=True,
+ required=False)
+ hostName = serializers.CharField(
+ source='host_name', allow_blank=True, allow_null=True, required=False)
+ hostAliases = serializers.ListField(
+ source='host_aliases', child=serializers.CharField(), required=False)
+ ipAddresses = serializers.ListField(
+ source='ip_addresses', child=serializers.CharField(), required=False)
+ resourceDescription = serializers.CharField(
+ source='resource_description', allow_blank=True, allow_null=True,
+ required=False)
+ enabled = serializers.BooleanField(required=False, default=False)
+ batchQueues = BatchQueueSerializer(
+ source='batch_queues', many=True, required=False)
+ fileSystems = ProtoFileSystemsMapField(source='file_systems',
required=False)
+ jobSubmissionInterfaces = JobSubmissionInterfaceSerializer(
+ source='job_submission_interfaces', many=True, required=False)
+ dataMovementInterfaces = DataMovementInterfaceSerializer(
+ source='data_movement_interfaces', many=True, required=False)
+ maxMemoryPerNode = serializers.IntegerField(
+ source='max_memory_per_node', allow_null=True, required=False)
+ gatewayUsageReporting = serializers.BooleanField(
+ source='gateway_usage_reporting', required=False, default=False)
+ gatewayUsageModuleLoadCommand = serializers.CharField(
+ source='gateway_usage_module_load_command', allow_blank=True,
+ allow_null=True, required=False)
+ gatewayUsageExecutable = serializers.CharField(
+ source='gateway_usage_executable', allow_blank=True, allow_null=True,
+ required=False)
+ cpusPerNode = serializers.IntegerField(
+ source='cpus_per_node', allow_null=True, required=False)
+ defaultNodeCount = serializers.IntegerField(
+ source='default_node_count', allow_null=True, required=False)
+ defaultCPUCount = serializers.IntegerField(
+ source='default_cpu_count', allow_null=True, required=False)
+ defaultWalltime = serializers.IntegerField(
+ source='default_walltime', allow_null=True, required=False)
class ExperimentStatusSerializer(
@@ -2274,14 +2399,27 @@ class
GatewayResourceProfileSerializer(serializers.Serializer):
return self.create(validated_data)
-class StorageResourceSerializer(
- thrift_utils.create_serializer_class(StorageResourceDescription)):
+class StorageResourceSerializer(serializers.Serializer):
+ """Proto-native serializer for the gRPC ``StorageResourceDescription``
message."""
+
+ storageResourceId = serializers.CharField(
+ source='storage_resource_id', allow_blank=True, allow_null=True,
+ required=False)
+ hostName = serializers.CharField(
+ source='host_name', allow_blank=True, allow_null=True, required=False)
+ storageResourceDescription = serializers.CharField(
+ source='storage_resource_description', allow_blank=True,
allow_null=True,
+ required=False)
+ enabled = serializers.BooleanField(required=False, default=False)
+ dataMovementInterfaces = DataMovementInterfaceSerializer(
+ source='data_movement_interfaces', many=True, required=False)
+ # top-level creation/update render as ISO (non-nullable UTC fields).
+ creationTime = ProtoTimestampField(source='creation_time', required=False)
+ updateTime = ProtoTimestampField(source='update_time', required=False)
url = FullyEncodedHyperlinkedIdentityField(
view_name='django_airavata_api:storage-resource-detail',
- lookup_field='storageResourceId',
+ lookup_field='storage_resource_id',
lookup_url_kwarg='storage_resource_id')
- creationTime = UTCPosixTimestampDateTimeField()
- updateTime = UTCPosixTimestampDateTimeField()
def _parser_pb2():
diff --git a/airavata-django-portal/django_airavata/apps/api/views.py
b/airavata-django-portal/django_airavata/apps/api/views.py
index f53e88445..25b02e1ba 100644
--- a/airavata-django-portal/django_airavata/apps/api/views.py
+++ b/airavata-django-portal/django_airavata/apps/api/views.py
@@ -447,10 +447,10 @@ class FullExperimentViewSet(mixins.RetrieveModelMixin,
comp_res_sched = user_conf.computationalResourceScheduling
compute_resource_id = comp_res_sched.resourceHostId
try:
- compute_resource = grpc_adapters.compute_resource(
+ compute_resource = (
self.request.airavata.compute.get_compute_resource(
- compute_resource_id)) \
- if compute_resource_id else None
+ compute_resource_id)
+ if compute_resource_id else None)
except Exception:
log.exception("Failed to load compute resource for {}".format(
compute_resource_id), extra={'request': self.request})
@@ -702,20 +702,22 @@ class ApplicationDeploymentViewSet(APIBackedViewSet):
app_deployment = grpc_adapters.application_deployment(
self.request.airavata.research.get_application_deployment(
app_deployment_id))
- compute_resource = grpc_adapters.compute_resource(
- request.airavata.compute.get_compute_resource(
- app_deployment.computeHostId))
+ compute_resource = request.airavata.compute.get_compute_resource(
+ app_deployment.computeHostId)
# Override defaults with app deployment default queue, if defined
batch_queues = []
- for batch_queue in compute_resource.batchQueues:
+ for batch_queue in compute_resource.batch_queues:
if app_deployment.defaultQueueName:
- if app_deployment.defaultQueueName == batch_queue.queueName:
- batch_queue.isDefaultQueue = True
- batch_queue.defaultNodeCount =
app_deployment.defaultNodeCount
- batch_queue.defaultCPUCount =
app_deployment.defaultCPUCount
- batch_queue.defaultWalltime =
app_deployment.defaultWalltime
+ if app_deployment.defaultQueueName == batch_queue.queue_name:
+ batch_queue.is_default_queue = True
+ batch_queue.default_node_count = (
+ app_deployment.defaultNodeCount or 0)
+ batch_queue.default_cpu_count = (
+ app_deployment.defaultCPUCount or 0)
+ batch_queue.default_walltime = (
+ app_deployment.defaultWalltime or 0)
else:
- batch_queue.isDefaultQueue = False
+ batch_queue.is_default_queue = False
batch_queues.append(batch_queue)
serializer = serializers.BatchQueueSerializer(
batch_queues, many=True, context={'request': request})
@@ -728,8 +730,7 @@ class ComputeResourceViewSet(mixins.RetrieveModelMixin,
lookup_field = 'compute_resource_id'
def get_instance(self, lookup_value, format=None):
- return grpc_adapters.compute_resource(
- self.request.airavata.compute.get_compute_resource(lookup_value))
+ return self.request.airavata.compute.get_compute_resource(lookup_value)
@action(detail=False)
def all_names(self, request, format=None):
@@ -753,8 +754,8 @@ class ComputeResourceViewSet(mixins.RetrieveModelMixin,
@action(detail=True)
def queues(self, request, compute_resource_id, format=None):
- details = grpc_adapters.compute_resource(
- request.airavata.compute.get_compute_resource(compute_resource_id))
+ details = request.airavata.compute.get_compute_resource(
+ compute_resource_id)
serializer = self.serializer_class(instance=details,
context={'request': request})
data = serializer.data
@@ -1539,8 +1540,7 @@ class StorageResourceViewSet(mixins.RetrieveModelMixin,
lookup_field = 'storage_resource_id'
def get_instance(self, lookup_value, format=None):
- return grpc_adapters.storage_resource(
- self.request.airavata.storage.get_storage_resource(lookup_value))
+ return self.request.airavata.storage.get_storage_resource(lookup_value)
@action(detail=False)
def all_names(self, request, format=None):