This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b0a4018  Simplify argument passing in process runtimes (#2383)
b0a4018 is described below

commit b0a4018e09f5476934f9bf8b674178b1f6bbd7f7
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Thu Aug 16 11:04:03 2018 -0700

    Simplify argument passing in process runtimes (#2383)
    
    ### Motivation
    
    Currently the process runtime goes over each field inside the 
FunctionDetails protobuf and passes each one as an individual argument. This is 
not only complicated, but every field addition/deletion also requires changes 
to ProcessRuntime. This PR passes the entire proto as a JSON string, which is 
then converted back to protobuf in the instances, thereby simplifying the 
process.
---
 .../instance/src/main/python/Function_pb2.py       |  79 ++++++++----
 .../src/main/python/InstanceCommunication_pb2.py   | 137 ++++++++++++++++++---
 .../instance/src/main/python/python_instance.py    |   6 +-
 .../src/main/python/python_instance_main.py        |  63 ++--------
 .../pulsar/functions/runtime/JavaInstanceMain.java | 109 +---------------
 .../pulsar/functions/runtime/ProcessRuntime.java   |  93 +-------------
 .../functions/runtime/ProcessRuntimeFactory.java   |   2 +-
 .../pulsar/functions/runtime/RuntimeFactory.java   |   2 +-
 .../functions/runtime/ProcessRuntimeTest.java      |  47 ++-----
 9 files changed, 210 insertions(+), 328 deletions(-)

diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py 
b/pulsar-functions/instance/src/main/python/Function_pb2.py
index c9513c5..61c87f4 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='Function.proto',
   package='proto',
   syntax='proto3',
-  
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 
\x01(\x03\"\x95\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 
\x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 
\x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\ [...]
+  
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 
\x01(\x03\"\xa9\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 
\x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 
\x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\ [...]
 )
 
 _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -63,8 +63,8 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=1244,
-  serialized_end=1323,
+  serialized_start=1347,
+  serialized_end=1426,
 )
 _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
 
@@ -86,8 +86,8 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=1325,
-  serialized_end=1369,
+  serialized_start=1428,
+  serialized_end=1472,
 )
 _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
 
@@ -116,8 +116,8 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=453,
-  serialized_end=484,
+  serialized_start=473,
+  serialized_end=504,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)
 
@@ -265,6 +265,13 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='packageUrl', full_name='proto.FunctionDetails.packageUrl', 
index=13,
+      number=14, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -279,7 +286,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=79,
-  serialized_end=484,
+  serialized_end=504,
 )
 
 
@@ -316,8 +323,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=709,
-  serialized_end=770,
+  serialized_start=795,
+  serialized_end=856,
 )
 
 _SOURCESPEC = _descriptor.Descriptor(
@@ -369,6 +376,27 @@ _SOURCESPEC = _descriptor.Descriptor(
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='topicsPattern', full_name='proto.SourceSpec.topicsPattern', 
index=6,
+      number=7, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='builtin', full_name='proto.SourceSpec.builtin', index=7,
+      number=8, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='subscriptionName', full_name='proto.SourceSpec.subscriptionName', 
index=8,
+      number=9, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -381,8 +409,8 @@ _SOURCESPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=487,
-  serialized_end=770,
+  serialized_start=507,
+  serialized_end=856,
 )
 
 
@@ -428,6 +456,13 @@ _SINKSPEC = _descriptor.Descriptor(
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='builtin', full_name='proto.SinkSpec.builtin', index=5,
+      number=6, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -440,8 +475,8 @@ _SINKSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=772,
-  serialized_end=880,
+  serialized_start=858,
+  serialized_end=983,
 )
 
 
@@ -471,8 +506,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=882,
-  serialized_end=928,
+  serialized_start=985,
+  serialized_end=1031,
 )
 
 
@@ -523,8 +558,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=931,
-  serialized_end=1092,
+  serialized_start=1034,
+  serialized_end=1195,
 )
 
 
@@ -561,8 +596,8 @@ _INSTANCE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1094,
-  serialized_end=1175,
+  serialized_start=1197,
+  serialized_end=1278,
 )
 
 
@@ -599,8 +634,8 @@ _ASSIGNMENT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1177,
-  serialized_end=1242,
+  serialized_start=1280,
+  serialized_end=1345,
 )
 
 _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = 
_PROCESSINGGUARANTEES
diff --git 
a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py 
b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
index a16399e..f33f4f4 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
@@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='InstanceCommunication.proto',
   package='proto',
   syntax='proto3',
