This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5d02f70a0f [remote-udaf](python-samples) use python to impl remote avg
and sum s… (#11655)
5d02f70a0f is described below
commit 5d02f70a0fb994322cb69df10134706f6b0ff77b
Author: chenlinzhong <[email protected]>
AuthorDate: Wed Aug 10 22:13:37 2022 +0800
[remote-udaf](python-samples) use python to impl remote avg and sum s…
(#11655)
---
.../doris-demo/remote-udaf-python-demo/README.md | 29 ++++
.../function_server_demo.py | 158 +++++++++++++++++++++
.../proto/function_service.proto | 1 +
.../remote-udaf-python-demo/proto/types.proto | 1 +
4 files changed, 189 insertions(+)
diff --git a/samples/doris-demo/remote-udaf-python-demo/README.md
b/samples/doris-demo/remote-udaf-python-demo/README.md
new file mode 100644
index 0000000000..b73e6d6a0d
--- /dev/null
+++ b/samples/doris-demo/remote-udaf-python-demo/README.md
@@ -0,0 +1,29 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Remote UDAF Function Service In Python Demo
+
+## Compile
+1. `pip install grpcio-tools`
+2. `python -m grpc_tools.protoc -Iproto --python_out=. --grpc_python_out=. proto/function_service.proto && python -m grpc_tools.protoc -Iproto --python_out=. proto/types.proto`
+
+## Run
+
+`python function_server_demo.py 9000`
+
+`9000` is the port that the server will listen on.
\ No newline at end of file
diff --git a/samples/doris-demo/remote-udaf-python-demo/function_server_demo.py
b/samples/doris-demo/remote-udaf-python-demo/function_server_demo.py
new file mode 100644
index 0000000000..5fe5e30242
--- /dev/null
+++ b/samples/doris-demo/remote-udaf-python-demo/function_server_demo.py
@@ -0,0 +1,158 @@
+#!/usr/bin/env python
+# encoding: utf-8
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from concurrent import futures
+
+import grpc
+
+import function_service_pb2
+import function_service_pb2_grpc
+import types_pb2
+import sys
+import time;
+
+
+class FunctionServerDemo(function_service_pb2_grpc.PFunctionServiceServicer):
+    """gRPC servicer implementing demo remote aggregate functions.
+
+    ``fn_call`` dispatches on ``request.function_name`` and handles the
+    ``update`` / ``merge`` / ``finalize`` steps of two aggregates:
+
+    * ``rpc_sum_*`` works on ``int64_value`` columns.
+    * ``rpc_avg_*`` keeps a running total in ``double_value[0]`` and a row
+      count in ``int32_value[0]`` of the same ``PValues`` message.
+    """
+
+    @staticmethod
+    def _ok_status():
+        """Return a ``PStatus`` whose ``status_code`` is 0."""
+        status = types_pb2.PStatus()
+        status.status_code = 0
+        return status
+
+    @staticmethod
+    def _new_result(type_id):
+        """Return an empty, non-null ``PValues`` tagged with ``type_id``."""
+        result = types_pb2.PValues()
+        result.has_null = False
+        result_type = types_pb2.PGenericType()
+        result_type.id = type_id
+        result.type.CopyFrom(result_type)
+        return result
+
+    @staticmethod
+    def _state(request):
+        """Return the first ``args_data`` entry of the request's function context.
+
+        Presumably this is the intermediate aggregation state handed back by
+        the caller -- confirm against the function_service.proto contract.
+        """
+        return request.context.function_context.args_data[0]
+
+    def fn_call(self, request, context):
+        """Evaluate one step of the ``rpc_sum`` / ``rpc_avg`` aggregates.
+
+        Args:
+            request: ``PFunctionCallRequest``; ``function_name`` selects the
+                step, ``args`` carries the input columns, and the optional
+                ``context`` field carries the state read by ``_state``.
+            context: gRPC servicer context (unused).
+
+        Returns:
+            A ``PFunctionCallResponse`` with status code 0. For a recognised
+            function name exactly one ``PValues`` result is appended; for any
+            other name the response carries no result.
+
+        Raises:
+            IndexError: if an expected argument or state value is missing.
+            ZeroDivisionError: if ``rpc_avg_finalize`` sees a stored count of 0.
+        """
+        response = function_service_pb2.PFunctionCallResponse()
+        response.status.CopyFrom(self._ok_status())
+
+        name = request.function_name
+        args = request.args
+
+        if name == "rpc_sum_update":
+            result = self._new_result(types_pb2.PGenericType.INT64)
+            total = sum(args[0].int64_value)
+            if request.HasField("context"):
+                # Fold in the running sum supplied with the request.
+                total += self._state(request).int64_value[0]
+            result.int64_value.append(total)
+        elif name == "rpc_sum_merge":
+            result = self._new_result(types_pb2.PGenericType.INT64)
+            # Each argument contributes its first int64 value.
+            result.int64_value.append(sum(arg.int64_value[0] for arg in args))
+        elif name == "rpc_sum_finalize":
+            result = self._new_result(types_pb2.PGenericType.INT64)
+            result.int64_value.append(self._state(request).int64_value[0])
+        elif name == "rpc_avg_update":
+            result = self._new_result(types_pb2.PGenericType.DOUBLE)
+            values = args[0].int32_value
+            total = sum(values)
+            count = len(values)
+            if request.HasField("context"):
+                state = self._state(request)
+                total += state.double_value[0]
+                count += state.int32_value[0]
+            # Total and count travel together in one PValues message.
+            result.double_value.append(total)
+            result.int32_value.append(count)
+        elif name == "rpc_avg_merge":
+            result = self._new_result(types_pb2.PGenericType.DOUBLE)
+            total = sum(arg.double_value[0] for arg in args)
+            count = sum(arg.int32_value[0] for arg in args)
+            # Use the same fields that rpc_avg_update writes and that
+            # rpc_avg_finalize reads back.
+            result.double_value.append(total)
+            result.int32_value.append(count)
+        elif name == "rpc_avg_finalize":
+            result = self._new_result(types_pb2.PGenericType.DOUBLE)
+            state = self._state(request)
+            # NOTE(review): a stored count of 0 raises ZeroDivisionError here;
+            # confirm whether the caller can ever finalize an empty state.
+            result.double_value.append(
+                state.double_value[0] / state.int32_value[0])
+        else:
+            # Unrecognised function names get a status-only response.
+            return response
+
+        response.result.append(result)
+        return response
+
+    def check_fn(self, request, context):
+        """Return a ``PCheckFunctionResponse`` with status code 0.
+
+        The request is not inspected.
+        """
+        response = function_service_pb2.PCheckFunctionResponse()
+        response.status.CopyFrom(self._ok_status())
+        return response
+
+    def hand_shake(self, request, context):
+        """Echo ``request.hello`` (when set) back with status code 0."""
+        response = types_pb2.PHandShakeResponse()
+        if request.HasField("hello"):
+            response.hello = request.hello
+        response.status.CopyFrom(self._ok_status())
+        return response
+
+
+def serve(port):
+    """Start the demo function server on ``port`` and block until it stops.
+
+    Args:
+        port: TCP port to listen on, as an int or a numeric string; it is
+            formatted into the ``0.0.0.0:<port>`` bind address.
+    """
+    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+    function_service_pb2_grpc.add_PFunctionServiceServicer_to_server(
+        FunctionServerDemo(), server)
+    server.add_insecure_port("0.0.0.0:%s" % port)
+    server.start()
+    # Block the calling thread until the server terminates instead of
+    # waking up every second in a sleep loop.
+    server.wait_for_termination()
+
+
+if __name__ == '__main__':
+    logging.basicConfig()
+    # The first command-line argument, when present, replaces the default
+    # port 9000; it is passed on unchanged (as a string).
+    serve(sys.argv[1] if len(sys.argv) > 1 else 9000)
diff --git
a/samples/doris-demo/remote-udaf-python-demo/proto/function_service.proto
b/samples/doris-demo/remote-udaf-python-demo/proto/function_service.proto
new file mode 120000
index 0000000000..103b0d49a2
--- /dev/null
+++ b/samples/doris-demo/remote-udaf-python-demo/proto/function_service.proto
@@ -0,0 +1 @@
+../../../../gensrc/proto/function_service.proto
\ No newline at end of file
diff --git a/samples/doris-demo/remote-udaf-python-demo/proto/types.proto
b/samples/doris-demo/remote-udaf-python-demo/proto/types.proto
new file mode 120000
index 0000000000..95465b6364
--- /dev/null
+++ b/samples/doris-demo/remote-udaf-python-demo/proto/types.proto
@@ -0,0 +1 @@
+../../../../gensrc/proto/types.proto
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]