HappenLee commented on code in PR #58993:
URL: https://github.com/apache/doris/pull/58993#discussion_r2658776545
##########
be/src/udf/python/python_udaf_client.cpp:
##########
@@ -217,329 +110,419 @@ Status PythonUDAFClient::accumulate(int64_t place_id,
bool is_single_place,
row_end, input.num_rows());
}
- // If there are multiple aggregate functions, places array is required
- if (UNLIKELY(!is_single_place && places == nullptr)) {
+ // In multi-place mode, input RecordBatch must contain "places" column as
last column
+ if (UNLIKELY(!is_single_place &&
+ (input.num_columns() == 0 ||
+ input.schema()->field(input.num_columns() - 1)->name() !=
"places"))) {
return Status::InternalError(
- "places array must not be null when is_single_place=false
(GROUP BY aggregation)");
+ "In multi-place mode, input RecordBatch must contain 'places'
column as the "
+ "last column");
}
- // Build metadata RecordBatch
- // Schema: [is_single_place: bool, row_start: int64, row_end: int64,
place_offset: int64]
- arrow::BooleanBuilder single_place_builder;
- arrow::Int64Builder row_start_builder;
- arrow::Int64Builder row_end_builder;
- arrow::Int64Builder offset_builder;
-
- RETURN_DORIS_STATUS_IF_ERROR(single_place_builder.Append(is_single_place));
- RETURN_DORIS_STATUS_IF_ERROR(row_start_builder.Append(row_start));
- RETURN_DORIS_STATUS_IF_ERROR(row_end_builder.Append(row_end));
- RETURN_DORIS_STATUS_IF_ERROR(offset_builder.Append(place_offset));
-
- std::shared_ptr<arrow::Array> single_place_array;
- std::shared_ptr<arrow::Array> row_start_array;
- std::shared_ptr<arrow::Array> row_end_array;
- std::shared_ptr<arrow::Array> offset_array;
-
-
RETURN_DORIS_STATUS_IF_ERROR(single_place_builder.Finish(&single_place_array));
- RETURN_DORIS_STATUS_IF_ERROR(row_start_builder.Finish(&row_start_array));
- RETURN_DORIS_STATUS_IF_ERROR(row_end_builder.Finish(&row_end_array));
- RETURN_DORIS_STATUS_IF_ERROR(offset_builder.Finish(&offset_array));
+ // Create request batch: input data + NULL binary_data column
+ std::shared_ptr<arrow::RecordBatch> request_batch;
+ RETURN_IF_ERROR(_create_data_request_batch(input, &request_batch));
- auto metadata_batch = arrow::RecordBatch::Make(
- kAccumulateMetadataSchema, 1,
- {single_place_array, row_start_array, row_end_array,
offset_array});
+ // Create metadata structure
+ UDAFMetadata metadata {
+ .operation = static_cast<uint8_t>(UDAFOperation::ACCUMULATE),
+ .place_id = place_id,
+ .is_single_place = static_cast<uint8_t>(is_single_place ? 1 : 0),
+ .row_start = row_start,
+ .row_end = row_end,
+ .place_offset = 0,
+ };
- // Serialize metadata
- std::shared_ptr<arrow::Buffer> metadata_buffer;
- RETURN_IF_ERROR(serialize_record_batch(*metadata_batch, &metadata_buffer));
-
- // Build data RecordBatch (input columns + optional places array)
- std::vector<std::shared_ptr<arrow::Field>> data_fields;
- std::vector<std::shared_ptr<arrow::Array>> data_arrays;
-
- // Add input columns
- for (int i = 0; i < input.num_columns(); ++i) {
- data_fields.push_back(input.schema()->field(i));
- data_arrays.push_back(input.column(i));
- }
-
- // Add places array if in multi-place mode
- if (!is_single_place) {
- arrow::Int64Builder places_builder;
- for (int64_t i = 0; i < input.num_rows(); ++i) {
- RETURN_DORIS_STATUS_IF_ERROR(places_builder.Append(places[i]));
- }
- std::shared_ptr<arrow::Array> places_array;
- RETURN_DORIS_STATUS_IF_ERROR(places_builder.Finish(&places_array));
+ // Send to server with metadata in app_metadata
+ std::shared_ptr<arrow::RecordBatch> response;
+ RETURN_IF_ERROR(_send_request(metadata, request_batch, &response));
- data_fields.push_back(arrow::field("places", arrow::int64()));
- data_arrays.push_back(places_array);
+ // Parse unified response: [success: bool, rows_processed: int64,
serialized_data: binary]
+ if (response->num_rows() != 1) {
+ return Status::InternalError("Invalid ACCUMULATE response: expected 1
row");
}
- auto data_schema = arrow::schema(data_fields);
- auto data_batch = arrow::RecordBatch::Make(data_schema, input.num_rows(),
data_arrays);
-
- // Serialize data
- std::shared_ptr<arrow::Buffer> data_buffer;
- RETURN_IF_ERROR(serialize_record_batch(*data_batch, &data_buffer));
-
- // Create unified batch
- std::shared_ptr<arrow::RecordBatch> batch;
- RETURN_IF_ERROR(create_unified_batch(UDAFOperation::ACCUMULATE, place_id,
metadata_buffer,
- data_buffer, &batch));
-
- // Send to server and check rows_processed
- std::shared_ptr<arrow::RecordBatch> output;
- RETURN_IF_ERROR(_send_operation(batch.get(), &output));
+ auto success_array =
std::static_pointer_cast<arrow::BooleanArray>(response->column(0));
+ auto rows_processed_array =
std::static_pointer_cast<arrow::Int64Array>(response->column(1));
- // Validate response: [rows_processed: int64]
- if (output->num_columns() != 1 || output->num_rows() != 1) {
- return Status::InternalError("Invalid ACCUMULATE response from Python
UDAF server");
+ if (!success_array->Value(0)) {
+ return Status::InternalError("ACCUMULATE operation failed for
place_id={}", place_id);
}
- std::shared_ptr<arrow::Int64Array> int64_array;
- RETURN_IF_ERROR(validate_and_cast_column(output, "ACCUMULATE",
&int64_array));
-
- int64_t rows_processed = int64_array->Value(0);
+ int64_t rows_processed = rows_processed_array->Value(0);
int64_t expected_rows = row_end - row_start;
if (rows_processed < expected_rows) {
return Status::InternalError(
"ACCUMULATE operation only processed {} out of {} rows for
place_id={}",
rows_processed, expected_rows, place_id);
}
-
return Status::OK();
}
Status PythonUDAFClient::serialize(int64_t place_id,
std::shared_ptr<arrow::Buffer>*
serialized_state) {
- // Execute SERIALIZE operation
- std::shared_ptr<arrow::RecordBatch> output;
- RETURN_IF_ERROR((_execute_operation<UDAFOperation::SERIALIZE,
false>(place_id, nullptr, nullptr,
-
&output)));
+ RETURN_IF_ERROR(check_process_alive());
- // Extract serialized state from output (should be a binary column)
- if (output->num_columns() != 1 || output->num_rows() != 1) {
- return Status::InternalError("Invalid SERIALIZE response from Python
UDAF server");
- }
+ std::shared_ptr<arrow::RecordBatch> request_batch;
+ RETURN_IF_ERROR(_create_empty_request_batch(&request_batch));
+
+ UDAFMetadata metadata {
+ .operation = static_cast<uint8_t>(UDAFOperation::SERIALIZE),
+ .place_id = place_id,
+ .is_single_place = 0,
+ .row_start = 0,
+ .row_end = 0,
+ .place_offset = 0,
+ };
+
+ std::shared_ptr<arrow::RecordBatch> response;
+ RETURN_IF_ERROR(_send_request(metadata, request_batch, &response));
- std::shared_ptr<arrow::BinaryArray> binary_array;
- RETURN_IF_ERROR(validate_and_cast_column(output, "SERIALIZE",
&binary_array));
+ // Parse unified response: [success: bool, rows_processed: int64,
serialized_data: binary]
+ auto success_array =
std::static_pointer_cast<arrow::BooleanArray>(response->column(0));
+ auto data_array =
std::static_pointer_cast<arrow::BinaryArray>(response->column(2));
+
+ if (!success_array->Value(0)) {
+ return Status::InternalError("SERIALIZE operation failed for
place_id={}", place_id);
+ }
int32_t length;
- const uint8_t* data = binary_array->GetValue(0, &length);
+ const uint8_t* data = data_array->GetValue(0, &length);
- // Check if serialization succeeded (non-empty binary)
if (length == 0) {
- return Status::InternalError("SERIALIZE operation failed for
place_id={}", place_id);
+ return Status::InternalError("SERIALIZE operation returned empty data
for place_id={}",
+ place_id);
}
*serialized_state = arrow::Buffer::Wrap(data, length);
-
return Status::OK();
}
Status PythonUDAFClient::merge(int64_t place_id,
const std::shared_ptr<arrow::Buffer>&
serialized_state) {
- return _execute_operation<UDAFOperation::MERGE, true>(place_id, nullptr,
serialized_state,
- nullptr);
-}
+ RETURN_IF_ERROR(check_process_alive());
-Status PythonUDAFClient::finalize(int64_t place_id,
std::shared_ptr<arrow::RecordBatch>* output) {
- // Execute FINALIZE operation
- RETURN_IF_ERROR((_execute_operation<UDAFOperation::FINALIZE,
false>(place_id, nullptr, nullptr,
-
output)));
+ std::shared_ptr<arrow::RecordBatch> request_batch;
+ RETURN_IF_ERROR(_create_binary_request_batch(serialized_state,
&request_batch));
- // Validate basic response structure (but allow NULL values)
- if (!output || !(*output) || (*output)->num_columns() != 1 ||
(*output)->num_rows() != 1) {
- return Status::InternalError(
- "Invalid FINALIZE response: expected 1 column and 1 row, got
{} columns and {} "
- "rows",
- output && (*output) ? (*output)->num_columns() : 0,
- output && (*output) ? (*output)->num_rows() : 0);
+ UDAFMetadata metadata {
+ .operation = static_cast<uint8_t>(UDAFOperation::MERGE),
+ .place_id = place_id,
+ .is_single_place = 0,
+ .row_start = 0,
+ .row_end = 0,
+ .place_offset = 0,
+ };
+
+ std::shared_ptr<arrow::RecordBatch> response;
+ RETURN_IF_ERROR(_send_request(metadata, request_batch, &response));
+
+ // Parse unified response: [success: bool, rows_processed: int64,
serialized_data: binary]
+ if (response->num_rows() != 1) {
+ return Status::InternalError("Invalid MERGE response: expected 1 row");
+ }
+
+ auto success_array =
std::static_pointer_cast<arrow::BooleanArray>(response->column(0));
+ if (!success_array->Value(0)) {
+ return Status::InternalError("MERGE operation failed for place_id={}",
place_id);
}
return Status::OK();
}
-Status PythonUDAFClient::reset(int64_t place_id) {
- return _execute_operation<UDAFOperation::RESET, true>(place_id, nullptr,
nullptr, nullptr);
-}
+Status PythonUDAFClient::finalize(int64_t place_id,
std::shared_ptr<arrow::RecordBatch>* output) {
+ RETURN_IF_ERROR(check_process_alive());
-Status PythonUDAFClient::destroy(int64_t place_id) {
- if (UNLIKELY(!_process->is_alive())) {
- return Status::RuntimeError("{} process is not alive",
_operation_name);
- }
+ std::shared_ptr<arrow::RecordBatch> request_batch;
+ RETURN_IF_ERROR(_create_empty_request_batch(&request_batch));
+
+ UDAFMetadata metadata {
+ .operation = static_cast<uint8_t>(UDAFOperation::FINALIZE),
+ .place_id = place_id,
+ .is_single_place = 0,
+ .row_start = 0,
+ .row_end = 0,
+ .place_offset = 0,
+ };
- // Create unified batch for DESTROY operation
- std::shared_ptr<arrow::RecordBatch> batch;
- RETURN_IF_ERROR(
- create_unified_batch(UDAFOperation::DESTROY, place_id, nullptr,
nullptr, &batch));
+ std::shared_ptr<arrow::RecordBatch> response_batch;
+ RETURN_IF_ERROR(_send_request(metadata, request_batch, &response_batch));
- // Send to server and check response
- std::shared_ptr<arrow::RecordBatch> output;
- RETURN_IF_ERROR(_send_operation(batch.get(), &output));
+ // Parse unified response_batch: [success: bool, rows_processed: int64,
serialized_data: binary]
+ auto success_array =
std::static_pointer_cast<arrow::BooleanArray>(response_batch->column(0));
+ auto data_array =
std::static_pointer_cast<arrow::BinaryArray>(response_batch->column(2));
- // Validate response: [success: bool]
- if (output->num_columns() != 1 || output->num_rows() != 1) {
- return Status::InternalError("Invalid DESTROY response from Python
UDAF server");
+ if (!success_array->Value(0)) {
+ return Status::InternalError("FINALIZE operation failed for
place_id={}", place_id);
}
- std::shared_ptr<arrow::BooleanArray> bool_array;
- RETURN_IF_ERROR(validate_and_cast_column(output, "DESTROY", &bool_array));
+ // Deserialize data column to get actual result
+ int32_t length;
+ const uint8_t* data = data_array->GetValue(0, &length);
- if (!bool_array->Value(0)) {
- return Status::InternalError("DESTROY operation failed for
place_id={}", place_id);
+ if (length == 0) {
+ return Status::InternalError("FINALIZE operation returned empty data
for place_id={}",
+ place_id);
}
- _created_states.erase(place_id);
- return Status::OK();
-}
+ auto buffer = arrow::Buffer::Wrap(data, length);
+ auto input_stream = std::make_shared<arrow::io::BufferReader>(buffer);
+
+ auto reader_result =
arrow::ipc::RecordBatchStreamReader::Open(input_stream);
+ if (UNLIKELY(!reader_result.ok())) {
+ return Status::InternalError("Failed to deserialize FINALIZE result:
{}",
+ reader_result.status().message());
+ }
+ auto reader = std::move(reader_result).ValueOrDie();
-Status PythonUDAFClient::destroy_all() {
- // Destroy all tracked states
- for (int64_t place_id : _created_states) {
- // Ignore errors during cleanup
- static_cast<void>(destroy(place_id));
+ auto batch_result = reader->Next();
+ if (UNLIKELY(!batch_result.ok())) {
+ return Status::InternalError("Failed to read FINALIZE result: {}",
+ batch_result.status().message());
}
- _created_states.clear();
+
+ *output = std::move(batch_result).ValueOrDie();
+
return Status::OK();
}
-Status PythonUDAFClient::close() {
- if (!_inited || !_writer) return Status::OK();
+Status PythonUDAFClient::reset(int64_t place_id) {
+ RETURN_IF_ERROR(check_process_alive());
- // Destroy all remaining states
- RETURN_IF_ERROR(destroy_all());
+ std::shared_ptr<arrow::RecordBatch> request_batch;
+ RETURN_IF_ERROR(_create_empty_request_batch(&request_batch));
- // Call base class close which handles cleanup
- return PythonClient::close();
-}
+ UDAFMetadata metadata {
+ .operation = static_cast<uint8_t>(UDAFOperation::RESET),
+ .place_id = place_id,
+ .is_single_place = 0,
+ .row_start = 0,
+ .row_end = 0,
+ .place_offset = 0,
+ };
-std::string PythonUDAFClient::print_operation(UDAFOperation op) {
- switch (op) {
- case UDAFOperation::CREATE:
- return "CREATE";
- case UDAFOperation::ACCUMULATE:
- return "ACCUMULATE";
- case UDAFOperation::SERIALIZE:
- return "SERIALIZE";
- case UDAFOperation::MERGE:
- return "MERGE";
- case UDAFOperation::FINALIZE:
- return "FINALIZE";
- case UDAFOperation::RESET:
- return "RESET";
- case UDAFOperation::DESTROY:
- return "DESTROY";
- default:
- return "UNKNOWN";
+ std::shared_ptr<arrow::RecordBatch> response;
+ RETURN_IF_ERROR(_send_request(metadata, request_batch, &response));
+
+ // Parse unified response: [success: bool, rows_processed: int64,
serialized_data: binary]
+ if (response->num_rows() != 1) {
+ return Status::InternalError("Invalid RESET response: expected 1 row");
}
-}
-template <PythonUDAFClient::UDAFOperation operation, bool validate_response>
-Status PythonUDAFClient::_execute_operation(int64_t place_id,
- const
std::shared_ptr<arrow::Buffer>& metadata,
- const
std::shared_ptr<arrow::Buffer>& data,
-
std::shared_ptr<arrow::RecordBatch>* output) {
- if (UNLIKELY(!_process->is_alive())) {
- return Status::RuntimeError("{} process is not alive",
_operation_name);
+ auto success_array =
std::static_pointer_cast<arrow::BooleanArray>(response->column(0));
+ if (!success_array->Value(0)) {
+ return Status::InternalError("RESET operation failed for place_id={}",
place_id);
}
- // Create unified batch for the operation
- std::shared_ptr<arrow::RecordBatch> batch;
- RETURN_IF_ERROR(create_unified_batch(operation, place_id, metadata, data,
&batch));
+ return Status::OK();
+}
- // Send to server
- std::shared_ptr<arrow::RecordBatch> result;
- RETURN_IF_ERROR(_send_operation(batch.get(), &result));
+Status PythonUDAFClient::destroy(int64_t place_id) {
+ RETURN_IF_ERROR(check_process_alive());
- // Validate response if requested (compile-time branch)
- if constexpr (validate_response) {
- if (result->num_columns() != 1 || result->num_rows() != 1) {
- return Status::InternalError("Invalid {} response from Python UDAF
server",
- print_operation(operation));
- }
+ std::shared_ptr<arrow::RecordBatch> request_batch;
+ RETURN_IF_ERROR(_create_empty_request_batch(&request_batch));
- std::shared_ptr<arrow::BooleanArray> bool_array;
- RETURN_IF_ERROR(validate_and_cast_column(result,
print_operation(operation), &bool_array));
+ UDAFMetadata metadata {
+ .operation = static_cast<uint8_t>(UDAFOperation::DESTROY),
+ .place_id = place_id,
+ .is_single_place = 0,
+ .row_start = 0,
+ .row_end = 0,
+ .place_offset = 0,
+ };
- if (!bool_array->Value(0)) {
- return Status::InternalError("{} operation failed for place_id={}",
- print_operation(operation), place_id);
- }
+ std::shared_ptr<arrow::RecordBatch> response;
+ Status st = _send_request(metadata, request_batch, &response);
+
+ // Always clear tracking, even if RPC failed
+ _created_place_id.reset();
+
+ if (!st.ok()) {
+ LOG(WARNING) << "Failed to destroy place_id=" << place_id << ": " <<
st.to_string();
+ return st;
}
- // Set output if provided
- if (output != nullptr) {
- *output = std::move(result);
+ // Parse unified response: [success: bool, rows_processed: int64,
serialized_data: binary]
+ if (response->num_rows() != 1) {
+ return Status::InternalError("Invalid DESTROY response: expected 1
row");
+ }
+
+ auto success_array =
std::static_pointer_cast<arrow::BooleanArray>(response->column(0));
+
+ if (!success_array->Value(0)) {
+ LOG(WARNING) << "DESTROY operation failed for place_id=" << place_id;
+ return Status::InternalError("DESTROY operation failed for
place_id={}", place_id);
}
return Status::OK();
}
-Status PythonUDAFClient::_send_operation(const arrow::RecordBatch* input,
- std::shared_ptr<arrow::RecordBatch>*
output) {
- // CRITICAL: Lock here to protect Arrow Flight RPC operations (write/read)
- // Arrow Flight Client does NOT support concurrent read/write operations
+Status PythonUDAFClient::close() {
+ if (!_inited || !_writer) return Status::OK();
+
+ // Destroy the place if it exists (cleanup on client destruction)
+ if (_created_place_id.has_value()) {
+ int64_t place_id = _created_place_id.value();
+ Status st = destroy(place_id);
+ if (!st.ok()) {
+ LOG(WARNING) << "Failed to destroy place_id=" << place_id
+ << " during close: " << st.to_string();
+ // Clear tracking even on failure to prevent issues in base class
close
+ _created_place_id.reset();
+ }
+ }
+
+ return PythonClient::close();
+}
+
+Status PythonUDAFClient::_send_request(const UDAFMetadata& metadata,
+ const
std::shared_ptr<arrow::RecordBatch>& request_batch,
+ std::shared_ptr<arrow::RecordBatch>*
response_batch) {
+ DCHECK(response_batch != nullptr);
+
+ // Create app_metadata buffer from metadata struct
+ auto app_metadata =
+ arrow::Buffer::Wrap(reinterpret_cast<const uint8_t*>(&metadata),
sizeof(metadata));
+
std::lock_guard<std::mutex> lock(_operation_mutex);
- // Step 1: Begin exchange with unified schema (only once, now protected by
mutex)
+ // Check if writer/reader are still valid (they could be reset by
handle_error)
+ if (UNLIKELY(!_writer || !_reader)) {
+ return Status::InternalError("{} writer/reader have been closed due to
previous error",
+ _operation_name);
+ }
+
+ // Begin stream on first call (using data schema: argument_types + places
+ binary_data)
if (UNLIKELY(!_begin)) {
- // Always use the unified schema for all operations
- auto begin_res = _writer->Begin(kUnifiedUDAFSchema);
+ auto begin_res = _writer->Begin(_schema);
if (!begin_res.ok()) {
return handle_error(begin_res);
}
_begin = true;
}
- // Step 2: Write the record batch to server
- auto write_res = _writer->WriteRecordBatch(*input);
+ // Write batch with metadata in app_metadata
+ auto write_res = _writer->WriteWithMetadata(*request_batch, app_metadata);
if (!write_res.ok()) {
return handle_error(write_res);
}
- // Step 3: Read response from server (if output is expected)
- if (output != nullptr) {
- auto read_res = _reader->Next();
- if (!read_res.ok()) {
- return handle_error(read_res.status());
- }
+ // Read unified response: [success: bool, rows_processed: int64,
serialized_data: binary]
+ auto read_res = _reader->Next();
+ if (!read_res.ok()) {
+ return handle_error(read_res.status());
+ }
- arrow::flight::FlightStreamChunk chunk = std::move(*read_res);
- if (!chunk.data) {
- _process->shutdown();
- return Status::InternalError("Received empty RecordBatch from {}
server",
- _operation_name);
- }
+ arrow::flight::FlightStreamChunk chunk = std::move(*read_res);
+ if (!chunk.data) {
+ return Status::InternalError("Received empty RecordBatch from {}
server", _operation_name);
+ }
- // The response is in unified format: [result_data: binary]
- // Extract and deserialize the actual result
- auto unified_response = chunk.data;
- if (unified_response->num_columns() != 1 ||
unified_response->num_rows() != 1) {
- return Status::InternalError(
- "Invalid unified response format: expected 1 column and 1
row, got {} columns "
- "and {} rows",
- unified_response->num_columns(),
unified_response->num_rows());
- }
+ // Validate unified response schema
+ if (!chunk.data->schema()->Equals(kUnifiedUDAFResponseSchema)) {
+ return Status::InternalError(
+ "Invalid response schema: expected [success: bool,
rows_processed: int64, "
+ "serialized_data: binary], got {}",
+ chunk.data->schema()->ToString());
+ }
- std::shared_ptr<arrow::BinaryArray> binary_array;
- RETURN_IF_ERROR(
- validate_and_cast_column(unified_response, "UNIFIED_RESPONSE",
&binary_array));
+ *response_batch = std::move(chunk.data);
+ return Status::OK();
+}
+
+Status PythonUDAFClient::_create_data_request_batch(const arrow::RecordBatch&
input_data,
+
std::shared_ptr<arrow::RecordBatch>* out) {
+ // Determine if input has places column
+ int num_input_columns = input_data.num_columns();
+ bool has_places = false;
+ if (num_input_columns > 0 &&
+ input_data.schema()->field(num_input_columns - 1)->name() == "places")
{
+ has_places = true;
+ }
- int32_t length;
- const uint8_t* data = binary_array->GetValue(0, &length);
+ // Expected schema structure: [argument_types..., places, binary_data]
+ // - Input in single-place mode: [argument_types...]
+ // - Input in multi-place mode: [argument_types..., places]
+ std::vector<std::shared_ptr<arrow::Array>> columns;
+ // Copy argument_types columns
+ int num_arg_columns = has_places ? (num_input_columns - 1) :
num_input_columns;
- if (length == 0) {
- return Status::InternalError("Received empty result_data from
Python UDAF server");
- }
+ for (int i = 0; i < num_arg_columns; ++i) {
+ columns.push_back(input_data.column(i));
+ }
- auto result_buffer = arrow::Buffer::Wrap(data, length);
- RETURN_IF_ERROR(deserialize_record_batch(result_buffer, output));
+ // Add places column
+ if (has_places) {
+ // Use existing places column from input
+ columns.push_back(input_data.column(num_input_columns - 1));
+ } else {
+ // Create NULL places column for single-place mode
+ arrow::Int64Builder places_builder;
+ std::shared_ptr<arrow::Array> places_array;
+
RETURN_DORIS_STATUS_IF_ERROR(places_builder.AppendNulls(input_data.num_rows()));
+ RETURN_DORIS_STATUS_IF_ERROR(places_builder.Finish(&places_array));
+ columns.push_back(places_array);
+ }
+
+ // Add NULL binary_data column
+ arrow::BinaryBuilder binary_builder;
+ std::shared_ptr<arrow::Array> binary_array;
+
RETURN_DORIS_STATUS_IF_ERROR(binary_builder.AppendNulls(input_data.num_rows()));
+ RETURN_DORIS_STATUS_IF_ERROR(binary_builder.Finish(&binary_array));
+ columns.push_back(binary_array);
+
+ *out = arrow::RecordBatch::Make(_schema, input_data.num_rows(), columns);
+ return Status::OK();
+}
+
+Status PythonUDAFClient::_create_binary_request_batch(
+ const std::shared_ptr<arrow::Buffer>& binary_data,
+ std::shared_ptr<arrow::RecordBatch>* out) {
+ std::vector<std::shared_ptr<arrow::Array>> columns;
+
+ // Create NULL arrays for data columns (all columns except the last
binary_data column)
+ // Schema: [argument_types..., places, binary_data]
+ int num_data_columns = _schema->num_fields() - 1;
+ for (int i = 0; i < num_data_columns; ++i) {
+ std::unique_ptr<arrow::ArrayBuilder> builder;
+ std::shared_ptr<arrow::Array> null_array;
+
RETURN_DORIS_STATUS_IF_ERROR(arrow::MakeBuilder(arrow::default_memory_pool(),
+
_schema->field(i)->type(), &builder));
+ RETURN_DORIS_STATUS_IF_ERROR(builder->AppendNull());
+ RETURN_DORIS_STATUS_IF_ERROR(builder->Finish(&null_array));
+ columns.push_back(null_array);
+ }
+
+ // Create binary_data column
+ arrow::BinaryBuilder binary_builder;
+ std::shared_ptr<arrow::Array> binary_array;
+ RETURN_DORIS_STATUS_IF_ERROR(
+ binary_builder.Append(binary_data->data(),
static_cast<int32_t>(binary_data->size())));
+ RETURN_DORIS_STATUS_IF_ERROR(binary_builder.Finish(&binary_array));
+ columns.push_back(binary_array);
+
+ *out = arrow::RecordBatch::Make(_schema, 1, columns);
+ return Status::OK();
+}
+
+Status
PythonUDAFClient::_create_empty_request_batch(std::shared_ptr<arrow::RecordBatch>*
out) {
+ std::vector<std::shared_ptr<arrow::Array>> columns;
Review Comment:
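Maybe this only needs to be created once: after the first creation, all operations (SERIALIZE, FINALIZE, RESET, DESTROY) could reuse the same batch, because the batch itself seems useless (its contents never change between calls)?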
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]