-  
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xc6\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 
\x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 
\x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 
\n\x18numSuccessfullyProcessed\x18\x05 
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 
\x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 
\x03(\x0b\x32*.proto.FunctionStatus.Ex [...]
+  
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xd8\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 
\x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 
\x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 
\n\x18numSuccessfullyProcessed\x18\x05 
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 
\x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 
\x03(\x0b\x32*.proto.FunctionStatus.Ex [...]
   ,
   dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])
 
@@ -79,8 +79,8 @@ _FUNCTIONSTATUS_EXCEPTIONINFORMATION = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=643,
-  serialized_end=712,
+  serialized_start=661,
+  serialized_end=730,
 )
 
 _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY = _descriptor.Descriptor(
@@ -116,8 +116,8 @@ _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=714,
-  serialized_end=778,
+  serialized_start=732,
+  serialized_end=796,
 )
 
 _FUNCTIONSTATUS = _descriptor.Descriptor(
@@ -232,6 +232,13 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='workerId', full_name='proto.FunctionStatus.workerId', index=15,
+      number=16, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -245,7 +252,7 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=68,
-  serialized_end=778,
+  serialized_end=796,
 )
 
 
@@ -275,8 +282,8 @@ _FUNCTIONSTATUSLIST = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=780,
-  serialized_end=851,
+  serialized_start=798,
+  serialized_end=869,
 )
 
 
@@ -327,8 +334,8 @@ _METRICSDATA_DATADIGEST = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=919,
-  serialized_end=985,
+  serialized_start=937,
+  serialized_end=1003,
 )
 
 _METRICSDATA_METRICSENTRY = _descriptor.Descriptor(
@@ -364,8 +371,8 @@ _METRICSDATA_METRICSENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=987,
-  serialized_end=1064,
+  serialized_start=1005,
+  serialized_end=1082,
 )
 
 _METRICSDATA = _descriptor.Descriptor(
@@ -394,8 +401,8 @@ _METRICSDATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=854,
-  serialized_end=1064,
+  serialized_start=872,
+  serialized_end=1082,
 )
 
 
@@ -425,8 +432,83 @@ _HEALTHCHECKRESULT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1066,
-  serialized_end=1102,
+  serialized_start=1084,
+  serialized_end=1120,
+)
+
+
+_METRICS_INSTANCEMETRICS = _descriptor.Descriptor(
+  name='InstanceMetrics',
+  full_name='proto.Metrics.InstanceMetrics',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='name', full_name='proto.Metrics.InstanceMetrics.name', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='instanceId', full_name='proto.Metrics.InstanceMetrics.instanceId', 
index=1,
+      number=2, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='metricsData', 
full_name='proto.Metrics.InstanceMetrics.metricsData', index=2,
+      number=3, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=1183,
+  serialized_end=1275,
+)
+
+_METRICS = _descriptor.Descriptor(
+  name='Metrics',
+  full_name='proto.Metrics',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='metrics', full_name='proto.Metrics.metrics', index=0,
+      number=1, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[_METRICS_INSTANCEMETRICS, ],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=1123,
+  serialized_end=1275,
 )
 
 _FUNCTIONSTATUS_EXCEPTIONINFORMATION.containing_type = _FUNCTIONSTATUS
@@ -440,10 +522,14 @@ _METRICSDATA_DATADIGEST.containing_type = _METRICSDATA
 _METRICSDATA_METRICSENTRY.fields_by_name['value'].message_type = 
_METRICSDATA_DATADIGEST
 _METRICSDATA_METRICSENTRY.containing_type = _METRICSDATA
 _METRICSDATA.fields_by_name['metrics'].message_type = _METRICSDATA_METRICSENTRY
+_METRICS_INSTANCEMETRICS.fields_by_name['metricsData'].message_type = 
_METRICSDATA
+_METRICS_INSTANCEMETRICS.containing_type = _METRICS
+_METRICS.fields_by_name['metrics'].message_type = _METRICS_INSTANCEMETRICS
 DESCRIPTOR.message_types_by_name['FunctionStatus'] = _FUNCTIONSTATUS
 DESCRIPTOR.message_types_by_name['FunctionStatusList'] = _FUNCTIONSTATUSLIST
 DESCRIPTOR.message_types_by_name['MetricsData'] = _METRICSDATA
 DESCRIPTOR.message_types_by_name['HealthCheckResult'] = _HEALTHCHECKRESULT
