sjyango commented on code in PR #57868:
URL: https://github.com/apache/doris/pull/57868#discussion_r2554980625
##########
be/src/udf/python/python_udf_server.py:
##########
@@ -990,6 +1750,145 @@ def do_exchange(
started = True
writer.write_batch(result_batch)
+ def _do_exchange_udaf(
+ self,
+ python_udaf_meta: PythonUDFMeta,
+ reader: flight.MetadataRecordBatchReader,
+ writer: flight.MetadataRecordBatchWriter,
+ ) -> None:
+ """
+ Handle bidirectional streaming for UDAF execution.
+
+ Request Schema (unified for ALL operations):
+ - operation: int8 - UDAFOperationType enum value
+ - place_id: int64 - Unique identifier for the aggregate state
+ - metadata: binary - Serialized metadata (operation-specific)
+ - data: binary - Serialized data (operation-specific)
+
+ Response Schema (unified for ALL operations):
+ - result_data: binary - Serialized result RecordBatch in Arrow IPC
format
+ """
+
+ # Get or create state manager for this specific UDAF function
+ state_manager = self._get_udaf_state_manager(python_udaf_meta)
+ # Define unified response schema (used for all responses)
+ unified_response_schema = pa.schema([pa.field("result_data",
pa.binary())])
+ started = False
+
+ for chunk in reader:
+ if not chunk.data or chunk.data.num_rows == 0:
+ logging.warning("Empty chunk received, skipping")
+ continue
+
+ batch = chunk.data
+
+ # Validate unified request schema
+ if batch.num_columns != 4:
+ raise ValueError(
+ f"Expected 4 columns in unified schema, got
{batch.num_columns}"
+ )
+
+ # Extract metadata from unified schema
+ operation_type = UDAFOperationType(batch.column(0)[0].as_py())
+ place_id = batch.column(1)[0].as_py()
+
+ # Check if metadata/data columns are null
+ metadata_col = batch.column(2)
+ data_col = batch.column(3)
+
+ metadata_binary = (
+ metadata_col[0].as_py() if metadata_col[0].is_valid else None
+ )
+ data_binary = data_col[0].as_py() if data_col[0].is_valid else None
+
+ logging.info(
+ "Processing UDAF operation: %s, place_id: %s",
+ operation_type.name,
+ place_id,
+ )
+
+ # Handle different operations - get operation-specific result
+ if operation_type == UDAFOperationType.CREATE:
Review Comment:
The operation_type must be checked for every batch here, not just once,
because different batches in the same stream may carry different operation types.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]