This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8a3ee91bb5 [remote-udaf](sample) add some python demo (#11760)
8a3ee91bb5 is described below
commit 8a3ee91bb5b7c207ce17a157d296ce8e33a7ab21
Author: chenlinzhong <[email protected]>
AuthorDate: Tue Aug 16 09:26:36 2022 +0800
[remote-udaf](sample) add some python demo (#11760)
---
.../doris-demo/remote-udaf-python-demo/README.md | 129 ++++++++++++++++++++-
.../function_server_demo.py | 73 +++++++++++-
2 files changed, 198 insertions(+), 4 deletions(-)
diff --git a/samples/doris-demo/remote-udaf-python-demo/README.md
b/samples/doris-demo/remote-udaf-python-demo/README.md
index b73e6d6a0d..8f59b03ba6 100644
--- a/samples/doris-demo/remote-udaf-python-demo/README.md
+++ b/samples/doris-demo/remote-udaf-python-demo/README.md
@@ -26,4 +26,131 @@ under the License.
# Run
`python function_server_demo.py 9000`
-`9000` is the port that the server will listen on
\ No newline at end of file
+`9000` is the port that the server will listen on
+
+# Demo
+
+
+```
+//create one table such as table2
+CREATE TABLE `table2` (
+ `event_day` date NULL,
+ `siteid` int(11) NULL DEFAULT "10",
+ `citycode` smallint(6) NULL,
+ `visitinfo` varchar(1024) NULL DEFAULT "",
+ `pv` varchar(1024) REPLACE NULL DEFAULT "0"
+) ENGINE=OLAP
+AGGREGATE KEY(`event_day`, `siteid`, `citycode`, `visitinfo`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`siteid`) BUCKETS 10
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1",
+"in_memory" = "false",
+"storage_format" = "V2"
+)
+//import some data
+MySQL [test_db]> select * from table2;
++------------+--------+----------+------------------------------------+------+
+| event_day | siteid | citycode | visitinfo | pv |
++------------+--------+----------+------------------------------------+------+
+| 2017-07-03 | 8 | 12 | {"ip":"192.168.0.5","source":"pc"} | 81 |
+| 2017-07-03 | 37 | 12 | {"ip":"192.168.0.3","source":"pc"} | 81 |
+| 2017-07-03 | 67 | 16 | {"ip":"192.168.0.2","source":"pc"} | 79 |
+| 2017-07-03 | 101 | 11 | {"ip":"192.168.0.5","source":"pc"} | 65 |
+| 2017-07-03 | 32 | 15 | {"ip":"192.168.0.1","source":"pc"} | 188 |
+| 2017-07-03 | 103 | 12 | {"ip":"192.168.0.5","source":"pc"} | 123 |
+| 2017-07-03 | 104 | 16 | {"ip":"192.168.0.5","source":"pc"} | 79 |
+| 2017-07-03 | 3 | 12 | {"ip":"192.168.0.3","source":"pc"} | 123 |
+| 2017-07-03 | 3 | 15 | {"ip":"192.168.0.2","source":"pc"} | 188 |
+| 2017-07-03 | 13 | 11 | {"ip":"192.168.0.1","source":"pc"} | 65 |
+| 2017-07-03 | 53 | 12 | {"ip":"192.168.0.2","source":"pc"} | 123 |
+| 2017-07-03 | 1 | 11 | {"ip":"192.168.0.1","source":"pc"} | 65 |
+| 2017-07-03 | 7 | 16 | {"ip":"192.168.0.4","source":"pc"} | 79 |
+| 2017-07-03 | 102 | 15 | {"ip":"192.168.0.5","source":"pc"} | 188 |
+| 2017-07-03 | 105 | 12 | {"ip":"192.168.0.5","source":"pc"} | 81 |
++------------+--------+----------+------------------------------------+------+
+```
+
+### 1. Find the top 3 most-visited IPs
+```
+MySQL [test_db]> CREATE AGGREGATE FUNCTION rpc_count_visit_info(varchar(1024)) RETURNS varchar(1024) PROPERTIES (
+ "TYPE"="RPC",
+ "OBJECT_FILE"="127.0.0.1:9000",
+ "update_fn"="rpc_count_visit_info_update",
+ "merge_fn"="rpc_count_visit_info_merge",
+ "finalize_fn"="rpc_count_visit_info_finalize"
+);
+MySQL [test_db]> select rpc_count_visit_info(visitinfo) from table2;
++--------------------------------------------+
+| rpc_count_visit_info(`visitinfo`) |
++--------------------------------------------+
+| 192.168.0.5:6 192.168.0.2:3 192.168.0.1:3 |
++--------------------------------------------+
+1 row in set (0.036 sec)
+MySQL [test_db]> select citycode, rpc_count_visit_info(visitinfo) from table2 group by citycode;
++----------+--------------------------------------------+
+| citycode | rpc_count_visit_info(`visitinfo`) |
++----------+--------------------------------------------+
+| 15 | 192.168.0.2:1 192.168.0.1:1 192.168.0.5:1 |
+| 11 | 192.168.0.1:2 192.168.0.5:1 |
+| 12 | 192.168.0.5:3 192.168.0.3:2 192.168.0.2:1 |
+| 16 | 192.168.0.2:1 192.168.0.4:1 192.168.0.5:1 |
++----------+--------------------------------------------+
+4 rows in set (0.050 sec)
+```
+### 2. sum pv
+```
+CREATE AGGREGATE FUNCTION rpc_sum(bigint) RETURNS bigint PROPERTIES (
+ "TYPE"="RPC",
+ "OBJECT_FILE"="127.0.0.1:9000",
+ "update_fn"="rpc_sum_update",
+ "merge_fn"="rpc_sum_merge",
+ "finalize_fn"="rpc_sum_finalize"
+);
+MySQL [test_db]> select citycode, rpc_sum(pv) from table2 group by citycode;
++----------+---------------+
+| citycode | rpc_sum(`pv`) |
++----------+---------------+
+| 15 | 564 |
+| 11 | 195 |
+| 12 | 612 |
+| 16 | 237 |
++----------+---------------+
+4 rows in set (0.067 sec)
+MySQL [test_db]> select rpc_sum(pv) from table2;
++---------------+
+| rpc_sum(`pv`) |
++---------------+
+| 1608 |
++---------------+
+1 row in set (0.030 sec)
+```
+
+### 3. avg pv
+
+```
+CREATE AGGREGATE FUNCTION rpc_avg(int) RETURNS double PROPERTIES (
+ "TYPE"="RPC",
+ "OBJECT_FILE"="127.0.0.1:9000",
+ "update_fn"="rpc_avg_update",
+ "merge_fn"="rpc_avg_merge",
+ "finalize_fn"="rpc_avg_finalize"
+);
+MySQL [test_db]> select citycode, rpc_avg(pv) from table2 group by citycode;
++----------+---------------+
+| citycode | rpc_avg(`pv`) |
++----------+---------------+
+| 15 | 188 |
+| 11 | 65 |
+| 12 | 102 |
+| 16 | 79 |
++----------+---------------+
+4 rows in set (0.039 sec)
+MySQL [test_db]> select rpc_avg(pv) from table2;
++---------------+
+| rpc_avg(`pv`) |
++---------------+
+| 107.2 |
++---------------+
+1 row in set (0.028 sec)
+```
\ No newline at end of file
diff --git a/samples/doris-demo/remote-udaf-python-demo/function_server_demo.py
b/samples/doris-demo/remote-udaf-python-demo/function_server_demo.py
index 5fe5e30242..eb7a3a59ca 100644
--- a/samples/doris-demo/remote-udaf-python-demo/function_server_demo.py
+++ b/samples/doris-demo/remote-udaf-python-demo/function_server_demo.py
@@ -28,6 +28,8 @@ import function_service_pb2_grpc
import types_pb2
import sys
import time;
+import json
+
class FunctionServerDemo(function_service_pb2_grpc.PFunctionServiceServicer):
@@ -36,7 +38,6 @@ class
FunctionServerDemo(function_service_pb2_grpc.PFunctionServiceServicer):
status = types_pb2.PStatus()
status.status_code = 0
response.status.CopyFrom(status)
-
if request.function_name == "rpc_sum_update":
result = types_pb2.PValues()
result.has_null = False
@@ -107,8 +108,8 @@ class
FunctionServerDemo(function_service_pb2_grpc.PFunctionServiceServicer):
for i in range(args_len):
total += request.args[i].double_value[0]
size += request.args[i].int32_value[0]
- result.add_double.append(total)
- result.add_int32.append(size)
+ result.double_value.append(total)
+ result.int32_value.append(size)
response.result.append(result)
if request.function_name == "rpc_avg_finalize":
@@ -122,6 +123,72 @@ class
FunctionServerDemo(function_service_pb2_grpc.PFunctionServiceServicer):
avg = total / size
result.double_value.append(avg)
response.result.append(result)
+ if request.function_name == "rpc_count_visit_info_update":
+ result = types_pb2.PValues()
+ result.has_null = False
+ result_type = types_pb2.PGenericType()
+ result_type.id = types_pb2.PGenericType.STRING
+ result.type.CopyFrom(result_type)
+ size = len(request.args[0].string_value)
+ currentMap=dict()
+ if request.HasField("context"):
+ context =
request.context.function_context.args_data[0].string_value[0]
+ currentMap = json.loads(context)
+ for i in range(size):
+ s = request.args[0].string_value[i]
+ mapInfo = json.loads(s)
+ ip=mapInfo['ip']
+ if currentMap.has_key(ip):
+ last_val=currentMap[ip]
+ last_val+=1
+ currentMap[ip] = last_val
+ else:
+ currentMap[ip] = 1
+ json_dict = json.dumps(currentMap)
+ result.string_value.append(json_dict)
+ response.result.append(result)
+
+ if request.function_name == "rpc_count_visit_info_merge":
+ result = types_pb2.PValues()
+ result.has_null = False
+ result_type = types_pb2.PGenericType()
+ result_type.id = types_pb2.PGenericType.STRING
+ result.type.CopyFrom(result_type)
+
+ context1 = request.args[0].string_value[0]
+ currentMap1 = json.loads(context1)
+ context2 = request.args[1].string_value[0]
+ currentMap2 = json.loads(context2)
+ for ip,num in currentMap2.items():
+ if currentMap1.has_key(ip):
+ currentMap1[ip] = currentMap1[ip] + num
+ else:
+ currentMap1[ip] = num
+ json_dict = json.dumps(currentMap1)
+ result.string_value.append(json_dict)
+ response.result.append(result)
+
+ if request.function_name == "rpc_count_visit_info_finalize":
+ result = types_pb2.PValues()
+ result.has_null = False
+ result_type = types_pb2.PGenericType()
+ result_type.id = types_pb2.PGenericType.STRING
+ result.type.CopyFrom(result_type)
+
+ context =
request.context.function_context.args_data[0].string_value[0]
+ currentMap = json.loads(context)
+ sortedMap=sorted(currentMap.items(), key = lambda kv:(kv[1],
kv[0]),reverse=True)
+ resultMap=dict()
+ topN=3
+ if len(sortedMap) < topN:
+ topN = len(sortedMap)
+ finalResult=""
+ print(sortedMap)
+ for i in range(topN):
+ ip=sortedMap[i][0]
+ finalResult +=ip +":"+str(sortedMap[i][1]) +" "
+ result.string_value.append(finalResult)
+ response.result.append(result)
return response
def check_fn(self, request, context):
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]