+DESCRIPTOR.message_types_by_name['Metrics'] = _METRICS
 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
 
 FunctionStatus = _reflection.GeneratedProtocolMessageType('FunctionStatus', 
(_message.Message,), dict(
@@ -506,6 +592,21 @@ HealthCheckResult = 
_reflection.GeneratedProtocolMessageType('HealthCheckResult'
   ))
 _sym_db.RegisterMessage(HealthCheckResult)
 
+Metrics = _reflection.GeneratedProtocolMessageType('Metrics', 
(_message.Message,), dict(
+
+  InstanceMetrics = 
_reflection.GeneratedProtocolMessageType('InstanceMetrics', 
(_message.Message,), dict(
+    DESCRIPTOR = _METRICS_INSTANCEMETRICS,
+    __module__ = 'InstanceCommunication_pb2'
+    # @@protoc_insertion_point(class_scope:proto.Metrics.InstanceMetrics)
+    ))
+  ,
+  DESCRIPTOR = _METRICS,
+  __module__ = 'InstanceCommunication_pb2'
+  # @@protoc_insertion_point(class_scope:proto.Metrics)
+  ))
+_sym_db.RegisterMessage(Metrics)
+_sym_db.RegisterMessage(Metrics.InstanceMetrics)
+
 
 DESCRIPTOR.has_options = True
 DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), 
_b('\n!org.apache.pulsar.functions.protoB\025InstanceCommunication'))
@@ -520,8 +621,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
   file=DESCRIPTOR,
   index=0,
   options=None,
