This is an automated email from the ASF dual-hosted git repository.
nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6506f9bfd48 [improve][fn] support reading config options from file in
Function Python Runner (#18951)
6506f9bfd48 is described below
commit 6506f9bfd4896de489d29fa7ec46bd11d9e6d94e
Author: laminar <[email protected]>
AuthorDate: Wed Feb 8 09:17:01 2023 +0800
[improve][fn] support reading config options from file in Function Python
Runner (#18951)
Signed-off-by: laminar <[email protected]>
---
.../instance/src/main/python/python_instance.py | 4 +-
.../src/main/python/python_instance_main.py | 107 +++++++++++++++++----
pulsar-functions/instance/src/main/python/util.py | 19 ++++
.../src/test/python/test_python_instance_main.py | 69 +++++++++++++
.../src/test/python/test_python_runtime_config.ini | 27 ++++++
5 files changed, 205 insertions(+), 21 deletions(-)
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py
b/pulsar-functions/instance/src/main/python/python_instance.py
index f4ad66527e6..cf53e75d9d0 100755
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -80,7 +80,8 @@ class PythonInstance(object):
pulsar_client,
secrets_provider,
cluster_name,
- state_storage_serviceurl):
+ state_storage_serviceurl,
+ config_file):
self.instance_config = InstanceConfig(instance_id, function_id,
function_version, function_details, max_buffered_tuples)
self.user_code = user_code
# set queue size to one since consumers already have internal queues. Just
use queue to communicate message from
@@ -114,6 +115,7 @@ class PythonInstance(object):
instance_id, cluster_name,
"%s/%s/%s" % (function_details.tenant,
function_details.namespace, function_details.name)]
self.stats = Stats(self.metrics_labels)
+ self.config_file = config_file
def health_check(self):
self.last_health_check_ts = time.time()
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 3967635365c..2d6520b2e99 100755
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -49,47 +49,113 @@ from bookkeeper.kv.client import Client
to_run = True
Log = log.Log
+
def atexit_function(signo, _frame):
global to_run
Log.info("Interrupted by %d, shutting down" % signo)
to_run = False
-def main():
- # Setup signal handlers
- signal.signal(signal.SIGTERM, atexit_function)
- signal.signal(signal.SIGHUP, atexit_function)
- signal.signal(signal.SIGINT, atexit_function)
+def generate_arguments_parser():
parser = argparse.ArgumentParser(description='Pulsar Functions Python
Instance')
- parser.add_argument('--function_details', required=True, help='Function
Details Json String')
- parser.add_argument('--py', required=True, help='Full Path of Function Code
File')
- parser.add_argument('--instance_id', required=True, help='Instance Id')
- parser.add_argument('--function_id', required=True, help='Function Id')
- parser.add_argument('--function_version', required=True, help='Function
Version')
- parser.add_argument('--pulsar_serviceurl', required=True, help='Pulsar
Service Url')
+ parser.add_argument('--function_details', required=False, help='Function
Details Json String')
+ parser.add_argument('--py', required=False, help='Full Path of Function Code
File')
+ parser.add_argument('--instance_id', required=False, help='Instance Id')
+ parser.add_argument('--function_id', required=False, help='Function Id')
+ parser.add_argument('--function_version', required=False, help='Function
Version')
+ parser.add_argument('--pulsar_serviceurl', required=False, help='Pulsar
Service Url')
parser.add_argument('--client_auth_plugin', required=False, help='Client
authentication plugin')
parser.add_argument('--client_auth_params', required=False, help='Client
authentication params')
parser.add_argument('--use_tls', required=False, help='Use tls')
parser.add_argument('--tls_allow_insecure_connection', required=False,
help='Tls allow insecure connection')
parser.add_argument('--hostname_verification_enabled', required=False,
help='Enable hostname verification')
parser.add_argument('--tls_trust_cert_path', required=False, help='Tls trust
cert file path')
- parser.add_argument('--port', required=True, help='Instance Port', type=int)
- parser.add_argument('--metrics_port', required=True, help="Port metrics will
be exposed on", type=int)
- parser.add_argument('--max_buffered_tuples', required=True, help='Maximum
number of Buffered tuples')
- parser.add_argument('--logging_directory', required=True, help='Logging
Directory')
- parser.add_argument('--logging_file', required=True, help='Log file name')
+ parser.add_argument('--port', required=False, help='Instance Port', type=int)
+ parser.add_argument('--metrics_port', required=False, help="Port metrics
will be exposed on", type=int)
+ parser.add_argument('--max_buffered_tuples', required=False, help='Maximum
number of Buffered tuples')
+ parser.add_argument('--logging_directory', required=False, help='Logging
Directory')
+ parser.add_argument('--logging_file', required=False, help='Log file name')
parser.add_argument('--logging_level', required=False, help='Logging level')
- parser.add_argument('--logging_config_file', required=True, help='Config
file for logging')
- parser.add_argument('--expected_healthcheck_interval', required=True,
help='Expected time in seconds between health checks', type=int)
+ parser.add_argument('--logging_config_file', required=False, help='Config
file for logging')
+ parser.add_argument('--expected_healthcheck_interval', required=False,
help='Expected time in seconds between health checks', type=int)
parser.add_argument('--secrets_provider', required=False, help='The
classname of the secrets provider')
parser.add_argument('--secrets_provider_config', required=False, help='The
config that needs to be passed to secrets provider')
parser.add_argument('--install_usercode_dependencies', required=False,
help='For packaged python like wheel files, do we need to install all
dependencies', type=bool)
parser.add_argument('--dependency_repository', required=False, help='For
packaged python like wheel files, which repository to pull the dependencies
from')
parser.add_argument('--extra_dependency_repository', required=False,
help='For packaged python like wheel files, any extra repository to pull the
dependencies from')
parser.add_argument('--state_storage_serviceurl', required=False,
help='Managed State Storage Service Url')
- parser.add_argument('--cluster_name', required=True, help='The name of the
cluster this instance is running on')
+ parser.add_argument('--cluster_name', required=False, help='The name of the
cluster this instance is running on')
+ parser.add_argument('--config_file', required=False, default="",
help='Configuration file name', type=str)
+ return parser
+
def merge_arguments(args, config_file):
    """
    Merge options from a configuration file into the parsed command-line arguments.

    Arguments given on the command line always win: an option is taken from the
    file only when the command line left it unset (``None``). Explicit but falsy
    command-line values such as ``0`` or an empty string are therefore kept.

    Values copied from the file are stored exactly as configparser returns them
    (strings); no type conversion is applied here.

    :param args: namespace of arguments parsed from the command line; updated in place
    :param config_file: configuration file name (path); ``None`` or ``""`` means
                        there is no file and ``args`` is left untouched
    """
    if not config_file:
        return
    config = util.read_config(config_file)
    if not config:
        return
    default_config = config["DEFAULT"]
    if not default_config:
        return
    # Iterate over a snapshot so that setting attributes cannot disturb the loop.
    for name, value in list(vars(args).items()):
        if name == "config_file":
            continue
        if value is not None:
            # Supplied on the command line; it has priority over the file.
            continue
        file_value = default_config.get(name)
        if file_value:
            setattr(args, name, file_value)
+
+
def validate_arguments(args):
    """
    Verify that every mandatory argument has a value after merging.

    An argument counts as missing only when it is ``None``. If any are missing,
    their names are reported on standard error and the process exits with
    status 1.

    :param args: arguments after merging
    """
    mandatory_args = (
        "function_details",
        "py",
        "instance_id",
        "function_id",
        "function_version",
        "pulsar_serviceurl",
        "port",
        "metrics_port",
        "max_buffered_tuples",
        "logging_directory",
        "logging_file",
        "logging_config_file",
        "expected_healthcheck_interval",
        "cluster_name",
    )
    missing_args = [name for name in mandatory_args if getattr(args, name) is None]
    if missing_args:
        # Diagnostics belong on stderr so they are not mixed into regular output.
        print("The following arguments are required:", missing_args, file=sys.stderr)
        sys.exit(1)
+
+
+def main():
+ # Setup signal handlers
+ signal.signal(signal.SIGTERM, atexit_function)
+ signal.signal(signal.SIGHUP, atexit_function)
+ signal.signal(signal.SIGINT, atexit_function)
+ parser = generate_arguments_parser()
args = parser.parse_args()
+ merge_arguments(args, args.config_file)
+ validate_arguments(args)
function_details = Function_pb2.FunctionDetails()
args.function_details = str(args.function_details)
if args.function_details[0] == '\'':
@@ -216,7 +282,8 @@ def main():
pulsar_client,
secrets_provider,
args.cluster_name,
- state_storage_serviceurl)
+ state_storage_serviceurl,
+ args.config_file)
pyinstance.run()
server_instance = server.serve(args.port, pyinstance)
diff --git a/pulsar-functions/instance/src/main/python/util.py
b/pulsar-functions/instance/src/main/python/util.py
index 48ba2f0e6d7..f5868093fae 100755
--- a/pulsar-functions/instance/src/main/python/util.py
+++ b/pulsar-functions/instance/src/main/python/util.py
@@ -25,6 +25,8 @@ import os
import inspect
import sys
import importlib
+import configparser
+
from threading import Timer
from pulsar.functions import serde
@@ -80,6 +82,23 @@ def getFullyQualifiedInstanceId(tenant, namespace, name,
instance_id):
def get_properties(fullyQualifiedName, instanceId):
return {"application": "pulsar-function", "id": str(fullyQualifiedName),
"instance_id": str(instanceId)}
def read_config(config_file):
    """
    Load an INI-style configuration file.

    The content of the configuration file is styled as follows:

    [DEFAULT]
    parameter1 = value1
    parameter2 = value2
    parameter3 = value3
    ...

    :param config_file: configuration file name (path)
    :return: ``None`` when ``config_file`` is ``None`` or empty, otherwise a
             ``configparser.ConfigParser``. A file that does not exist is
             silently ignored by ``ConfigParser.read``, so the returned parser
             then simply has no options.
    """
    # None is handled like the empty string; ConfigParser.read(None) would raise.
    if not config_file:
        return None

    cfg = configparser.ConfigParser()
    cfg.read(config_file)
    return cfg
+
class FixedTimer():
def __init__(self, t, hFunction, name="timer-thread"):
diff --git
a/pulsar-functions/instance/src/test/python/test_python_instance_main.py
b/pulsar-functions/instance/src/test/python/test_python_instance_main.py
new file mode 100644
index 00000000000..af248691919
--- /dev/null
+++ b/pulsar-functions/instance/src/test/python/test_python_instance_main.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+# DEPENDENCIES: unittest2
+import python_instance_main
+
+import os
+import log
+import unittest
+
class TestContextImpl(unittest.TestCase):
    """Checks how command-line arguments and config-file options are merged."""

    def Any(cls):
        # Not referenced by the tests in this class; kept unchanged.
        class Any(cls):
            def __eq__(self, other):
                return True
        return Any()

    def setUp(self):
        pulsar_home = os.environ.get("PULSAR_HOME")
        if pulsar_home is None:
            # Fail with a readable message instead of a TypeError from None + str.
            self.fail("PULSAR_HOME must be set to locate console_logging_config.ini")
        log.init_logger("INFO", "foo", pulsar_home + "/conf/functions-logging/console_logging_config.ini")

    def test_arguments(self):
        parser = python_instance_main.generate_arguments_parser()
        # The fixture sits next to this test module; resolve it from there so the
        # test does not depend on the current working directory.
        config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                   "test_python_runtime_config.ini")
        argv = [
            "--function_details", "test_function_details",
            "--py", "test_py",
            "--instance_id", "test_instance_id",
            "--function_id", "test_function_id",
            "--function_version", "test_function_version",
            "--pulsar_serviceurl", "test_pulsar_serviceurl",
            "--client_auth_plugin", "test_client_auth_plugin",
            "--client_auth_params", "test_client_auth_params",
            "--tls_allow_insecure_connection", "true",
            "--hostname_verification_enabled", "true",
            "--tls_trust_cert_path", "test_tls_trust_cert_path",
            "--port", "1000",
            "--metrics_port", "1001",
            "--max_buffered_tuples", "100",
            "--config_file", config_path,
        ]
        args = parser.parse_args(argv)
        python_instance_main.merge_arguments(args, args.config_file)
        # argument from command line test
        self.assertEqual(args.function_details, "test_function_details")
        # argument from config file test
        self.assertEqual(args.use_tls, "true")
        # argument read priority test
        self.assertEqual(args.port, 1000)
        # mandatory argument test (values merged from the file stay strings)
        self.assertEqual(args.expected_healthcheck_interval, "50")
        # optional argument test
        self.assertEqual(args.secrets_provider, None)
diff --git
a/pulsar-functions/instance/src/test/python/test_python_runtime_config.ini
b/pulsar-functions/instance/src/test/python/test_python_runtime_config.ini
new file mode 100644
index 00000000000..8e172647178
--- /dev/null
+++ b/pulsar-functions/instance/src/test/python/test_python_runtime_config.ini
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+[DEFAULT]
+port=5000
+metrics_port=5001
+use_tls=true
+logging_directory=test_logging_directory
+logging_file=test_logging_file
+logging_config_file=test_logging_config_file
+expected_healthcheck_interval=50
\ No newline at end of file