HappenLee commented on code in PR #57868:
URL: https://github.com/apache/doris/pull/57868#discussion_r2554208264
##########
be/src/udf/python/python_udf_server.py:
##########
@@ -990,6 +1750,145 @@ def do_exchange(
started = True
writer.write_batch(result_batch)
+ def _do_exchange_udaf(
+ self,
+ python_udaf_meta: PythonUDFMeta,
+ reader: flight.MetadataRecordBatchReader,
+ writer: flight.MetadataRecordBatchWriter,
+ ) -> None:
+ """
+ Handle bidirectional streaming for UDAF execution.
+
+ Request Schema (unified for ALL operations):
+ - operation: int8 - UDAFOperationType enum value
+ - place_id: int64 - Unique identifier for the aggregate state
+ - metadata: binary - Serialized metadata (operation-specific)
+ - data: binary - Serialized data (operation-specific)
+
+ Response Schema (unified for ALL operations):
+ - result_data: binary - Serialized result RecordBatch in Arrow IPC
format
+ """
+
+ # Get or create state manager for this specific UDAF function
+ state_manager = self._get_udaf_state_manager(python_udaf_meta)
+ # Define unified response schema (used for all responses)
+ unified_response_schema = pa.schema([pa.field("result_data",
pa.binary())])
+ started = False
+
+ for chunk in reader:
+ if not chunk.data or chunk.data.num_rows == 0:
+ logging.warning("Empty chunk received, skipping")
+ continue
+
+ batch = chunk.data
+
+ # Validate unified request schema
+ if batch.num_columns != 4:
+ raise ValueError(
+ f"Expected 4 columns in unified schema, got
{batch.num_columns}"
+ )
+
+ # Extract metadata from unified schema
+ operation_type = UDAFOperationType(batch.column(0)[0].as_py())
+ place_id = batch.column(1)[0].as_py()
+
+ # Check if metadata/data columns are null
+ metadata_col = batch.column(2)
+ data_col = batch.column(3)
+
+ metadata_binary = (
+ metadata_col[0].as_py() if metadata_col[0].is_valid else None
+ )
+ data_binary = data_col[0].as_py() if data_col[0].is_valid else None
+
+ logging.info(
Review Comment:
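Too much logging here; please remove the routine (normal-path) log statements, such as this per-chunk `logging.info` call, and keep only the warning/error logs.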
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]