-  serialized_start=1105,
-  serialized_end=1453,
+  serialized_start=1278,
+  serialized_end=1626,
   methods=[
   _descriptor.MethodDescriptor(
     name='GetFunctionStatus',
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index bb16253..2fc5091 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -114,13 +114,13 @@ class Stats(object):
     
 
 class PythonInstance(object):
-  def __init__(self, instance_id, function_id, function_version, 
function_details, max_buffered_tuples, user_code, log_topic, pulsar_client):
+  def __init__(self, instance_id, function_id, function_version, 
function_details, max_buffered_tuples, user_code, pulsar_client):
     self.instance_config = InstanceConfig(instance_id, function_id, 
function_version, function_details, max_buffered_tuples)
     self.user_code = user_code
     self.queue = Queue.Queue(max_buffered_tuples)
     self.log_topic_handler = None
-    if log_topic is not None:
-      self.log_topic_handler = log.LogTopicHandler(str(log_topic), 
pulsar_client)
+    if function_details.logTopic is not None and function_details.logTopic != 
"":
+      self.log_topic_handler = 
log.LogTopicHandler(str(function_details.logTopic), pulsar_client)
     self.pulsar_client = pulsar_client
     self.input_serdes = {}
     self.consumers = {}
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py 
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index a3c1179..514d7fe 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -28,7 +28,6 @@ import os
 import sys
 import signal
 import time
-import json
 
 from pulsar import Authentication
 import pulsar
@@ -38,6 +37,7 @@ import log
 import server
 import python_instance
 import util
+from google.protobuf import json_format
 
 to_run = True
 Log = log.Log
@@ -54,15 +54,11 @@ def main():
   signal.signal(signal.SIGINT, atexit_function)
 
   parser = argparse.ArgumentParser(description='Pulsar Functions Python 
Instance')
-  parser.add_argument('--function_classname', required=True, help='Function 
Class Name')
+  parser.add_argument('--function_details', required=True, help='Function 
Details Json String')
   parser.add_argument('--py', required=True, help='Full Path of Function Code 
File')
-  parser.add_argument('--name', required=True, help='Function Name')
-  parser.add_argument('--tenant', required=True, help='Tenant Name')
-  parser.add_argument('--namespace', required=True, help='Namespace name')
   parser.add_argument('--instance_id', required=True, help='Instance Id')
   parser.add_argument('--function_id', required=True, help='Function Id')
   parser.add_argument('--function_version', required=True, help='Function 
Version')
-  parser.add_argument('--processing_guarantees', required=True, 
help='Processing Guarantees')
   parser.add_argument('--pulsar_serviceurl', required=True, help='Pulsar 
Service Url')
   parser.add_argument('--client_auth_plugin', required=False, help='Client 
authentication plugin')
   parser.add_argument('--client_auth_params', required=False, help='Client 
authentication params')
@@ -72,64 +68,22 @@ def main():
   parser.add_argument('--tls_trust_cert_path', required=False, help='Tls trust 
cert file path')
   parser.add_argument('--port', required=True, help='Instance Port', type=int)
   parser.add_argument('--max_buffered_tuples', required=True, help='Maximum 
number of Buffered tuples')
-  parser.add_argument('--user_config', required=False, help='User Config')
   parser.add_argument('--logging_directory', required=True, help='Logging 
Directory')
   parser.add_argument('--logging_file', required=True, help='Log file name')
-  parser.add_argument('--auto_ack', required=True, help='Enable Autoacking?')
-  parser.add_argument('--log_topic', required=False, help='Topic to send Log 
Messages')
-  parser.add_argument('--source_subscription_type', required=True, 
help='Subscription Type')
-  parser.add_argument('--source_topics_serde_classname', required=True, 
help='A mapping of Input topics to SerDe')
-  parser.add_argument('--topics_pattern', required=False, help='TopicsPattern 
to consume from list of topics under a namespace that match the pattern (not 
supported)')
-  parser.add_argument('--source_timeout_ms', required=False, help='Source 
message timeout in milliseconds')
-  parser.add_argument('--sink_topic', required=False, help='Sink Topic')
-  parser.add_argument('--sink_serde_classname', required=False, help='Sink 
SerDe classname')
 
   args = parser.parse_args()
+  function_details = Function_pb2.FunctionDetails()
+  json_format.Parse(args.function_details, function_details)
   log_file = os.path.join(args.logging_directory,
-                          util.getFullyQualifiedFunctionName(args.tenant, 
args.namespace, args.name),
+                          
util.getFullyQualifiedFunctionName(function_details.tenant, 
function_details.namespace, function_details.name),
                           "%s-%s.log" % (args.logging_file, args.instance_id))
   log.init_rotating_logger(level=logging.INFO, logfile=log_file,
                            max_files=5, max_bytes=10 * 1024 * 1024)
 
   Log.info("Starting Python instance with %s" % str(args))
 
-  function_details = Function_pb2.FunctionDetails()
-  function_details.tenant = args.tenant
-  function_details.namespace = args.namespace
-  function_details.name = args.name
-  function_details.className = args.function_classname
-
-  if args.topics_pattern:
-    raise ValueError('topics_pattern is not supported by python client') 
-  sourceSpec = Function_pb2.SourceSpec()
-  sourceSpec.subscriptionType = 
Function_pb2.SubscriptionType.Value(args.source_subscription_type)
-  try:
-    source_topics_serde_classname_dict = 
json.loads(args.source_topics_serde_classname)
-  except ValueError:
-    Log.critical("Cannot decode source_topics_serde_classname.  This argument 
must be specifed as a JSON")
-    sys.exit(1)
-  if not source_topics_serde_classname_dict:
-    Log.critical("source_topics_serde_classname cannot be empty")
-  for topics, serde_classname in source_topics_serde_classname_dict.items():
-    sourceSpec.topicsToSerDeClassName[topics] = serde_classname
-  if args.source_timeout_ms:
-    sourceSpec.timeoutMs = long(args.source_timeout_ms)
-  function_details.source.MergeFrom(sourceSpec)
-
-  sinkSpec = Function_pb2.SinkSpec()
-  if args.sink_topic != None and len(args.sink_topic) != 0:
-    sinkSpec.topic = args.sink_topic
-  if args.sink_serde_classname != None and len(args.sink_serde_classname) != 0:
-    sinkSpec.serDeClassName = args.sink_serde_classname
-  function_details.sink.MergeFrom(sinkSpec)
-
-  function_details.processingGuarantees = 
Function_pb2.ProcessingGuarantees.Value(args.processing_guarantees)
-  if args.auto_ack == "true":
-    function_details.autoAck = True
-  else:
-    function_details.autoAck = False
-  if args.user_config != None and len(args.user_config) != 0:
-    function_details.userConfig = args.user_config
+  if function_details.source.topicsPattern:
+    raise ValueError('topicsPattern is not supported by python client')
 
   authentication = None
   use_tls = False
@@ -146,8 +100,7 @@ def main():
   pulsar_client = pulsar.Client(args.pulsar_serviceurl, authentication, 30, 1, 
1, 50000, None, use_tls, tls_trust_cert_path, tls_allow_insecure_connection)
   pyinstance = python_instance.PythonInstance(str(args.instance_id), 
str(args.function_id),
                                               str(args.function_version), 
function_details,
-                                              int(args.max_buffered_tuples), 
str(args.py),
-                                              args.log_topic, pulsar_client)
+                                              int(args.max_buffered_tuples), 
str(args.py), pulsar_client)
   pyinstance.run()
   server_instance = server.serve(args.port, pyinstance)
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 966740d..8924d94 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -24,6 +24,7 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.converters.StringConverter;
 import com.google.gson.Gson;
 import com.google.protobuf.Empty;
+import com.google.protobuf.util.JsonFormat;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
@@ -53,24 +54,13 @@ import java.util.concurrent.TimeUnit;
  */
 @Slf4j
 public class JavaInstanceMain implements AutoCloseable {
-    @Parameter(names = "--function_classname", description = "Function Class 
Name\n", required = true)
-    protected String className;
+    @Parameter(names = "--function_details", description = "Function details 
json\n", required = true)
+    protected String functionDetailsJsonString;
     @Parameter(
             names = "--jar",
             description = "Path to Jar\n",
             listConverter = StringConverter.class)
     protected String jarFile;
-    @Parameter(names = "--name", description = "Function Name\n", required = 
true)
-    protected String functionName;
-    @Parameter(names = "--tenant", description = "Tenant Name\n", required = 
true)
-    protected String tenant;
-    @Parameter(names = "--namespace", description = "Namespace Name\n", 
required = true)
-    protected String namespace;
-    @Parameter(names = "--log_topic", description = "Log Topic")
-    protected String logTopic;
-
-    @Parameter(names = "--processing_guarantees", description = "Processing 
Guarantees\n", required = true)
-    protected ProcessingGuarantees processingGuarantees;
 
     @Parameter(names = "--instance_id", description = "Instance Id\n", 
required = true)
     protected String instanceId;
@@ -111,48 +101,6 @@ public class JavaInstanceMain implements AutoCloseable {
     @Parameter(names = "--max_buffered_tuples", description = "Maximum number 
of tuples to buffer\n", required = true)
     protected int maxBufferedTuples;
 
-    @Parameter(names = "--user_config", description = "UserConfig\n")
-    protected String userConfig;
-
-    @Parameter(names = "--auto_ack", description = "Enable Auto Acking?\n")
-    protected String autoAck = Boolean.TRUE.toString();
-
-    @Parameter(names = "--source_classname", description = "The source 
classname")
-    protected String sourceClassname;
-
-    @Parameter(names = "--source_configs", description = "The source configs")
-    protected String sourceConfigs;
-
-    @Parameter(names = "--source_type_classname", description = "The return 
type of the source", required = true)
-    protected String sourceTypeClassName;
-
-    @Parameter(names = "--source_subscription_type", description = "The source 
subscription type", required = true)
-    protected String sourceSubscriptionType;
-
-    @Parameter(names = "--source_topics_serde_classname", description = "A map 
of topics to SerDe for the source")
-    protected String sourceTopicsSerdeClassName;
-    
-    @Parameter(names = "--topics_pattern", description = "TopicsPattern to 
consume from list of topics under a namespace that match the pattern. [--input] 
and [--topicsPattern] are mutually exclusive. Add SerDe class name for a 
pattern in --customSerdeInputs")
-    protected String topicsPattern;
-
-    @Parameter(names = "--source_timeout_ms", description = "Source message 
timeout in milliseconds")
-    protected Long sourceTimeoutMs;
-
-    @Parameter(names = "--sink_type_classname", description = "The injest type 
of the sink", required = true)
-    protected String sinkTypeClassName;
-
-    @Parameter(names = "--sink_configs", description = "The sink configs\n")
-    protected String sinkConfigs;
-
-    @Parameter(names = "--sink_classname", description = "The sink 
classname\n")
-    protected String sinkClassname;
-
-    @Parameter(names = "--sink_topic", description = "The sink Topic Name\n")
-    protected String sinkTopic;
-
-    @Parameter(names = "--sink_serde_classname", description = "Sink SerDe\n")
-    protected String sinkSerdeClassName;
-
     private Server server;
     private RuntimeSpawner runtimeSpawner;
     private ThreadRuntimeFactory containerFactory;
@@ -169,56 +117,7 @@ public class JavaInstanceMain implements AutoCloseable {
         instanceConfig.setInstanceId(instanceId);
         instanceConfig.setMaxBufferedTuples(maxBufferedTuples);
         FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
-        functionDetailsBuilder.setTenant(tenant);
-        functionDetailsBuilder.setNamespace(namespace);
-        functionDetailsBuilder.setName(functionName);
-        functionDetailsBuilder.setClassName(className);
-
-        if (logTopic != null) {
-            functionDetailsBuilder.setLogTopic(logTopic);
-        }
-        functionDetailsBuilder.setProcessingGuarantees(processingGuarantees);
-        functionDetailsBuilder.setAutoAck(isTrue(autoAck));
-        if (userConfig != null && !userConfig.isEmpty()) {
-            functionDetailsBuilder.setUserConfig(userConfig);
-        }
-
-        // Setup source
-        SourceSpec.Builder sourceDetailsBuilder = SourceSpec.newBuilder();
-        if (sourceClassname != null) {
-            sourceDetailsBuilder.setClassName(sourceClassname);
-        }
-        if (sourceConfigs != null && !sourceConfigs.isEmpty()) {;
-            sourceDetailsBuilder.setConfigs(sourceConfigs);
-        }
-        
sourceDetailsBuilder.setSubscriptionType(Function.SubscriptionType.valueOf(sourceSubscriptionType));
-        sourceDetailsBuilder.putAllTopicsToSerDeClassName(new 
Gson().fromJson(sourceTopicsSerdeClassName, Map.class));
-        if (isNotBlank(topicsPattern)) {
-            sourceDetailsBuilder.setTopicsPattern(topicsPattern);
-        }
-        sourceDetailsBuilder.setTypeClassName(sourceTypeClassName);
-        if (sourceTimeoutMs != null) {
-            sourceDetailsBuilder.setTimeoutMs(sourceTimeoutMs);
-        }
-        functionDetailsBuilder.setSource(sourceDetailsBuilder);
-
-        // Setup sink
-        SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
-        if (sinkClassname != null) {
-            sinkSpecBuilder.setClassName(sinkClassname);
-        }
-        if (sinkConfigs != null) {
-            sinkSpecBuilder.setConfigs(sinkConfigs);
-        }
-        if (sinkSerdeClassName != null) {
-            sinkSpecBuilder.setSerDeClassName(sinkSerdeClassName);
-        }
-        sinkSpecBuilder.setTypeClassName(sinkTypeClassName);
-        if (sinkTopic != null && !sinkTopic.isEmpty()) {
-            sinkSpecBuilder.setTopic(sinkTopic);
-        }
-        functionDetailsBuilder.setSink(sinkSpecBuilder);
-
+        JsonFormat.parser().merge(functionDetailsJsonString, 
functionDetailsBuilder);
         FunctionDetails functionDetails = functionDetailsBuilder.build();
         instanceConfig.setFunctionDetails(functionDetails);
         instanceConfig.setPort(port);
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index f3ed170..3e34661 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gson.Gson;
 import com.google.protobuf.Empty;
+import com.google.protobuf.util.JsonFormat;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import lombok.Getter;
@@ -72,7 +73,7 @@ class ProcessRuntime implements Runtime {
                    String codeFile,
                    String pulsarServiceUrl,
                    String stateStorageServiceUrl,
-                   AuthenticationConfig authConfig) {
+                   AuthenticationConfig authConfig) throws Exception {
         this.instanceConfig = instanceConfig;
         this.instancePort = instanceConfig.getPort();
         this.processArgs = composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
@@ -85,7 +86,7 @@ class ProcessRuntime implements Runtime {
                                      String codeFile,
                                      String pulsarServiceUrl,
                                      String stateStorageServiceUrl,
-                                     AuthenticationConfig authConfig) {
+                                     AuthenticationConfig authConfig) throws Exception {
         List<String> args = new LinkedList<>();
         if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
             args.add("java");
@@ -124,28 +125,9 @@ class ProcessRuntime implements Runtime {
         args.add(instanceConfig.getFunctionId());
         args.add("--function_version");
         args.add(instanceConfig.getFunctionVersion());
-        args.add("--tenant");
-        args.add(instanceConfig.getFunctionDetails().getTenant());
-        args.add("--namespace");
-        args.add(instanceConfig.getFunctionDetails().getNamespace());
-        args.add("--name");
-        args.add(instanceConfig.getFunctionDetails().getName());
-        args.add("--function_classname");
-        args.add(instanceConfig.getFunctionDetails().getClassName());
-        if (instanceConfig.getFunctionDetails().getLogTopic() != null &&
-                !instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) {
-            args.add("--log_topic");
-            args.add(instanceConfig.getFunctionDetails().getLogTopic());
-        }
-        args.add("--auto_ack");
-        if (instanceConfig.getFunctionDetails().getAutoAck()) {
-            args.add("true");
-        } else {
-            args.add("false");
-        }
+        args.add("--function_details");
+        args.add(JsonFormat.printer().print(instanceConfig.getFunctionDetails()));
 
-        args.add("--processing_guarantees");
-        
args.add(String.valueOf(instanceConfig.getFunctionDetails().getProcessingGuarantees()));
         args.add("--pulsar_serviceurl");
         args.add(pulsarServiceUrl);
         if (authConfig != null) {
@@ -169,73 +151,10 @@ class ProcessRuntime implements Runtime {
         }
         args.add("--max_buffered_tuples");
         args.add(String.valueOf(instanceConfig.getMaxBufferedTuples()));
-        String userConfig = 
instanceConfig.getFunctionDetails().getUserConfig();
-        if (userConfig != null && !userConfig.isEmpty()) {
-            args.add("--user_config");
-            args.add(userConfig);
-        }
+
         args.add("--port");
         args.add(String.valueOf(instanceConfig.getPort()));
 
-        // source related configs
-        if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.JAVA) {
-            if 
(!instanceConfig.getFunctionDetails().getSource().getClassName().isEmpty()) {
-                args.add("--source_classname");
-                
args.add(instanceConfig.getFunctionDetails().getSource().getClassName());
-            }
-            String sourceConfigs = 
instanceConfig.getFunctionDetails().getSource().getConfigs();
-            if (sourceConfigs != null && !sourceConfigs.isEmpty()) {
-                args.add("--source_configs");
-                args.add(sourceConfigs);
-            }
-            if 
(instanceConfig.getFunctionDetails().getSource().getTypeClassName() != null
-                && 
!instanceConfig.getFunctionDetails().getSource().getTypeClassName().isEmpty()) {
-                args.add("--source_type_classname");
-                args.add("\"" + 
instanceConfig.getFunctionDetails().getSource().getTypeClassName() + "\"");
-            }
-        }
-
-        if (instanceConfig.getFunctionDetails().getSource().getTimeoutMs() > 
0) {
-            args.add("--source_timeout_ms");
-            
args.add(String.valueOf(instanceConfig.getFunctionDetails().getSource().getTimeoutMs()));
-        }
-        args.add("--source_subscription_type");
-        
args.add(instanceConfig.getFunctionDetails().getSource().getSubscriptionType().toString());
-        args.add("--source_topics_serde_classname");
-        args.add(new 
Gson().toJson(instanceConfig.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap()));
-        if 
(isNotBlank(instanceConfig.getFunctionDetails().getSource().getTopicsPattern()))
 {
-            args.add("--topics_pattern");
-            
args.add(instanceConfig.getFunctionDetails().getSource().getTopicsPattern());
-        }
-
-        // sink related configs
-        if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.JAVA) {
-            if 
(!instanceConfig.getFunctionDetails().getSink().getClassName().isEmpty()) {
-                args.add("--sink_classname");
-                
args.add(instanceConfig.getFunctionDetails().getSink().getClassName());
-            }
-            String sinkConfigs = 
instanceConfig.getFunctionDetails().getSink().getConfigs();
-            if (sinkConfigs != null && !sinkConfigs.isEmpty()) {
-                args.add("--sink_configs");
-                args.add(sinkConfigs);
-            }
-            if 
(instanceConfig.getFunctionDetails().getSink().getTypeClassName() != null
-                    && 
!instanceConfig.getFunctionDetails().getSink().getTypeClassName().isEmpty()) {
-                args.add("--sink_type_classname");
-                args.add("\"" + 
instanceConfig.getFunctionDetails().getSink().getTypeClassName() + "\"");
-            }
-        }
-        if (instanceConfig.getFunctionDetails().getSink().getTopic() != null
-                && 
!instanceConfig.getFunctionDetails().getSink().getTopic().isEmpty()) {
-            args.add("--sink_topic");
-            args.add(instanceConfig.getFunctionDetails().getSink().getTopic());
-        }
-        if (instanceConfig.getFunctionDetails().getSink().getSerDeClassName() 
!= null
-                && 
!instanceConfig.getFunctionDetails().getSink().getSerDeClassName().isEmpty()) {
-            args.add("--sink_serde_classname");
-            
args.add(instanceConfig.getFunctionDetails().getSink().getSerDeClassName());
-        }
-
         // state storage configs
         if (null != stateStorageServiceUrl
             && instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.JAVA) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index 7e2187a..b16e88d 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -91,7 +91,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
     }
 
     @Override
-    public ProcessRuntime createContainer(InstanceConfig instanceConfig, String codeFile) {
+    public ProcessRuntime createContainer(InstanceConfig instanceConfig, String codeFile) throws Exception {
         String instanceFile;
         switch (instanceConfig.getFunctionDetails().getRuntime()) {
             case JAVA:
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index 8ffb582..0fe1f9f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -33,7 +33,7 @@ public interface RuntimeFactory extends AutoCloseable {
      * @return function container to start/stop instance
      */
     Runtime createContainer(
-            InstanceConfig instanceConfig, String codeFile);
+            InstanceConfig instanceConfig, String codeFile) throws Exception;
 
     @Override
     void close();
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index d9cef64..069b99f 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -19,13 +19,12 @@
 
 package org.apache.pulsar.functions.runtime;
 
-import com.google.gson.Gson;
+import com.google.protobuf.util.JsonFormat;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -113,12 +112,12 @@ public class ProcessRuntimeTest {
     }
 
     @Test
-    public void testJavaConstructor() {
+    public void testJavaConstructor() throws Exception {
         InstanceConfig config = 
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA);
 
         ProcessRuntime container = factory.createContainer(config, 
userJarFile);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 56);
+        assertEquals(args.size(), 26);
         String expectedArgs = "java -cp " + javaInstanceJarFile
                 + " -Dpulsar.functions.java.instance.jar=" + 
javaInstanceJarFile
                 + " -Dlog4j.configurationFile=java_instance_log4j2.yml "
@@ -126,53 +125,29 @@ public class ProcessRuntimeTest {
                 + " org.apache.pulsar.functions.runtime.JavaInstanceMain"
                 + " --jar " + userJarFile + " --instance_id "
                 + config.getInstanceId() + " --function_id " + 
config.getFunctionId()
-                + " --function_version " + config.getFunctionVersion() + " 
--tenant " + config.getFunctionDetails().getTenant()
-                + " --namespace " + config.getFunctionDetails().getNamespace()
-                + " --name " + config.getFunctionDetails().getName()
-                + " --function_classname " + 
config.getFunctionDetails().getClassName()
-                + " --log_topic " + config.getFunctionDetails().getLogTopic()
-                + " --auto_ack false"
-                + " --processing_guarantees ATLEAST_ONCE"
+                + " --function_version " + config.getFunctionVersion()
+                + " --function_details " + JsonFormat.printer().print(config.getFunctionDetails())
                 + " --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(35)
-                + " --source_classname " + 
config.getFunctionDetails().getSource().getClassName()
-                + " --source_type_classname \"" + 
config.getFunctionDetails().getSource().getTypeClassName() + "\""
-                + " --source_subscription_type " + 
config.getFunctionDetails().getSource().getSubscriptionType().name()
-                + " --source_topics_serde_classname " + new 
Gson().toJson(topicsToSerDeClassName)
-                + " --topics_pattern " + 
config.getFunctionDetails().getSource().getTopicsPattern()
-                + " --sink_classname " + 
config.getFunctionDetails().getSink().getClassName()
-                + " --sink_type_classname \"" + 
config.getFunctionDetails().getSink().getTypeClassName() + "\""
-                + " --sink_topic " + 
config.getFunctionDetails().getSink().getTopic()
-                + " --sink_serde_classname " + 
config.getFunctionDetails().getSink().getSerDeClassName()
+                + " --max_buffered_tuples 1024 --port " + args.get(23)
                 + " --state_storage_serviceurl " + stateStorageServiceUrl;
         assertEquals(expectedArgs, String.join(" ", args));
     }
 
     @Test
-    public void testPythonConstructor() {
+    public void testPythonConstructor() throws Exception {
         InstanceConfig config = 
createJavaInstanceConfig(FunctionDetails.Runtime.PYTHON);
 
         ProcessRuntime container = factory.createContainer(config, 
userJarFile);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 44);
+        assertEquals(args.size(), 22);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + userJarFile + " --logging_directory "
                 + logDirectory + "/functions" + " --logging_file " + 
config.getFunctionDetails().getName() + " --instance_id "
                 + config.getInstanceId() + " --function_id " + 
config.getFunctionId()
-                + " --function_version " + config.getFunctionVersion() + " 
--tenant " + config.getFunctionDetails().getTenant()
-                + " --namespace " + config.getFunctionDetails().getNamespace()
-                + " --name " + config.getFunctionDetails().getName()
-                + " --function_classname " + 
config.getFunctionDetails().getClassName()
-                + " --log_topic " + config.getFunctionDetails().getLogTopic()
-                + " --auto_ack false"
-                + " --processing_guarantees ATLEAST_ONCE"
+                + " --function_version " + config.getFunctionVersion()
+                + " --function_details " + JsonFormat.printer().print(config.getFunctionDetails())
                 + " --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(33)
-                + " --source_subscription_type " + 
config.getFunctionDetails().getSource().getSubscriptionType().name()
-                + " --source_topics_serde_classname " + new 
Gson().toJson(topicsToSerDeClassName)
-                + " --topics_pattern " + 
config.getFunctionDetails().getSource().getTopicsPattern()
-                + " --sink_topic " + 
config.getFunctionDetails().getSink().getTopic()
-                + " --sink_serde_classname " + 
config.getFunctionDetails().getSink().getSerDeClassName();
+                + " --max_buffered_tuples 1024 --port " + args.get(21);
         assertEquals(expectedArgs, String.join(" ", args));
     }
 

Reply via email to