morningman commented on a change in pull request #7519:
URL: https://github.com/apache/incubator-doris/pull/7519#discussion_r800060844



##########
File path: be/src/exprs/rpc_fn_call.cpp
##########
@@ -0,0 +1,332 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/rpc_fn_call.h"
+
+#include "exprs/anyval_util.h"
+#include "exprs/expr_context.h"
+#include "fmt/format.h"
+#include "gen_cpp/function_service.pb.h"
+#include "runtime/runtime_state.h"
+#include "runtime/user_function_cache.h"
+#include "service/brpc.h"
+#include "util/brpc_client_cache.h"
+
+namespace doris {
+
+RPCFnCall::RPCFnCall(const TExprNode& node) : Expr(node), 
_fn_context_index(-1) {
+    DCHECK_EQ(_fn.binary_type, TFunctionBinaryType::RPC);
+}
+
+Status RPCFnCall::prepare(RuntimeState* state, const RowDescriptor& desc, 
ExprContext* context) {
+    RETURN_IF_ERROR(Expr::prepare(state, desc, context));
+    DCHECK(!_fn.scalar_fn.symbol.empty());
+
+    FunctionContext::TypeDesc return_type = 
AnyValUtil::column_type_to_type_desc(_type);
+    std::vector<FunctionContext::TypeDesc> arg_types;
+    bool char_arg = false;
+    for (int i = 0; i < _children.size(); ++i) {
+        
arg_types.push_back(AnyValUtil::column_type_to_type_desc(_children[i]->type()));
+        char_arg = char_arg || (_children[i]->type().type == TYPE_CHAR);
+    }
+    _fn_context_index = context->register_func(state, return_type, arg_types, 
0);
+
+    // _fn.scalar_fn.symbol
+    _rpc_function_symbol = _fn.scalar_fn.symbol;
+
+    _client = 
state->exec_env()->brpc_function_client_cache()->get_client(_fn.hdfs_location);
+
+    if (_client == nullptr) {
+        return Status::InternalError(
+                fmt::format("rpc env init error: {}/{}", _fn.hdfs_location, 
_rpc_function_symbol));

Review comment:
       `hdfs_location` is used here as the RPC server address — is that intended? The field name suggests an HDFS path.

##########
File path: be/src/exprs/rpc_fn_call.cpp
##########
@@ -0,0 +1,332 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/rpc_fn_call.h"
+
+#include "exprs/anyval_util.h"
+#include "exprs/expr_context.h"
+#include "fmt/format.h"
+#include "gen_cpp/function_service.pb.h"
+#include "runtime/runtime_state.h"
+#include "runtime/user_function_cache.h"
+#include "service/brpc.h"
+#include "util/brpc_client_cache.h"
+
+namespace doris {
+
+RPCFnCall::RPCFnCall(const TExprNode& node) : Expr(node), 
_fn_context_index(-1) {
+    DCHECK_EQ(_fn.binary_type, TFunctionBinaryType::RPC);
+}
+
+Status RPCFnCall::prepare(RuntimeState* state, const RowDescriptor& desc, 
ExprContext* context) {
+    RETURN_IF_ERROR(Expr::prepare(state, desc, context));
+    DCHECK(!_fn.scalar_fn.symbol.empty());
+
+    FunctionContext::TypeDesc return_type = 
AnyValUtil::column_type_to_type_desc(_type);
+    std::vector<FunctionContext::TypeDesc> arg_types;
+    bool char_arg = false;
+    for (int i = 0; i < _children.size(); ++i) {
+        
arg_types.push_back(AnyValUtil::column_type_to_type_desc(_children[i]->type()));
+        char_arg = char_arg || (_children[i]->type().type == TYPE_CHAR);
+    }
+    _fn_context_index = context->register_func(state, return_type, arg_types, 
0);
+
+    // _fn.scalar_fn.symbol
+    _rpc_function_symbol = _fn.scalar_fn.symbol;
+
+    _client = 
state->exec_env()->brpc_function_client_cache()->get_client(_fn.hdfs_location);
+
+    if (_client == nullptr) {
+        return Status::InternalError(
+                fmt::format("rpc env init error: {}/{}", _fn.hdfs_location, 
_rpc_function_symbol));
+    }
+    return Status::OK();
+}
+
+Status RPCFnCall::open(RuntimeState* state, ExprContext* ctx,
+                       FunctionContext::FunctionStateScope scope) {
+    RETURN_IF_ERROR(Expr::open(state, ctx, scope));
+    return Status::OK();
+}
+
+void RPCFnCall::close(RuntimeState* state, ExprContext* context,
+                      FunctionContext::FunctionStateScope scope) {
+    Expr::close(state, context, scope);
+}
+
+Status RPCFnCall::_eval_children(ExprContext* context, TupleRow* row,
+                                 PFunctionCallResponse* response) {
+    PFunctionCallRequest request;
+    request.set_function_name(_rpc_function_symbol);
+    int64_t name_hash = 0;
+    murmur_hash3_x64_64(_rpc_function_symbol.c_str(), 
_rpc_function_symbol.size(), 21217891,
+                        &name_hash);
+    request.set_name_hash(name_hash);
+    for (int i = 0; i < _children.size(); ++i) {
+        PValues* arg = request.add_args();
+        void* src_slot = context->get_value(_children[i], row);
+        PGenericType* ptype = arg->mutable_type();
+        if (src_slot == nullptr) {
+            arg->set_has_null(true);
+            arg->add_null_map(true);
+        } else {
+            arg->set_has_null(false);
+        }
+        switch (_children[i]->type().type) {
+        case TYPE_BOOLEAN: {
+            ptype->set_id(PGenericType::BOOLEAN);
+            arg->add_bool_value(*(bool*)src_slot);
+            break;
+        }
+        case TYPE_TINYINT: {
+            ptype->set_id(PGenericType::INT8);
+            arg->add_int32_value(*(int8_t*)src_slot);
+            break;
+        }
+        case TYPE_SMALLINT: {
+            ptype->set_id(PGenericType::INT16);
+            arg->add_int32_value(*(int16_t*)src_slot);
+            break;
+        }
+        case TYPE_INT: {
+            ptype->set_id(PGenericType::INT32);
+            arg->add_int32_value(*(int*)src_slot);
+            break;
+        }
+        case TYPE_BIGINT: {
+            ptype->set_id(PGenericType::INT64);
+            arg->add_int64_value(*(int64_t*)src_slot);
+            break;
+        }
+        case TYPE_LARGEINT: {
+            ptype->set_id(PGenericType::INT128);
+            char buffer[sizeof(__int128)];
+            memcpy(buffer, src_slot, sizeof(__int128));
+            arg->add_bytes_value(buffer, sizeof(__int128));
+            break;
+        }
+        case TYPE_DOUBLE: {
+            ptype->set_id(PGenericType::DOUBLE);
+            arg->add_double_value(*(double*)src_slot);
+            break;
+        }
+        case TYPE_FLOAT: {
+            ptype->set_id(PGenericType::FLOAT);
+            arg->add_float_value(*(float*)src_slot);
+            break;
+        }
+        case TYPE_VARCHAR:
+        case TYPE_STRING:
+        case TYPE_CHAR: {
+            ptype->set_id(PGenericType::STRING);
+            StringValue value = *reinterpret_cast<StringValue*>(src_slot);
+            arg->add_string_value(value.ptr, value.len);
+            break;
+        }
+        case TYPE_HLL: {
+            ptype->set_id(PGenericType::HLL);
+            StringValue value = *reinterpret_cast<StringValue*>(src_slot);
+            arg->add_string_value(value.ptr, value.len);
+            break;
+        }
+        case TYPE_OBJECT: {
+            ptype->set_id(PGenericType::BITMAP);
+            StringValue value = *reinterpret_cast<StringValue*>(src_slot);
+            arg->add_string_value(value.ptr, value.len);
+            break;
+        }
+        case TYPE_DECIMALV2: {
+            ptype->set_id(PGenericType::DECIMAL128);
+            
ptype->mutable_decimal_type()->set_precision(_children[i]->type().precision);
+            
ptype->mutable_decimal_type()->set_scale(_children[i]->type().scale);
+            char buffer[sizeof(__int128)];
+            memcpy(buffer, src_slot, sizeof(__int128));
+            arg->add_bytes_value(buffer, sizeof(__int128));
+            break;
+        }
+        case TYPE_DATE: {
+            ptype->set_id(PGenericType::DATE);
+            const auto* time_val = (const DateTimeValue*)(src_slot);
+            PDateTime* date_time = arg->add_datetime_value();
+            date_time->set_day(time_val->day());
+            date_time->set_month(time_val->month());
+            date_time->set_year(time_val->year());
+            break;
+        }
+        case TYPE_DATETIME: {
+            ptype->set_id(PGenericType::DATETIME);
+            const auto* time_val = (const DateTimeValue*)(src_slot);
+            PDateTime* date_time = arg->add_datetime_value();
+            date_time->set_day(time_val->day());
+            date_time->set_month(time_val->month());
+            date_time->set_year(time_val->year());
+            date_time->set_hour(time_val->hour());
+            date_time->set_minute(time_val->minute());
+            date_time->set_second(time_val->second());
+            date_time->set_microsecond(time_val->microsecond());
+            break;
+        }
+        case TYPE_TIME: {
+            ptype->set_id(PGenericType::DATETIME);
+            const auto* time_val = (const DateTimeValue*)(src_slot);
+            PDateTime* date_time = arg->add_datetime_value();
+            date_time->set_hour(time_val->hour());
+            date_time->set_minute(time_val->minute());
+            date_time->set_second(time_val->second());
+            date_time->set_microsecond(time_val->microsecond());
+            break;
+        }
+        default: {
+            FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
+            fn_ctx->set_error(
+                    fmt::format("data time not supported: {}", 
_children[i]->type().type).c_str());
+            break;
+        }
+        }
+    }
+
+    brpc::Controller cntl;
+    _client->fn_call(&cntl, &request, response, nullptr);
+    if (cntl.Failed()) {
+        FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
+        fn_ctx->set_error(cntl.ErrorText().c_str());
+        return Status::InternalError(fmt::format("call to rpc function {} 
failed: {}",
+                                                 _rpc_function_symbol, 
cntl.ErrorText())
+                                             .c_str());
+    }
+    if (response->status().status_code() != 0) {
+        FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
+        fn_ctx->set_error(response->status().DebugString().c_str());
+        return Status::InternalError(fmt::format("call to rpc function {} 
failed: {}",
+                                                 _rpc_function_symbol,
+                                                 
response->status().DebugString()));
+    }
+    return Status::OK();
+}
+
+template <typename T>
+T RPCFnCall::interpret_eval(ExprContext* context, TupleRow* row) {
+    T res_val;
+    PFunctionCallResponse response;
+    Status st = _eval_children(context, row, &response);
+    if (!st.ok() || response.status().status_code() != 0 ||
+        (response.result().has_null() && response.result().null_map(0))) {
+        res_val.is_null = true;

Review comment:
       Is it OK to return NULL when the RPC call fails, instead of reporting the error?

##########
File path: gensrc/proto/types.proto
##########
@@ -63,3 +67,150 @@ message PUniqueId {
     required int64 lo = 2;
 };
 
+message PGenericType {
+    enum TypeId {
+        UINT8 = 0;
+        UINT16 = 1;
+        UINT32 = 2;
+        UINT64 = 3;
+        UINT128 = 4;
+        UINT256 = 5;
+        INT8 = 6;
+        INT16 = 7;
+        INT32 = 8;
+        INT64 = 9;
+        INT128 = 10;
+        INT256 = 11;
+        FLOAT = 12;
+        DOUBLE = 13;
+        BOOLEAN = 14;
+        DATE = 15;
+        DATETIME = 16;
+        HLL = 17;
+        BITMAP = 18;
+        LIST = 19;
+        MAP = 20;
+        STRUCT =21;
+        STRING = 22;
+        DECIMAL32 = 23;
+        DECIMAL64 = 24;
+        DECIMAL128 = 25;
+        BYTES = 26;
+        NOTHING = 27;
+        UNKNOWN = 999;
+    }
+    required TypeId id = 2;
+    optional PList list_type = 11;
+    optional PMap map_type = 12;
+    optional PStruct struct_type = 13;
+    optional PDecimal decimal_type = 14;
+}
+
+message PList {
+  required PGenericType element_type = 1;
+}
+
+message PMap {
+  required PGenericType key_type = 1;
+  required PGenericType value_type = 2;
+}
+
+message PField {
+  required PGenericType type = 1;
+  optional string name = 2;
+  optional string comment = 3;
+}
+
+message PStruct {
+  repeated PField fields = 1;
+  required string name = 2;
+}
+
+message PDecimal {
+  required uint32 precision = 1;
+  required uint32 scale = 2;
+}
+
+message PDateTime {
+    optional int32 year = 1;
+    optional int32 month = 2;
+    optional int32 day = 3;
+    optional int32 hour = 4;
+    optional int32 minute = 5;
+    optional int32 second = 6;
+    optional int32 microsecond = 7;
+}
+
+message PValue {
+    required PGenericType type = 1;
+    optional double double_value = 2;
+    optional float float_value = 3;
+    optional int32 int32_value = 4;
+    optional int64 int64_value = 5;
+    optional uint32 uint32_value = 6;
+    optional uint64 uint64_value = 7;
+    optional bool bool_value = 8;
+    optional string string_value = 9;
+    optional bytes bytes_value = 10;
+    optional PDateTime datetime_value = 11;
+    optional bool is_null  = 12 [default = false];
+}
+
+message PValues {
+    required PGenericType type = 1;
+    repeated double double_value = 2;
+    repeated float float_value = 3;
+    repeated int32 int32_value = 4;
+    repeated int64 int64_value = 5;
+    repeated uint32 uint32_value = 6;
+    repeated uint64 uint64_value = 7;
+    repeated bool bool_value = 8;
+    repeated string string_value = 9;
+    repeated bytes bytes_value = 10;
+    repeated PDateTime datetime_value = 11;
+    repeated bool null_map = 12;

Review comment:
       Could we move `null_map` and `has_null` to the second and third positions (field numbers 2 and 3)?

##########
File path: gensrc/proto/types.proto
##########
@@ -63,3 +67,150 @@ message PUniqueId {
     required int64 lo = 2;
 };
 
+message PGenericType {

Review comment:
       This duplicates the data type definition in `data.proto`; how about unifying them?
   I will do this in PR #7939.

##########
File path: be/src/exprs/rpc_fn_call.cpp
##########
@@ -0,0 +1,332 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/rpc_fn_call.h"
+
+#include "exprs/anyval_util.h"
+#include "exprs/expr_context.h"
+#include "fmt/format.h"
+#include "gen_cpp/function_service.pb.h"
+#include "runtime/runtime_state.h"
+#include "runtime/user_function_cache.h"
+#include "service/brpc.h"
+#include "util/brpc_client_cache.h"
+
+namespace doris {
+
+RPCFnCall::RPCFnCall(const TExprNode& node) : Expr(node), 
_fn_context_index(-1) {
+    DCHECK_EQ(_fn.binary_type, TFunctionBinaryType::RPC);
+}
+
+Status RPCFnCall::prepare(RuntimeState* state, const RowDescriptor& desc, 
ExprContext* context) {
+    RETURN_IF_ERROR(Expr::prepare(state, desc, context));
+    DCHECK(!_fn.scalar_fn.symbol.empty());
+
+    FunctionContext::TypeDesc return_type = 
AnyValUtil::column_type_to_type_desc(_type);
+    std::vector<FunctionContext::TypeDesc> arg_types;
+    bool char_arg = false;
+    for (int i = 0; i < _children.size(); ++i) {
+        
arg_types.push_back(AnyValUtil::column_type_to_type_desc(_children[i]->type()));
+        char_arg = char_arg || (_children[i]->type().type == TYPE_CHAR);
+    }
+    _fn_context_index = context->register_func(state, return_type, arg_types, 
0);
+
+    // _fn.scalar_fn.symbol
+    _rpc_function_symbol = _fn.scalar_fn.symbol;
+
+    _client = 
state->exec_env()->brpc_function_client_cache()->get_client(_fn.hdfs_location);
+
+    if (_client == nullptr) {
+        return Status::InternalError(
+                fmt::format("rpc env init error: {}/{}", _fn.hdfs_location, 
_rpc_function_symbol));
+    }
+    return Status::OK();
+}
+
+Status RPCFnCall::open(RuntimeState* state, ExprContext* ctx,
+                       FunctionContext::FunctionStateScope scope) {
+    RETURN_IF_ERROR(Expr::open(state, ctx, scope));
+    return Status::OK();
+}
+
+void RPCFnCall::close(RuntimeState* state, ExprContext* context,
+                      FunctionContext::FunctionStateScope scope) {
+    Expr::close(state, context, scope);
+}
+
+Status RPCFnCall::_eval_children(ExprContext* context, TupleRow* row,
+                                 PFunctionCallResponse* response) {
+    PFunctionCallRequest request;
+    request.set_function_name(_rpc_function_symbol);
+    int64_t name_hash = 0;
+    murmur_hash3_x64_64(_rpc_function_symbol.c_str(), 
_rpc_function_symbol.size(), 21217891,
+                        &name_hash);
+    request.set_name_hash(name_hash);
+    for (int i = 0; i < _children.size(); ++i) {
+        PValues* arg = request.add_args();
+        void* src_slot = context->get_value(_children[i], row);
+        PGenericType* ptype = arg->mutable_type();
+        if (src_slot == nullptr) {
+            arg->set_has_null(true);
+            arg->add_null_map(true);
+        } else {
+            arg->set_has_null(false);
+        }
+        switch (_children[i]->type().type) {
+        case TYPE_BOOLEAN: {
+            ptype->set_id(PGenericType::BOOLEAN);
+            arg->add_bool_value(*(bool*)src_slot);
+            break;
+        }
+        case TYPE_TINYINT: {
+            ptype->set_id(PGenericType::INT8);
+            arg->add_int32_value(*(int8_t*)src_slot);
+            break;
+        }
+        case TYPE_SMALLINT: {
+            ptype->set_id(PGenericType::INT16);
+            arg->add_int32_value(*(int16_t*)src_slot);
+            break;
+        }
+        case TYPE_INT: {
+            ptype->set_id(PGenericType::INT32);
+            arg->add_int32_value(*(int*)src_slot);
+            break;
+        }
+        case TYPE_BIGINT: {
+            ptype->set_id(PGenericType::INT64);
+            arg->add_int64_value(*(int64_t*)src_slot);
+            break;
+        }
+        case TYPE_LARGEINT: {
+            ptype->set_id(PGenericType::INT128);
+            char buffer[sizeof(__int128)];
+            memcpy(buffer, src_slot, sizeof(__int128));
+            arg->add_bytes_value(buffer, sizeof(__int128));
+            break;
+        }
+        case TYPE_DOUBLE: {
+            ptype->set_id(PGenericType::DOUBLE);
+            arg->add_double_value(*(double*)src_slot);
+            break;
+        }
+        case TYPE_FLOAT: {
+            ptype->set_id(PGenericType::FLOAT);
+            arg->add_float_value(*(float*)src_slot);
+            break;
+        }
+        case TYPE_VARCHAR:
+        case TYPE_STRING:
+        case TYPE_CHAR: {
+            ptype->set_id(PGenericType::STRING);
+            StringValue value = *reinterpret_cast<StringValue*>(src_slot);
+            arg->add_string_value(value.ptr, value.len);
+            break;
+        }
+        case TYPE_HLL: {
+            ptype->set_id(PGenericType::HLL);
+            StringValue value = *reinterpret_cast<StringValue*>(src_slot);
+            arg->add_string_value(value.ptr, value.len);
+            break;
+        }
+        case TYPE_OBJECT: {
+            ptype->set_id(PGenericType::BITMAP);
+            StringValue value = *reinterpret_cast<StringValue*>(src_slot);
+            arg->add_string_value(value.ptr, value.len);
+            break;
+        }
+        case TYPE_DECIMALV2: {
+            ptype->set_id(PGenericType::DECIMAL128);
+            
ptype->mutable_decimal_type()->set_precision(_children[i]->type().precision);
+            
ptype->mutable_decimal_type()->set_scale(_children[i]->type().scale);
+            char buffer[sizeof(__int128)];
+            memcpy(buffer, src_slot, sizeof(__int128));
+            arg->add_bytes_value(buffer, sizeof(__int128));
+            break;
+        }
+        case TYPE_DATE: {
+            ptype->set_id(PGenericType::DATE);
+            const auto* time_val = (const DateTimeValue*)(src_slot);
+            PDateTime* date_time = arg->add_datetime_value();
+            date_time->set_day(time_val->day());
+            date_time->set_month(time_val->month());
+            date_time->set_year(time_val->year());
+            break;
+        }
+        case TYPE_DATETIME: {
+            ptype->set_id(PGenericType::DATETIME);
+            const auto* time_val = (const DateTimeValue*)(src_slot);
+            PDateTime* date_time = arg->add_datetime_value();
+            date_time->set_day(time_val->day());
+            date_time->set_month(time_val->month());
+            date_time->set_year(time_val->year());
+            date_time->set_hour(time_val->hour());
+            date_time->set_minute(time_val->minute());
+            date_time->set_second(time_val->second());
+            date_time->set_microsecond(time_val->microsecond());
+            break;
+        }
+        case TYPE_TIME: {
+            ptype->set_id(PGenericType::DATETIME);
+            const auto* time_val = (const DateTimeValue*)(src_slot);
+            PDateTime* date_time = arg->add_datetime_value();
+            date_time->set_hour(time_val->hour());
+            date_time->set_minute(time_val->minute());
+            date_time->set_second(time_val->second());
+            date_time->set_microsecond(time_val->microsecond());
+            break;
+        }
+        default: {
+            FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
+            fn_ctx->set_error(
+                    fmt::format("data time not supported: {}", 
_children[i]->type().type).c_str());
+            break;
+        }
+        }
+    }
+
+    brpc::Controller cntl;
+    _client->fn_call(&cntl, &request, response, nullptr);
+    if (cntl.Failed()) {
+        FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
+        fn_ctx->set_error(cntl.ErrorText().c_str());
+        return Status::InternalError(fmt::format("call to rpc function {} 
failed: {}",
+                                                 _rpc_function_symbol, 
cntl.ErrorText())
+                                             .c_str());
+    }
+    if (response->status().status_code() != 0) {
+        FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
+        fn_ctx->set_error(response->status().DebugString().c_str());
+        return Status::InternalError(fmt::format("call to rpc function {} 
failed: {}",
+                                                 _rpc_function_symbol,
+                                                 
response->status().DebugString()));
+    }
+    return Status::OK();
+}
+
+template <typename T>
+T RPCFnCall::interpret_eval(ExprContext* context, TupleRow* row) {
+    T res_val;
+    PFunctionCallResponse response;
+    Status st = _eval_children(context, row, &response);
+    if (!st.ok() || response.status().status_code() != 0 ||

Review comment:
       The case `response.status().status_code() != 0` is already checked in `_eval_children()`,
   so there is no need to check it again here.

##########
File path: gensrc/proto/types.proto
##########
@@ -63,3 +67,150 @@ message PUniqueId {
     required int64 lo = 2;
 };
 
+message PGenericType {
+    enum TypeId {
+        UINT8 = 0;
+        UINT16 = 1;
+        UINT32 = 2;
+        UINT64 = 3;
+        UINT128 = 4;
+        UINT256 = 5;
+        INT8 = 6;
+        INT16 = 7;
+        INT32 = 8;
+        INT64 = 9;
+        INT128 = 10;
+        INT256 = 11;
+        FLOAT = 12;
+        DOUBLE = 13;
+        BOOLEAN = 14;
+        DATE = 15;
+        DATETIME = 16;
+        HLL = 17;
+        BITMAP = 18;
+        LIST = 19;
+        MAP = 20;
+        STRUCT =21;
+        STRING = 22;
+        DECIMAL32 = 23;
+        DECIMAL64 = 24;
+        DECIMAL128 = 25;
+        BYTES = 26;
+        NOTHING = 27;
+        UNKNOWN = 999;
+    }
+    required TypeId id = 2;
+    optional PList list_type = 11;
+    optional PMap map_type = 12;
+    optional PStruct struct_type = 13;
+    optional PDecimal decimal_type = 14;
+}
+
+message PList {
+  required PGenericType element_type = 1;
+}
+
+message PMap {
+  required PGenericType key_type = 1;
+  required PGenericType value_type = 2;
+}
+
+message PField {
+  required PGenericType type = 1;
+  optional string name = 2;
+  optional string comment = 3;
+}
+
+message PStruct {
+  repeated PField fields = 1;
+  required string name = 2;
+}
+
+message PDecimal {
+  required uint32 precision = 1;
+  required uint32 scale = 2;
+}
+
+message PDateTime {
+    optional int32 year = 1;
+    optional int32 month = 2;
+    optional int32 day = 3;
+    optional int32 hour = 4;
+    optional int32 minute = 5;
+    optional int32 second = 6;
+    optional int32 microsecond = 7;
+}
+
+message PValue {
+    required PGenericType type = 1;
+    optional double double_value = 2;
+    optional float float_value = 3;
+    optional int32 int32_value = 4;
+    optional int64 int64_value = 5;
+    optional uint32 uint32_value = 6;
+    optional uint64 uint64_value = 7;
+    optional bool bool_value = 8;
+    optional string string_value = 9;
+    optional bytes bytes_value = 10;
+    optional PDateTime datetime_value = 11;
+    optional bool is_null  = 12 [default = false];

Review comment:
       Would it be better to move `is_null` to the second position (field number 2)?

##########
File path: be/src/vec/functions/function_rpc.cpp
##########
@@ -0,0 +1,530 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
#include "vec/functions/function_rpc.h"

#include <fmt/format.h>

#include <cstring>
#include <memory>

#include "gen_cpp/function_service.pb.h"
#include "runtime/exec_env.h"
#include "runtime/user_function_cache.h"
#include "service/brpc.h"
#include "util/brpc_client_cache.h"
#include "vec/columns/column_vector.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_bitmap.h"
#include "vec/data_types/data_type_date.h"
#include "vec/data_types/data_type_date_time.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_number.h"
#include "vec/data_types/data_type_string.h"
+
+namespace doris::vectorized {
+RPCFnCall::RPCFnCall(const std::string& symbol, const std::string& server,
+                     const DataTypes& argument_types, const DataTypePtr& 
return_type)
+        : _symbol(symbol),
+          _server(server),
+          _name(fmt::format("{}/{}", server, symbol)),
+          _argument_types(argument_types),
+          _return_type(return_type) {}
+Status RPCFnCall::prepare(FunctionContext* context, 
FunctionContext::FunctionStateScope scope) {
+    _client = 
ExecEnv::GetInstance()->brpc_function_client_cache()->get_client(_server);
+
+    if (_client == nullptr) {
+        return Status::InternalError("rpc env init error");
+    }
+    return Status::OK();
+}
+
+template <bool nullable>
+void convert_col_to_pvalue(const ColumnPtr& column, const DataTypePtr& 
data_type, PValues* arg,
+                           size_t row_count) {
+    PGenericType* ptype = arg->mutable_type();
+    switch (data_type->get_type_id()) {
+    case TypeIndex::UInt8: {
+        ptype->set_id(PGenericType::UINT8);
+        auto* values = arg->mutable_bool_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnUInt8>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+    case TypeIndex::UInt16: {
+        ptype->set_id(PGenericType::UINT16);
+        auto* values = arg->mutable_uint32_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnUInt16>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+    case TypeIndex::UInt32: {
+        ptype->set_id(PGenericType::UINT32);
+        auto* values = arg->mutable_uint32_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnUInt32>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+    case TypeIndex::UInt64: {
+        ptype->set_id(PGenericType::UINT64);
+        auto* values = arg->mutable_uint64_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnUInt64>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+    case TypeIndex::UInt128: {
+        ptype->set_id(PGenericType::UINT128);
+        arg->mutable_bytes_value()->Reserve(row_count);
+        for (size_t row_num = 0; row_num < row_count; ++row_num) {
+            if constexpr (nullable) {
+                if (column->is_null_at(row_num)) {
+                    arg->add_bytes_value(nullptr);
+                } else {
+                    StringRef data = column->get_data_at(row_num);
+                    arg->add_bytes_value(data.data, data.size);
+                }
+            } else {
+                StringRef data = column->get_data_at(row_num);
+                arg->add_bytes_value(data.data, data.size);
+            }
+        }
+        break;
+    }
+    case TypeIndex::Int8: {
+        ptype->set_id(PGenericType::INT8);
+        auto* values = arg->mutable_int32_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnInt8>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+    case TypeIndex::Int16: {
+        ptype->set_id(PGenericType::INT16);
+        auto* values = arg->mutable_int32_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnInt16>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+    case TypeIndex::Int32: {
+        ptype->set_id(PGenericType::INT32);
+        auto* values = arg->mutable_int32_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnInt32>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+    case TypeIndex::Int64: {
+        ptype->set_id(PGenericType::INT64);
+        auto* values = arg->mutable_int64_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnInt64>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+    case TypeIndex::Int128: {
+        ptype->set_id(PGenericType::INT128);
+        arg->mutable_bytes_value()->Reserve(row_count);
+        for (size_t row_num = 0; row_num < row_count; ++row_num) {
+            if constexpr (nullable) {
+                if (column->is_null_at(row_num)) {
+                    arg->add_bytes_value(nullptr);
+                } else {
+                    StringRef data = column->get_data_at(row_num);
+                    arg->add_bytes_value(data.data, data.size);
+                }
+            } else {
+                StringRef data = column->get_data_at(row_num);
+                arg->add_bytes_value(data.data, data.size);
+            }
+        }
+        break;
+    }
+    case TypeIndex::Float32: {
+        ptype->set_id(PGenericType::FLOAT);
+        auto* values = arg->mutable_float_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnFloat32>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+
+    case TypeIndex::Float64: {
+        ptype->set_id(PGenericType::DOUBLE);
+        auto* values = arg->mutable_double_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnFloat64>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+    case TypeIndex::Decimal128: {
+        ptype->set_id(PGenericType::DECIMAL128);
+        auto dec_type = std::reinterpret_pointer_cast<const 
DataTypeDecimal<Decimal128>>(data_type);
+        
ptype->mutable_decimal_type()->set_precision(dec_type->get_precision());
+        ptype->mutable_decimal_type()->set_scale(dec_type->get_scale());
+        arg->mutable_bytes_value()->Reserve(row_count);
+        for (size_t row_num = 0; row_num < row_count; ++row_num) {
+            if constexpr (nullable) {
+                if (column->is_null_at(row_num)) {
+                    arg->add_bytes_value(nullptr);
+                } else {
+                    StringRef data = column->get_data_at(row_num);
+                    arg->add_bytes_value(data.data, data.size);
+                }
+            } else {
+                StringRef data = column->get_data_at(row_num);
+                arg->add_bytes_value(data.data, data.size);
+            }
+        }
+        break;
+    }
+    case TypeIndex::String: {
+        ptype->set_id(PGenericType::STRING);
+        arg->mutable_bytes_value()->Reserve(row_count);
+        for (size_t row_num = 0; row_num < row_count; ++row_num) {
+            if constexpr (nullable) {
+                if (column->is_null_at(row_num)) {
+                    arg->add_string_value(nullptr);
+                } else {
+                    StringRef data = column->get_data_at(row_num);
+                    arg->add_string_value(data.to_string());
+                }
+            } else {
+                StringRef data = column->get_data_at(row_num);
+                arg->add_string_value(data.to_string());
+            }
+        }
+        break;
+    }
+    case TypeIndex::Date: {
+        ptype->set_id(PGenericType::DATE);
+        arg->mutable_datetime_value()->Reserve(row_count);
+        for (size_t row_num = 0; row_num < row_count; ++row_num) {
+            PDateTime* date_time = arg->add_datetime_value();
+            if constexpr (nullable) {
+                if (!column->is_null_at(row_num)) {
+                    VecDateTimeValue v = 
VecDateTimeValue(column->get_int(row_num));
+                    date_time->set_day(v.day());
+                    date_time->set_month(v.month());
+                    date_time->set_year(v.year());
+                }
+            } else {
+                VecDateTimeValue v = 
VecDateTimeValue(column->get_int(row_num));
+                date_time->set_day(v.day());
+                date_time->set_month(v.month());
+                date_time->set_year(v.year());
+            }
+        }
+        break;
+    }
+    case TypeIndex::DateTime: {
+        ptype->set_id(PGenericType::DATETIME);
+        arg->mutable_datetime_value()->Reserve(row_count);
+        for (size_t row_num = 0; row_num < row_count; ++row_num) {
+            PDateTime* date_time = arg->add_datetime_value();
+            if constexpr (nullable) {
+                if (!column->is_null_at(row_num)) {
+                    VecDateTimeValue v = 
VecDateTimeValue(column->get_int(row_num));
+                    date_time->set_day(v.day());
+                    date_time->set_month(v.month());
+                    date_time->set_year(v.year());
+                    date_time->set_hour(v.hour());
+                    date_time->set_minute(v.minute());
+                    date_time->set_second(v.second());
+                }
+            } else {
+                VecDateTimeValue v = 
VecDateTimeValue(column->get_int(row_num));
+                date_time->set_day(v.day());
+                date_time->set_month(v.month());
+                date_time->set_year(v.year());
+                date_time->set_hour(v.hour());
+                date_time->set_minute(v.minute());
+                date_time->set_second(v.second());
+            }
+        }
+        break;
+    }
+    case TypeIndex::BitMap: {
+        ptype->set_id(PGenericType::BITMAP);
+        arg->mutable_bytes_value()->Reserve(row_count);
+        for (size_t row_num = 0; row_num < row_count; ++row_num) {
+            if constexpr (nullable) {
+                if (column->is_null_at(row_num)) {
+                    arg->add_bytes_value(nullptr);
+                } else {
+                    StringRef data = column->get_data_at(row_num);
+                    arg->add_bytes_value(data.data, data.size);
+                }
+            } else {
+                StringRef data = column->get_data_at(row_num);
+                arg->add_bytes_value(data.data, data.size);
+            }
+        }
+        break;
+    }
+    default:
+        LOG(INFO) << "unknown type: " << data_type->get_name();
+        ptype->set_id(PGenericType::UNKNOWN);
+        break;
+    }
+}
+
+void convert_nullable_col_to_pvalue(const ColumnPtr& column, const 
DataTypePtr& data_type,
+                                    const ColumnUInt8& null_col, PValues* arg, 
size_t row_count) {
+    if (column->has_null(row_count)) {
+        auto* null_map = arg->mutable_null_map();
+        null_map->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnUInt8>(null_col);
+        auto& data = col->get_data();
+        null_map->Add(data.begin(), data.begin() + row_count);
+        convert_col_to_pvalue<true>(column, data_type, arg, row_count);
+    } else {
+        convert_col_to_pvalue<false>(column, data_type, arg, row_count);
+    }
+}
+
+void convert_block_to_proto(Block& block, const ColumnNumbers& arguments, 
size_t input_rows_count,
+                            PFunctionCallRequest* request) {
+    size_t row_count = std::min(block.rows(), input_rows_count);
+    for (size_t col_idx : arguments) {
+        PValues* arg = request->add_args();
+        ColumnWithTypeAndName& column = block.get_by_position(col_idx);
+        arg->set_has_null(column.column->has_null(row_count));
+        auto col = column.column->convert_to_full_column_if_const();
+        if (auto* nullable = check_and_get_column<const ColumnNullable>(*col)) 
{
+            auto data_col = nullable->get_nested_column_ptr();
+            auto& null_col = nullable->get_null_map_column();
+            auto data_type = std::reinterpret_pointer_cast<const 
DataTypeNullable>(column.type);
+            
convert_nullable_col_to_pvalue(data_col->convert_to_full_column_if_const(),
+                                           data_type->get_nested_type(), 
null_col, arg, row_count);
+        } else {
+            convert_col_to_pvalue<false>(col, column.type, arg, row_count);
+        }
+    }
+}
+
+template <bool nullable>
+void convert_to_column(MutableColumnPtr& column, const PValues& result) {
+    switch (result.type().id()) {
+    case PGenericType::UINT8: {
+        column->reserve(result.uint32_value_size());
+        column->resize(result.uint32_value_size());
+        auto& data = reinterpret_cast<ColumnUInt8*>(column.get())->get_data();
+        for (int i = 0; i < result.uint32_value_size(); ++i) {
+            data[i] = result.uint32_value(i);
+        }
+        break;
+    }
+    case PGenericType::UINT16: {
+        column->reserve(result.uint32_value_size());
+        column->resize(result.uint32_value_size());
+        auto& data = reinterpret_cast<ColumnUInt16*>(column.get())->get_data();
+        for (int i = 0; i < result.uint32_value_size(); ++i) {
+            data[i] = result.uint32_value(i);
+        }
+        break;
+    }
+    case PGenericType::UINT32: {
+        column->reserve(result.uint32_value_size());
+        column->resize(result.uint32_value_size());
+        auto& data = reinterpret_cast<ColumnUInt32*>(column.get())->get_data();
+        for (int i = 0; i < result.uint32_value_size(); ++i) {
+            data[i] = result.uint32_value(i);
+        }
+        break;
+    }
+    case PGenericType::UINT64: {
+        column->reserve(result.uint64_value_size());
+        column->resize(result.uint64_value_size());
+        auto& data = reinterpret_cast<ColumnUInt64*>(column.get())->get_data();
+        for (int i = 0; i < result.uint64_value_size(); ++i) {
+            data[i] = result.uint64_value(i);
+        }
+        break;
+    }
+    case PGenericType::INT8: {
+        column->reserve(result.int32_value_size());
+        column->resize(result.int32_value_size());
+        auto& data = reinterpret_cast<ColumnInt16*>(column.get())->get_data();
+        for (int i = 0; i < result.int32_value_size(); ++i) {
+            data[i] = result.int32_value(i);
+        }
+        break;
+    }
+    case PGenericType::INT16: {
+        column->reserve(result.int32_value_size());
+        column->resize(result.int32_value_size());
+        auto& data = reinterpret_cast<ColumnInt16*>(column.get())->get_data();
+        for (int i = 0; i < result.int32_value_size(); ++i) {
+            data[i] = result.int32_value(i);
+        }
+        break;
+    }
+    case PGenericType::INT32: {
+        column->reserve(result.int32_value_size());
+        column->resize(result.int32_value_size());
+        auto& data = reinterpret_cast<ColumnInt32*>(column.get())->get_data();
+        for (int i = 0; i < result.int32_value_size(); ++i) {
+            data[i] = result.int32_value(i);
+        }
+        break;
+    }
+    case PGenericType::INT64: {
+        column->reserve(result.int64_value_size());
+        column->resize(result.int64_value_size());
+        auto& data = reinterpret_cast<ColumnInt64*>(column.get())->get_data();
+        for (int i = 0; i < result.int64_value_size(); ++i) {
+            data[i] = result.int64_value(i);
+        }
+        break;
+    }
+    case PGenericType::DATE:
+    case PGenericType::DATETIME: {
+        column->reserve(result.datetime_value_size());
+        column->resize(result.datetime_value_size());
+        auto& data = reinterpret_cast<ColumnInt64*>(column.get())->get_data();
+        for (int i = 0; i < result.datetime_value_size(); ++i) {
+            VecDateTimeValue v;
+            PDateTime pv = result.datetime_value(i);
+            v.set_time(pv.year(), pv.month(), pv.day(), pv.hour(), 
pv.minute(), pv.minute());
+            data[i] = binary_cast<VecDateTimeValue, Int64>(v);
+        }
+        break;
+    }
+    case PGenericType::FLOAT: {
+        column->reserve(result.float_value_size());
+        column->resize(result.float_value_size());
+        auto& data = 
reinterpret_cast<ColumnFloat32*>(column.get())->get_data();
+        for (int i = 0; i < result.float_value_size(); ++i) {
+            data[i] = result.float_value(i);
+        }
+        break;
+    }
+    case PGenericType::DOUBLE: {
+        column->reserve(result.double_value_size());
+        column->resize(result.double_value_size());
+        auto& data = 
reinterpret_cast<ColumnFloat64*>(column.get())->get_data();
+        for (int i = 0; i < result.double_value_size(); ++i) {
+            data[i] = result.double_value(i);
+        }
+        break;
+    }
+    case PGenericType::INT128: {
+        column->reserve(result.bytes_value_size());
+        column->resize(result.bytes_value_size());
+        auto& data = reinterpret_cast<ColumnInt128*>(column.get())->get_data();
+        for (int i = 0; i < result.bytes_value_size(); ++i) {
+            data[i] = *(int128_t*)(result.bytes_value(i).c_str());
+        }
+        break;
+    }
+    case PGenericType::STRING: {
+        column->reserve(result.string_value_size());
+        for (int i = 0; i < result.string_value_size(); ++i) {
+            column->insert_data(result.string_value(i).c_str(), 
result.string_value(i).size());
+        }
+        break;
+    }
+    case PGenericType::DECIMAL128: {
+        column->reserve(result.bytes_value_size());
+        column->resize(result.bytes_value_size());
+        auto& data = 
reinterpret_cast<ColumnDecimal128*>(column.get())->get_data();
+        for (int i = 0; i < result.bytes_value_size(); ++i) {
+            data[i] = *(int128_t*)(result.bytes_value(i).c_str());
+        }
+        break;
+    }
+    case PGenericType::BITMAP: {
+        column->reserve(result.bytes_value_size());
+        for (int i = 0; i < result.bytes_value_size(); ++i) {
+            column->insert_data(result.bytes_value(i).c_str(), 
result.bytes_value(i).size());
+        }
+        break;
+    }
+    default: {
+        LOG(WARNING) << "unknown PGenericType: " << 
result.type().DebugString();
+        break;
+    }
+    }
+}
+
+void convert_to_block(Block& block, const PValues& result, size_t pos) {
+    auto data_type = block.get_data_type(pos);
+    if (data_type->is_nullable()) {
+        auto null_type = std::reinterpret_pointer_cast<const 
DataTypeNullable>(data_type);
+        auto data_col = null_type->get_nested_type()->create_column();
+        convert_to_column<true>(data_col, result);
+        auto null_col = ColumnUInt8::create(data_col->size(), 0);
+        auto& null_map_data = null_col->get_data();
+        null_col->reserve(data_col->size());
+        null_col->resize(data_col->size());
+        if (result.has_null()) {
+            for (int i = 0; i < data_col->size(); ++i) {
+                null_map_data[i] = result.null_map(i);
+            }
+        } else {
+            for (int i = 0; i < data_col->size(); ++i) {
+                null_map_data[i] = false;
+            }
+        }
+        block.replace_by_position(
+                pos, std::move(ColumnNullable::create(std::move(data_col), 
std::move(null_col))));
+    } else {
+        auto column = data_type->create_column();
+        convert_to_column<false>(column, result);
+        block.replace_by_position(pos, std::move(column));
+    }
+}
+
+Status RPCFnCall::execute(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
+                          size_t result, size_t input_rows_count, bool 
dry_run) {
+    PFunctionCallRequest request;
+    PFunctionCallResponse response;
+    request.set_function_name(_symbol);
+    int64_t name_hash = 0;
+    murmur_hash3_x64_64(_symbol.c_str(), _symbol.size(), 21217891, &name_hash);

Review comment:
       Does `murmur_hash3_x64_64` work correctly on ARM64?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to