This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b0a4018 Simplify argument passing in process runtimes (#2383)
b0a4018 is described below
commit b0a4018e09f5476934f9bf8b674178b1f6bbd7f7
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Thu Aug 16 11:04:03 2018 -0700
Simplify argument passing in process runtimes (#2383)
### Motivation
Currently, the process runtime iterates over each field of the
FunctionDetails protobuf and passes it as an individual command-line argument.
This is not only complicated, but every field addition or deletion also
requires changes to ProcessRuntime. This PR passes the entire proto as a JSON
string, which is then converted back to a protobuf inside the instances,
thereby simplifying the process.
---
.../instance/src/main/python/Function_pb2.py | 79 ++++++++----
.../src/main/python/InstanceCommunication_pb2.py | 137 ++++++++++++++++++---
.../instance/src/main/python/python_instance.py | 6 +-
.../src/main/python/python_instance_main.py | 63 ++--------
.../pulsar/functions/runtime/JavaInstanceMain.java | 109 +---------------
.../pulsar/functions/runtime/ProcessRuntime.java | 93 +-------------
.../functions/runtime/ProcessRuntimeFactory.java | 2 +-
.../pulsar/functions/runtime/RuntimeFactory.java | 2 +-
.../functions/runtime/ProcessRuntimeTest.java | 47 ++-----
9 files changed, 210 insertions(+), 328 deletions(-)
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py
b/pulsar-functions/instance/src/main/python/Function_pb2.py
index c9513c5..61c87f4 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='Function.proto',
package='proto',
syntax='proto3',
-
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
\x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03
\x01(\x03\"\x95\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05
\x01(\t\x12\x39\n\x14processingGuarantees\x18\x06
\x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\ [...]
+
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
\x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03
\x01(\x03\"\xa9\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05
\x01(\t\x12\x39\n\x14processingGuarantees\x18\x06
\x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\ [...]
)
_PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -63,8 +63,8 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=1244,
- serialized_end=1323,
+ serialized_start=1347,
+ serialized_end=1426,
)
_sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
@@ -86,8 +86,8 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=1325,
- serialized_end=1369,
+ serialized_start=1428,
+ serialized_end=1472,
)
_sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
@@ -116,8 +116,8 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=453,
- serialized_end=484,
+ serialized_start=473,
+ serialized_end=504,
)
_sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)
@@ -265,6 +265,13 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='packageUrl', full_name='proto.FunctionDetails.packageUrl',
index=13,
+ number=14, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
],
extensions=[
],
@@ -279,7 +286,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
oneofs=[
],
serialized_start=79,
- serialized_end=484,
+ serialized_end=504,
)
@@ -316,8 +323,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY =
_descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=709,
- serialized_end=770,
+ serialized_start=795,
+ serialized_end=856,
)
_SOURCESPEC = _descriptor.Descriptor(
@@ -369,6 +376,27 @@ _SOURCESPEC = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='topicsPattern', full_name='proto.SourceSpec.topicsPattern',
index=6,
+ number=7, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='builtin', full_name='proto.SourceSpec.builtin', index=7,
+ number=8, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='subscriptionName', full_name='proto.SourceSpec.subscriptionName',
index=8,
+ number=9, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
],
extensions=[
],
@@ -381,8 +409,8 @@ _SOURCESPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=487,
- serialized_end=770,
+ serialized_start=507,
+ serialized_end=856,
)
@@ -428,6 +456,13 @@ _SINKSPEC = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='builtin', full_name='proto.SinkSpec.builtin', index=5,
+ number=6, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
],
extensions=[
],
@@ -440,8 +475,8 @@ _SINKSPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=772,
- serialized_end=880,
+ serialized_start=858,
+ serialized_end=983,
)
@@ -471,8 +506,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=882,
- serialized_end=928,
+ serialized_start=985,
+ serialized_end=1031,
)
@@ -523,8 +558,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=931,
- serialized_end=1092,
+ serialized_start=1034,
+ serialized_end=1195,
)
@@ -561,8 +596,8 @@ _INSTANCE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1094,
- serialized_end=1175,
+ serialized_start=1197,
+ serialized_end=1278,
)
@@ -599,8 +634,8 @@ _ASSIGNMENT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1177,
- serialized_end=1242,
+ serialized_start=1280,
+ serialized_end=1345,
)
_FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type =
_PROCESSINGGUARANTEES
diff --git
a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
index a16399e..f33f4f4 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
@@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='InstanceCommunication.proto',
package='proto',
syntax='proto3',
-
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xc6\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
\x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02
\x01(\t\x12\x13\n\x0bnumRestarts\x18\x03
\x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12
\n\x18numSuccessfullyProcessed\x18\x05
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06
\x01(\x03\x12H\n\x14latestUserExceptions\x18\x07
\x03(\x0b\x32*.proto.FunctionStatus.Ex [...]
+
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xd8\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
\x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02
\x01(\t\x12\x13\n\x0bnumRestarts\x18\x03
\x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12
\n\x18numSuccessfullyProcessed\x18\x05
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06
\x01(\x03\x12H\n\x14latestUserExceptions\x18\x07
\x03(\x0b\x32*.proto.FunctionStatus.Ex [...]
,
dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])
@@ -79,8 +79,8 @@ _FUNCTIONSTATUS_EXCEPTIONINFORMATION = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=643,
- serialized_end=712,
+ serialized_start=661,
+ serialized_end=730,
)
_FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY = _descriptor.Descriptor(
@@ -116,8 +116,8 @@ _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY =
_descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=714,
- serialized_end=778,
+ serialized_start=732,
+ serialized_end=796,
)
_FUNCTIONSTATUS = _descriptor.Descriptor(
@@ -232,6 +232,13 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='workerId', full_name='proto.FunctionStatus.workerId', index=15,
+ number=16, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
],
extensions=[
],
@@ -245,7 +252,7 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
oneofs=[
],
serialized_start=68,
- serialized_end=778,
+ serialized_end=796,
)
@@ -275,8 +282,8 @@ _FUNCTIONSTATUSLIST = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=780,
- serialized_end=851,
+ serialized_start=798,
+ serialized_end=869,
)
@@ -327,8 +334,8 @@ _METRICSDATA_DATADIGEST = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=919,
- serialized_end=985,
+ serialized_start=937,
+ serialized_end=1003,
)
_METRICSDATA_METRICSENTRY = _descriptor.Descriptor(
@@ -364,8 +371,8 @@ _METRICSDATA_METRICSENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=987,
- serialized_end=1064,
+ serialized_start=1005,
+ serialized_end=1082,
)
_METRICSDATA = _descriptor.Descriptor(
@@ -394,8 +401,8 @@ _METRICSDATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=854,
- serialized_end=1064,
+ serialized_start=872,
+ serialized_end=1082,
)
@@ -425,8 +432,83 @@ _HEALTHCHECKRESULT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1066,
- serialized_end=1102,
+ serialized_start=1084,
+ serialized_end=1120,
+)
+
+
+_METRICS_INSTANCEMETRICS = _descriptor.Descriptor(
+ name='InstanceMetrics',
+ full_name='proto.Metrics.InstanceMetrics',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='name', full_name='proto.Metrics.InstanceMetrics.name', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='instanceId', full_name='proto.Metrics.InstanceMetrics.instanceId',
index=1,
+ number=2, type=5, cpp_type=1, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='metricsData',
full_name='proto.Metrics.InstanceMetrics.metricsData', index=2,
+ number=3, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1183,
+ serialized_end=1275,
+)
+
+_METRICS = _descriptor.Descriptor(
+ name='Metrics',
+ full_name='proto.Metrics',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='metrics', full_name='proto.Metrics.metrics', index=0,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[_METRICS_INSTANCEMETRICS, ],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1123,
+ serialized_end=1275,
)
_FUNCTIONSTATUS_EXCEPTIONINFORMATION.containing_type = _FUNCTIONSTATUS
@@ -440,10 +522,14 @@ _METRICSDATA_DATADIGEST.containing_type = _METRICSDATA
_METRICSDATA_METRICSENTRY.fields_by_name['value'].message_type =
_METRICSDATA_DATADIGEST
_METRICSDATA_METRICSENTRY.containing_type = _METRICSDATA
_METRICSDATA.fields_by_name['metrics'].message_type = _METRICSDATA_METRICSENTRY
+_METRICS_INSTANCEMETRICS.fields_by_name['metricsData'].message_type =
_METRICSDATA
+_METRICS_INSTANCEMETRICS.containing_type = _METRICS
+_METRICS.fields_by_name['metrics'].message_type = _METRICS_INSTANCEMETRICS
DESCRIPTOR.message_types_by_name['FunctionStatus'] = _FUNCTIONSTATUS
DESCRIPTOR.message_types_by_name['FunctionStatusList'] = _FUNCTIONSTATUSLIST
DESCRIPTOR.message_types_by_name['MetricsData'] = _METRICSDATA
DESCRIPTOR.message_types_by_name['HealthCheckResult'] = _HEALTHCHECKRESULT
+DESCRIPTOR.message_types_by_name['Metrics'] = _METRICS
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
FunctionStatus = _reflection.GeneratedProtocolMessageType('FunctionStatus',
(_message.Message,), dict(
@@ -506,6 +592,21 @@ HealthCheckResult =
_reflection.GeneratedProtocolMessageType('HealthCheckResult'
))
_sym_db.RegisterMessage(HealthCheckResult)
+Metrics = _reflection.GeneratedProtocolMessageType('Metrics',
(_message.Message,), dict(
+
+ InstanceMetrics =
_reflection.GeneratedProtocolMessageType('InstanceMetrics',
(_message.Message,), dict(
+ DESCRIPTOR = _METRICS_INSTANCEMETRICS,
+ __module__ = 'InstanceCommunication_pb2'
+ # @@protoc_insertion_point(class_scope:proto.Metrics.InstanceMetrics)
+ ))
+ ,
+ DESCRIPTOR = _METRICS,
+ __module__ = 'InstanceCommunication_pb2'
+ # @@protoc_insertion_point(class_scope:proto.Metrics)
+ ))
+_sym_db.RegisterMessage(Metrics)
+_sym_db.RegisterMessage(Metrics.InstanceMetrics)
+
DESCRIPTOR.has_options = True
DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(),
_b('\n!org.apache.pulsar.functions.protoB\025InstanceCommunication'))
@@ -520,8 +621,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
file=DESCRIPTOR,
index=0,
options=None,
- serialized_start=1105,
- serialized_end=1453,
+ serialized_start=1278,
+ serialized_end=1626,
methods=[
_descriptor.MethodDescriptor(
name='GetFunctionStatus',
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py
b/pulsar-functions/instance/src/main/python/python_instance.py
index bb16253..2fc5091 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -114,13 +114,13 @@ class Stats(object):
class PythonInstance(object):
- def __init__(self, instance_id, function_id, function_version,
function_details, max_buffered_tuples, user_code, log_topic, pulsar_client):
+ def __init__(self, instance_id, function_id, function_version,
function_details, max_buffered_tuples, user_code, pulsar_client):
self.instance_config = InstanceConfig(instance_id, function_id,
function_version, function_details, max_buffered_tuples)
self.user_code = user_code
self.queue = Queue.Queue(max_buffered_tuples)
self.log_topic_handler = None
- if log_topic is not None:
- self.log_topic_handler = log.LogTopicHandler(str(log_topic),
pulsar_client)
+ if function_details.logTopic is not None and function_details.logTopic !=
"":
+ self.log_topic_handler =
log.LogTopicHandler(str(function_details.logTopic), pulsar_client)
self.pulsar_client = pulsar_client
self.input_serdes = {}
self.consumers = {}
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index a3c1179..514d7fe 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -28,7 +28,6 @@ import os
import sys
import signal
import time
-import json
from pulsar import Authentication
import pulsar
@@ -38,6 +37,7 @@ import log
import server
import python_instance
import util
+from google.protobuf import json_format
to_run = True
Log = log.Log
@@ -54,15 +54,11 @@ def main():
signal.signal(signal.SIGINT, atexit_function)
parser = argparse.ArgumentParser(description='Pulsar Functions Python
Instance')
- parser.add_argument('--function_classname', required=True, help='Function
Class Name')
+ parser.add_argument('--function_details', required=True, help='Function
Details Json String')
parser.add_argument('--py', required=True, help='Full Path of Function Code
File')
- parser.add_argument('--name', required=True, help='Function Name')
- parser.add_argument('--tenant', required=True, help='Tenant Name')
- parser.add_argument('--namespace', required=True, help='Namespace name')
parser.add_argument('--instance_id', required=True, help='Instance Id')
parser.add_argument('--function_id', required=True, help='Function Id')
parser.add_argument('--function_version', required=True, help='Function
Version')
- parser.add_argument('--processing_guarantees', required=True,
help='Processing Guarantees')
parser.add_argument('--pulsar_serviceurl', required=True, help='Pulsar
Service Url')
parser.add_argument('--client_auth_plugin', required=False, help='Client
authentication plugin')
parser.add_argument('--client_auth_params', required=False, help='Client
authentication params')
@@ -72,64 +68,22 @@ def main():
parser.add_argument('--tls_trust_cert_path', required=False, help='Tls trust
cert file path')
parser.add_argument('--port', required=True, help='Instance Port', type=int)
parser.add_argument('--max_buffered_tuples', required=True, help='Maximum
number of Buffered tuples')
- parser.add_argument('--user_config', required=False, help='User Config')
parser.add_argument('--logging_directory', required=True, help='Logging
Directory')
parser.add_argument('--logging_file', required=True, help='Log file name')
- parser.add_argument('--auto_ack', required=True, help='Enable Autoacking?')
- parser.add_argument('--log_topic', required=False, help='Topic to send Log
Messages')
- parser.add_argument('--source_subscription_type', required=True,
help='Subscription Type')
- parser.add_argument('--source_topics_serde_classname', required=True,
help='A mapping of Input topics to SerDe')
- parser.add_argument('--topics_pattern', required=False, help='TopicsPattern
to consume from list of topics under a namespace that match the pattern (not
supported)')
- parser.add_argument('--source_timeout_ms', required=False, help='Source
message timeout in milliseconds')
- parser.add_argument('--sink_topic', required=False, help='Sink Topic')
- parser.add_argument('--sink_serde_classname', required=False, help='Sink
SerDe classname')
args = parser.parse_args()
+ function_details = Function_pb2.FunctionDetails()
+ json_format.Parse(args.function_details, function_details)
log_file = os.path.join(args.logging_directory,
- util.getFullyQualifiedFunctionName(args.tenant,
args.namespace, args.name),
+
util.getFullyQualifiedFunctionName(function_details.tenant,
function_details.namespace, function_details.name),
"%s-%s.log" % (args.logging_file, args.instance_id))
log.init_rotating_logger(level=logging.INFO, logfile=log_file,
max_files=5, max_bytes=10 * 1024 * 1024)
Log.info("Starting Python instance with %s" % str(args))
- function_details = Function_pb2.FunctionDetails()
- function_details.tenant = args.tenant
- function_details.namespace = args.namespace
- function_details.name = args.name
- function_details.className = args.function_classname
-
- if args.topics_pattern:
- raise ValueError('topics_pattern is not supported by python client')
- sourceSpec = Function_pb2.SourceSpec()
- sourceSpec.subscriptionType =
Function_pb2.SubscriptionType.Value(args.source_subscription_type)
- try:
- source_topics_serde_classname_dict =
json.loads(args.source_topics_serde_classname)
- except ValueError:
- Log.critical("Cannot decode source_topics_serde_classname. This argument
must be specifed as a JSON")
- sys.exit(1)
- if not source_topics_serde_classname_dict:
- Log.critical("source_topics_serde_classname cannot be empty")
- for topics, serde_classname in source_topics_serde_classname_dict.items():
- sourceSpec.topicsToSerDeClassName[topics] = serde_classname
- if args.source_timeout_ms:
- sourceSpec.timeoutMs = long(args.source_timeout_ms)
- function_details.source.MergeFrom(sourceSpec)
-
- sinkSpec = Function_pb2.SinkSpec()
- if args.sink_topic != None and len(args.sink_topic) != 0:
- sinkSpec.topic = args.sink_topic
- if args.sink_serde_classname != None and len(args.sink_serde_classname) != 0:
- sinkSpec.serDeClassName = args.sink_serde_classname
- function_details.sink.MergeFrom(sinkSpec)
-
- function_details.processingGuarantees =
Function_pb2.ProcessingGuarantees.Value(args.processing_guarantees)
- if args.auto_ack == "true":
- function_details.autoAck = True
- else:
- function_details.autoAck = False
- if args.user_config != None and len(args.user_config) != 0:
- function_details.userConfig = args.user_config
+ if function_details.source.topicsPattern:
+ raise ValueError('topicsPattern is not supported by python client')
authentication = None
use_tls = False
@@ -146,8 +100,7 @@ def main():
pulsar_client = pulsar.Client(args.pulsar_serviceurl, authentication, 30, 1,
1, 50000, None, use_tls, tls_trust_cert_path, tls_allow_insecure_connection)
pyinstance = python_instance.PythonInstance(str(args.instance_id),
str(args.function_id),
str(args.function_version),
function_details,
- int(args.max_buffered_tuples),
str(args.py),
- args.log_topic, pulsar_client)
+ int(args.max_buffered_tuples),
str(args.py), pulsar_client)
pyinstance.run()
server_instance = server.serve(args.port, pyinstance)
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 966740d..8924d94 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -24,6 +24,7 @@ import com.beust.jcommander.Parameter;
import com.beust.jcommander.converters.StringConverter;
import com.google.gson.Gson;
import com.google.protobuf.Empty;
+import com.google.protobuf.util.JsonFormat;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
@@ -53,24 +54,13 @@ import java.util.concurrent.TimeUnit;
*/
@Slf4j
public class JavaInstanceMain implements AutoCloseable {
- @Parameter(names = "--function_classname", description = "Function Class
Name\n", required = true)
- protected String className;
+ @Parameter(names = "--function_details", description = "Function details
json\n", required = true)
+ protected String functionDetailsJsonString;
@Parameter(
names = "--jar",
description = "Path to Jar\n",
listConverter = StringConverter.class)
protected String jarFile;
- @Parameter(names = "--name", description = "Function Name\n", required =
true)
- protected String functionName;
- @Parameter(names = "--tenant", description = "Tenant Name\n", required =
true)
- protected String tenant;
- @Parameter(names = "--namespace", description = "Namespace Name\n",
required = true)
- protected String namespace;
- @Parameter(names = "--log_topic", description = "Log Topic")
- protected String logTopic;
-
- @Parameter(names = "--processing_guarantees", description = "Processing
Guarantees\n", required = true)
- protected ProcessingGuarantees processingGuarantees;
@Parameter(names = "--instance_id", description = "Instance Id\n",
required = true)
protected String instanceId;
@@ -111,48 +101,6 @@ public class JavaInstanceMain implements AutoCloseable {
@Parameter(names = "--max_buffered_tuples", description = "Maximum number
of tuples to buffer\n", required = true)
protected int maxBufferedTuples;
- @Parameter(names = "--user_config", description = "UserConfig\n")
- protected String userConfig;
-
- @Parameter(names = "--auto_ack", description = "Enable Auto Acking?\n")
- protected String autoAck = Boolean.TRUE.toString();
-
- @Parameter(names = "--source_classname", description = "The source
classname")
- protected String sourceClassname;
-
- @Parameter(names = "--source_configs", description = "The source configs")
- protected String sourceConfigs;
-
- @Parameter(names = "--source_type_classname", description = "The return
type of the source", required = true)
- protected String sourceTypeClassName;
-
- @Parameter(names = "--source_subscription_type", description = "The source
subscription type", required = true)
- protected String sourceSubscriptionType;
-
- @Parameter(names = "--source_topics_serde_classname", description = "A map
of topics to SerDe for the source")
- protected String sourceTopicsSerdeClassName;
-
- @Parameter(names = "--topics_pattern", description = "TopicsPattern to
consume from list of topics under a namespace that match the pattern. [--input]
and [--topicsPattern] are mutually exclusive. Add SerDe class name for a
pattern in --customSerdeInputs")
- protected String topicsPattern;
-
- @Parameter(names = "--source_timeout_ms", description = "Source message
timeout in milliseconds")
- protected Long sourceTimeoutMs;
-
- @Parameter(names = "--sink_type_classname", description = "The injest type
of the sink", required = true)
- protected String sinkTypeClassName;
-
- @Parameter(names = "--sink_configs", description = "The sink configs\n")
- protected String sinkConfigs;
-
- @Parameter(names = "--sink_classname", description = "The sink
classname\n")
- protected String sinkClassname;
-
- @Parameter(names = "--sink_topic", description = "The sink Topic Name\n")
- protected String sinkTopic;
-
- @Parameter(names = "--sink_serde_classname", description = "Sink SerDe\n")
- protected String sinkSerdeClassName;
-
private Server server;
private RuntimeSpawner runtimeSpawner;
private ThreadRuntimeFactory containerFactory;
@@ -169,56 +117,7 @@ public class JavaInstanceMain implements AutoCloseable {
instanceConfig.setInstanceId(instanceId);
instanceConfig.setMaxBufferedTuples(maxBufferedTuples);
FunctionDetails.Builder functionDetailsBuilder =
FunctionDetails.newBuilder();
- functionDetailsBuilder.setTenant(tenant);
- functionDetailsBuilder.setNamespace(namespace);
- functionDetailsBuilder.setName(functionName);
- functionDetailsBuilder.setClassName(className);
-
- if (logTopic != null) {
- functionDetailsBuilder.setLogTopic(logTopic);
- }
- functionDetailsBuilder.setProcessingGuarantees(processingGuarantees);
- functionDetailsBuilder.setAutoAck(isTrue(autoAck));
- if (userConfig != null && !userConfig.isEmpty()) {
- functionDetailsBuilder.setUserConfig(userConfig);
- }
-
- // Setup source
- SourceSpec.Builder sourceDetailsBuilder = SourceSpec.newBuilder();
- if (sourceClassname != null) {
- sourceDetailsBuilder.setClassName(sourceClassname);
- }
- if (sourceConfigs != null && !sourceConfigs.isEmpty()) {;
- sourceDetailsBuilder.setConfigs(sourceConfigs);
- }
-
sourceDetailsBuilder.setSubscriptionType(Function.SubscriptionType.valueOf(sourceSubscriptionType));
- sourceDetailsBuilder.putAllTopicsToSerDeClassName(new
Gson().fromJson(sourceTopicsSerdeClassName, Map.class));
- if (isNotBlank(topicsPattern)) {
- sourceDetailsBuilder.setTopicsPattern(topicsPattern);
- }
- sourceDetailsBuilder.setTypeClassName(sourceTypeClassName);
- if (sourceTimeoutMs != null) {
- sourceDetailsBuilder.setTimeoutMs(sourceTimeoutMs);
- }
- functionDetailsBuilder.setSource(sourceDetailsBuilder);
-
- // Setup sink
- SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
- if (sinkClassname != null) {
- sinkSpecBuilder.setClassName(sinkClassname);
- }
- if (sinkConfigs != null) {
- sinkSpecBuilder.setConfigs(sinkConfigs);
- }
- if (sinkSerdeClassName != null) {
- sinkSpecBuilder.setSerDeClassName(sinkSerdeClassName);
- }
- sinkSpecBuilder.setTypeClassName(sinkTypeClassName);
- if (sinkTopic != null && !sinkTopic.isEmpty()) {
- sinkSpecBuilder.setTopic(sinkTopic);
- }
- functionDetailsBuilder.setSink(sinkSpecBuilder);
-
+ JsonFormat.parser().merge(functionDetailsJsonString,
functionDetailsBuilder);
FunctionDetails functionDetails = functionDetailsBuilder.build();
instanceConfig.setFunctionDetails(functionDetails);
instanceConfig.setPort(port);
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index f3ed170..3e34661 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.protobuf.Empty;
+import com.google.protobuf.util.JsonFormat;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import lombok.Getter;
@@ -72,7 +73,7 @@ class ProcessRuntime implements Runtime {
String codeFile,
String pulsarServiceUrl,
String stateStorageServiceUrl,
- AuthenticationConfig authConfig) {
+ AuthenticationConfig authConfig) throws Exception {
this.instanceConfig = instanceConfig;
this.instancePort = instanceConfig.getPort();
this.processArgs = composeArgs(instanceConfig, instanceFile,
logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
@@ -85,7 +86,7 @@ class ProcessRuntime implements Runtime {
String codeFile,
String pulsarServiceUrl,
String stateStorageServiceUrl,
- AuthenticationConfig authConfig) {
+ AuthenticationConfig authConfig) throws
Exception {
List<String> args = new LinkedList<>();
if (instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.JAVA) {
args.add("java");
@@ -124,28 +125,9 @@ class ProcessRuntime implements Runtime {
args.add(instanceConfig.getFunctionId());
args.add("--function_version");
args.add(instanceConfig.getFunctionVersion());
- args.add("--tenant");
- args.add(instanceConfig.getFunctionDetails().getTenant());
- args.add("--namespace");
- args.add(instanceConfig.getFunctionDetails().getNamespace());
- args.add("--name");
- args.add(instanceConfig.getFunctionDetails().getName());
- args.add("--function_classname");
- args.add(instanceConfig.getFunctionDetails().getClassName());
- if (instanceConfig.getFunctionDetails().getLogTopic() != null &&
- !instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) {
- args.add("--log_topic");
- args.add(instanceConfig.getFunctionDetails().getLogTopic());
- }
- args.add("--auto_ack");
- if (instanceConfig.getFunctionDetails().getAutoAck()) {
- args.add("true");
- } else {
- args.add("false");
- }
+ args.add("--function_details");
+ args.add(JsonFormat.printer().print(instanceConfig.getFunctionDetails()));
- args.add("--processing_guarantees");
-
args.add(String.valueOf(instanceConfig.getFunctionDetails().getProcessingGuarantees()));
args.add("--pulsar_serviceurl");
args.add(pulsarServiceUrl);
if (authConfig != null) {
@@ -169,73 +151,10 @@ class ProcessRuntime implements Runtime {
}
args.add("--max_buffered_tuples");
args.add(String.valueOf(instanceConfig.getMaxBufferedTuples()));
- String userConfig =
instanceConfig.getFunctionDetails().getUserConfig();
- if (userConfig != null && !userConfig.isEmpty()) {
- args.add("--user_config");
- args.add(userConfig);
- }
+
args.add("--port");
args.add(String.valueOf(instanceConfig.getPort()));
- // source related configs
- if (instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.JAVA) {
- if
(!instanceConfig.getFunctionDetails().getSource().getClassName().isEmpty()) {
- args.add("--source_classname");
-
args.add(instanceConfig.getFunctionDetails().getSource().getClassName());
- }
- String sourceConfigs =
instanceConfig.getFunctionDetails().getSource().getConfigs();
- if (sourceConfigs != null && !sourceConfigs.isEmpty()) {
- args.add("--source_configs");
- args.add(sourceConfigs);
- }
- if
(instanceConfig.getFunctionDetails().getSource().getTypeClassName() != null
- &&
!instanceConfig.getFunctionDetails().getSource().getTypeClassName().isEmpty()) {
- args.add("--source_type_classname");
- args.add("\"" +
instanceConfig.getFunctionDetails().getSource().getTypeClassName() + "\"");
- }
- }
-
- if (instanceConfig.getFunctionDetails().getSource().getTimeoutMs() >
0) {
- args.add("--source_timeout_ms");
-
args.add(String.valueOf(instanceConfig.getFunctionDetails().getSource().getTimeoutMs()));
- }
- args.add("--source_subscription_type");
-
args.add(instanceConfig.getFunctionDetails().getSource().getSubscriptionType().toString());
- args.add("--source_topics_serde_classname");
- args.add(new
Gson().toJson(instanceConfig.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap()));
- if
(isNotBlank(instanceConfig.getFunctionDetails().getSource().getTopicsPattern()))
{
- args.add("--topics_pattern");
-
args.add(instanceConfig.getFunctionDetails().getSource().getTopicsPattern());
- }
-
- // sink related configs
- if (instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.JAVA) {
- if
(!instanceConfig.getFunctionDetails().getSink().getClassName().isEmpty()) {
- args.add("--sink_classname");
-
args.add(instanceConfig.getFunctionDetails().getSink().getClassName());
- }
- String sinkConfigs =
instanceConfig.getFunctionDetails().getSink().getConfigs();
- if (sinkConfigs != null && !sinkConfigs.isEmpty()) {
- args.add("--sink_configs");
- args.add(sinkConfigs);
- }
- if
(instanceConfig.getFunctionDetails().getSink().getTypeClassName() != null
- &&
!instanceConfig.getFunctionDetails().getSink().getTypeClassName().isEmpty()) {
- args.add("--sink_type_classname");
- args.add("\"" +
instanceConfig.getFunctionDetails().getSink().getTypeClassName() + "\"");
- }
- }
- if (instanceConfig.getFunctionDetails().getSink().getTopic() != null
- &&
!instanceConfig.getFunctionDetails().getSink().getTopic().isEmpty()) {
- args.add("--sink_topic");
- args.add(instanceConfig.getFunctionDetails().getSink().getTopic());
- }
- if (instanceConfig.getFunctionDetails().getSink().getSerDeClassName()
!= null
- &&
!instanceConfig.getFunctionDetails().getSink().getSerDeClassName().isEmpty()) {
- args.add("--sink_serde_classname");
-
args.add(instanceConfig.getFunctionDetails().getSink().getSerDeClassName());
- }
-
// state storage configs
if (null != stateStorageServiceUrl
&& instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.JAVA) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index 7e2187a..b16e88d 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -91,7 +91,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
}
@Override
- public ProcessRuntime createContainer(InstanceConfig instanceConfig, String codeFile) {
+ public ProcessRuntime createContainer(InstanceConfig instanceConfig, String codeFile) throws Exception {
String instanceFile;
switch (instanceConfig.getFunctionDetails().getRuntime()) {
case JAVA:
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index 8ffb582..0fe1f9f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -33,7 +33,7 @@ public interface RuntimeFactory extends AutoCloseable {
* @return function container to start/stop instance
*/
Runtime createContainer(
- InstanceConfig instanceConfig, String codeFile);
+ InstanceConfig instanceConfig, String codeFile) throws Exception;
@Override
void close();
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index d9cef64..069b99f 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -19,13 +19,12 @@
package org.apache.pulsar.functions.runtime;
-import com.google.gson.Gson;
+import com.google.protobuf.util.JsonFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
@@ -113,12 +112,12 @@ public class ProcessRuntimeTest {
}
@Test
- public void testJavaConstructor() {
+ public void testJavaConstructor() throws Exception {
InstanceConfig config =
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA);
ProcessRuntime container = factory.createContainer(config,
userJarFile);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 56);
+ assertEquals(args.size(), 26);
String expectedArgs = "java -cp " + javaInstanceJarFile
+ " -Dpulsar.functions.java.instance.jar=" +
javaInstanceJarFile
+ " -Dlog4j.configurationFile=java_instance_log4j2.yml "
@@ -126,53 +125,29 @@ public class ProcessRuntimeTest {
+ " org.apache.pulsar.functions.runtime.JavaInstanceMain"
+ " --jar " + userJarFile + " --instance_id "
+ config.getInstanceId() + " --function_id " +
config.getFunctionId()
- + " --function_version " + config.getFunctionVersion() + "
--tenant " + config.getFunctionDetails().getTenant()
- + " --namespace " + config.getFunctionDetails().getNamespace()
- + " --name " + config.getFunctionDetails().getName()
- + " --function_classname " +
config.getFunctionDetails().getClassName()
- + " --log_topic " + config.getFunctionDetails().getLogTopic()
- + " --auto_ack false"
- + " --processing_guarantees ATLEAST_ONCE"
+ + " --function_version " + config.getFunctionVersion()
+ + " --function_details " +
JsonFormat.printer().print(config.getFunctionDetails())
+ " --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port " + args.get(35)
- + " --source_classname " +
config.getFunctionDetails().getSource().getClassName()
- + " --source_type_classname \"" +
config.getFunctionDetails().getSource().getTypeClassName() + "\""
- + " --source_subscription_type " +
config.getFunctionDetails().getSource().getSubscriptionType().name()
- + " --source_topics_serde_classname " + new
Gson().toJson(topicsToSerDeClassName)
- + " --topics_pattern " +
config.getFunctionDetails().getSource().getTopicsPattern()
- + " --sink_classname " +
config.getFunctionDetails().getSink().getClassName()
- + " --sink_type_classname \"" +
config.getFunctionDetails().getSink().getTypeClassName() + "\""
- + " --sink_topic " +
config.getFunctionDetails().getSink().getTopic()
- + " --sink_serde_classname " +
config.getFunctionDetails().getSink().getSerDeClassName()
+ + " --max_buffered_tuples 1024 --port " + args.get(23)
+ " --state_storage_serviceurl " + stateStorageServiceUrl;
assertEquals(expectedArgs, String.join(" ", args));
}
@Test
- public void testPythonConstructor() {
+ public void testPythonConstructor() throws Exception {
InstanceConfig config =
createJavaInstanceConfig(FunctionDetails.Runtime.PYTHON);
ProcessRuntime container = factory.createContainer(config,
userJarFile);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 44);
+ assertEquals(args.size(), 22);
String expectedArgs = "python " + pythonInstanceFile
+ " --py " + userJarFile + " --logging_directory "
+ logDirectory + "/functions" + " --logging_file " +
config.getFunctionDetails().getName() + " --instance_id "
+ config.getInstanceId() + " --function_id " +
config.getFunctionId()
- + " --function_version " + config.getFunctionVersion() + "
--tenant " + config.getFunctionDetails().getTenant()
- + " --namespace " + config.getFunctionDetails().getNamespace()
- + " --name " + config.getFunctionDetails().getName()
- + " --function_classname " +
config.getFunctionDetails().getClassName()
- + " --log_topic " + config.getFunctionDetails().getLogTopic()
- + " --auto_ack false"
- + " --processing_guarantees ATLEAST_ONCE"
+ + " --function_version " + config.getFunctionVersion()
+ + " --function_details " +
JsonFormat.printer().print(config.getFunctionDetails())
+ " --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port " + args.get(33)
- + " --source_subscription_type " +
config.getFunctionDetails().getSource().getSubscriptionType().name()
- + " --source_topics_serde_classname " + new
Gson().toJson(topicsToSerDeClassName)
- + " --topics_pattern " +
config.getFunctionDetails().getSource().getTopicsPattern()
- + " --sink_topic " +
config.getFunctionDetails().getSink().getTopic()
- + " --sink_serde_classname " +
config.getFunctionDetails().getSink().getSerDeClassName();
+ + " --max_buffered_tuples 1024 --port " + args.get(21);
assertEquals(expectedArgs, String.join(" ", args));
}