This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 334dffd Support key base for python function (#8540)
334dffd is described below
commit 334dffdb95035349cc0fdbc9f84a15d2422a825b
Author: xiaolong ran <[email protected]>
AuthorDate: Sun Nov 22 09:56:45 2020 +0800
Support key base for python function (#8540)
### Motivation
This change is based on #8523.
### Modifications
Support key_base for Python functions.
---
.../instance/src/main/python/Function_pb2.py | 234 ++++++++++++----
.../src/main/python/InstanceCommunication_pb2.py | 306 +++++++++++++--------
.../main/python/InstanceCommunication_pb2_grpc.py | 286 ++++++++++++-------
.../instance/src/main/python/python_instance.py | 8 +
.../integration/functions/PulsarFunctionsTest.java | 3 +
5 files changed, 579 insertions(+), 258 deletions(-)
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py
b/pulsar-functions/instance/src/main/python/Function_pb2.py
index 3ee58e9..14a33a1 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -38,7 +38,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
syntax='proto3',
serialized_options=b'\n!org.apache.pulsar.functions.protoB\010Function',
create_key=_descriptor._internal_create_key,
-
serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
\x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03
\x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01
\x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02
\x01(\t\"\xe7\x05\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\ [...]
+
serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
\x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03
\x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01
\x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02
\x01(\t\"\xa2\x06\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\ [...]
)
_PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -66,8 +66,8 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
],
containing_type=None,
serialized_options=None,
- serialized_start=3207,
- serialized_end=3286,
+ serialized_start=3685,
+ serialized_end=3764,
)
_sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
@@ -97,8 +97,8 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
serialized_options=None,
- serialized_start=3288,
- serialized_end=3348,
+ serialized_start=3766,
+ serialized_end=3826,
)
_sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
@@ -123,8 +123,8 @@ _SUBSCRIPTIONPOSITION = _descriptor.EnumDescriptor(
],
containing_type=None,
serialized_options=None,
- serialized_start=3350,
- serialized_end=3398,
+ serialized_start=3828,
+ serialized_end=3876,
)
_sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONPOSITION)
@@ -149,8 +149,8 @@ _FUNCTIONSTATE = _descriptor.EnumDescriptor(
],
containing_type=None,
serialized_options=None,
- serialized_start=3400,
- serialized_end=3441,
+ serialized_start=3878,
+ serialized_end=3919,
)
_sym_db.RegisterEnumDescriptor(_FUNCTIONSTATE)
@@ -192,8 +192,8 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor(
],
containing_type=None,
serialized_options=None,
- serialized_start=785,
- serialized_end=824,
+ serialized_start=844,
+ serialized_end=883,
)
_sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)
@@ -227,11 +227,46 @@ _FUNCTIONDETAILS_COMPONENTTYPE =
_descriptor.EnumDescriptor(
],
containing_type=None,
serialized_options=None,
- serialized_start=826,
- serialized_end=890,
+ serialized_start=885,
+ serialized_end=949,
)
_sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_COMPONENTTYPE)
+_CRYPTOSPEC_FAILUREACTION = _descriptor.EnumDescriptor(
+ name='FailureAction',
+ full_name='proto.CryptoSpec.FailureAction',
+ filename=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ values=[
+ _descriptor.EnumValueDescriptor(
+ name='FAIL', index=0, number=0,
+ serialized_options=None,
+ type=None,
+ create_key=_descriptor._internal_create_key),
+ _descriptor.EnumValueDescriptor(
+ name='DISCARD', index=1, number=1,
+ serialized_options=None,
+ type=None,
+ create_key=_descriptor._internal_create_key),
+ _descriptor.EnumValueDescriptor(
+ name='CONSUME', index=2, number=2,
+ serialized_options=None,
+ type=None,
+ create_key=_descriptor._internal_create_key),
+ _descriptor.EnumValueDescriptor(
+ name='SEND', index=3, number=10,
+ serialized_options=None,
+ type=None,
+ create_key=_descriptor._internal_create_key),
+ ],
+ containing_type=None,
+ serialized_options=None,
+ serialized_start=1873,
+ serialized_end=1934,
+)
+_sym_db.RegisterEnumDescriptor(_CRYPTOSPEC_FAILUREACTION)
+
_RESOURCES = _descriptor.Descriptor(
name='Resources',
@@ -480,6 +515,13 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='subscriptionPosition',
full_name='proto.FunctionDetails.subscriptionPosition', index=22,
+ number=23, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -495,7 +537,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
oneofs=[
],
serialized_start=147,
- serialized_end=890,
+ serialized_end=949,
)
@@ -526,8 +568,8 @@ _CONSUMERSPEC_RECEIVERQUEUESIZE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1185,
- serialized_end=1219,
+ serialized_start=1283,
+ serialized_end=1317,
)
_CONSUMERSPEC_SCHEMAPROPERTIESENTRY = _descriptor.Descriptor(
@@ -564,8 +606,8 @@ _CONSUMERSPEC_SCHEMAPROPERTIESENTRY =
_descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1221,
- serialized_end=1276,
+ serialized_start=1319,
+ serialized_end=1374,
)
_CONSUMERSPEC_CONSUMERPROPERTIESENTRY = _descriptor.Descriptor(
@@ -602,8 +644,8 @@ _CONSUMERSPEC_CONSUMERPROPERTIESENTRY =
_descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1278,
- serialized_end=1335,
+ serialized_start=1376,
+ serialized_end=1433,
)
_CONSUMERSPEC = _descriptor.Descriptor(
@@ -656,6 +698,13 @@ _CONSUMERSPEC = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='cryptoSpec', full_name='proto.ConsumerSpec.cryptoSpec', index=6,
+ number=7, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -668,8 +717,8 @@ _CONSUMERSPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=893,
- serialized_end=1335,
+ serialized_start=952,
+ serialized_end=1433,
)
@@ -702,6 +751,20 @@ _PRODUCERSPEC = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='cryptoSpec', full_name='proto.ProducerSpec.cryptoSpec', index=3,
+ number=4, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='batchBuilder', full_name='proto.ProducerSpec.batchBuilder',
index=4,
+ number=5, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"".decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -714,8 +777,69 @@ _PRODUCERSPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1337,
- serialized_end=1456,
+ serialized_start=1436,
+ serialized_end=1616,
+)
+
+
+_CRYPTOSPEC = _descriptor.Descriptor(
+ name='CryptoSpec',
+ full_name='proto.CryptoSpec',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ create_key=_descriptor._internal_create_key,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='cryptoKeyReaderClassName',
full_name='proto.CryptoSpec.cryptoKeyReaderClassName', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"".decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='cryptoKeyReaderConfig',
full_name='proto.CryptoSpec.cryptoKeyReaderConfig', index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"".decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='producerEncryptionKeyName',
full_name='proto.CryptoSpec.producerEncryptionKeyName', index=2,
+ number=3, type=9, cpp_type=9, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='producerCryptoFailureAction',
full_name='proto.CryptoSpec.producerCryptoFailureAction', index=3,
+ number=4, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='consumerCryptoFailureAction',
full_name='proto.CryptoSpec.consumerCryptoFailureAction', index=4,
+ number=5, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ _CRYPTOSPEC_FAILUREACTION,
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1619,
+ serialized_end=1934,
)
@@ -753,8 +877,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY =
_descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1936,
- serialized_end=1997,
+ serialized_start=2414,
+ serialized_end=2475,
)
_SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
@@ -791,8 +915,8 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1999,
- serialized_end=2069,
+ serialized_start=2477,
+ serialized_end=2547,
)
_SOURCESPEC = _descriptor.Descriptor(
@@ -906,8 +1030,8 @@ _SOURCESPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1459,
- serialized_end=2069,
+ serialized_start=1937,
+ serialized_end=2547,
)
@@ -945,8 +1069,8 @@ _SINKSPEC_SCHEMAPROPERTIESENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1221,
- serialized_end=1276,
+ serialized_start=1319,
+ serialized_end=1374,
)
_SINKSPEC_CONSUMERPROPERTIESENTRY = _descriptor.Descriptor(
@@ -983,8 +1107,8 @@ _SINKSPEC_CONSUMERPROPERTIESENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1278,
- serialized_end=1335,
+ serialized_start=1376,
+ serialized_end=1433,
)
_SINKSPEC = _descriptor.Descriptor(
@@ -1084,8 +1208,8 @@ _SINKSPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=2072,
- serialized_end=2548,
+ serialized_start=2550,
+ serialized_end=3026,
)
@@ -1123,8 +1247,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=2550,
- serialized_end=2622,
+ serialized_start=3028,
+ serialized_end=3100,
)
@@ -1162,8 +1286,8 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY =
_descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=2918,
- serialized_end=2993,
+ serialized_start=3396,
+ serialized_end=3471,
)
_FUNCTIONMETADATA = _descriptor.Descriptor(
@@ -1228,8 +1352,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=2625,
- serialized_end=2993,
+ serialized_start=3103,
+ serialized_end=3471,
)
@@ -1267,8 +1391,8 @@ _FUNCTIONAUTHENTICATIONSPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=2995,
- serialized_end=3055,
+ serialized_start=3473,
+ serialized_end=3533,
)
@@ -1306,8 +1430,8 @@ _INSTANCE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=3057,
- serialized_end=3138,
+ serialized_start=3535,
+ serialized_end=3616,
)
@@ -1345,8 +1469,8 @@ _ASSIGNMENT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=3140,
- serialized_end=3205,
+ serialized_start=3618,
+ serialized_end=3683,
)
_FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type =
_PROCESSINGGUARANTEES
@@ -1356,6 +1480,7 @@ _FUNCTIONDETAILS.fields_by_name['sink'].message_type =
_SINKSPEC
_FUNCTIONDETAILS.fields_by_name['resources'].message_type = _RESOURCES
_FUNCTIONDETAILS.fields_by_name['retryDetails'].message_type = _RETRYDETAILS
_FUNCTIONDETAILS.fields_by_name['componentType'].enum_type =
_FUNCTIONDETAILS_COMPONENTTYPE
+_FUNCTIONDETAILS.fields_by_name['subscriptionPosition'].enum_type =
_SUBSCRIPTIONPOSITION
_FUNCTIONDETAILS_RUNTIME.containing_type = _FUNCTIONDETAILS
_FUNCTIONDETAILS_COMPONENTTYPE.containing_type = _FUNCTIONDETAILS
_CONSUMERSPEC_RECEIVERQUEUESIZE.containing_type = _CONSUMERSPEC
@@ -1364,6 +1489,11 @@ _CONSUMERSPEC_CONSUMERPROPERTIESENTRY.containing_type =
_CONSUMERSPEC
_CONSUMERSPEC.fields_by_name['receiverQueueSize'].message_type =
_CONSUMERSPEC_RECEIVERQUEUESIZE
_CONSUMERSPEC.fields_by_name['schemaProperties'].message_type =
_CONSUMERSPEC_SCHEMAPROPERTIESENTRY
_CONSUMERSPEC.fields_by_name['consumerProperties'].message_type =
_CONSUMERSPEC_CONSUMERPROPERTIESENTRY
+_CONSUMERSPEC.fields_by_name['cryptoSpec'].message_type = _CRYPTOSPEC
+_PRODUCERSPEC.fields_by_name['cryptoSpec'].message_type = _CRYPTOSPEC
+_CRYPTOSPEC.fields_by_name['producerCryptoFailureAction'].enum_type =
_CRYPTOSPEC_FAILUREACTION
+_CRYPTOSPEC.fields_by_name['consumerCryptoFailureAction'].enum_type =
_CRYPTOSPEC_FAILUREACTION
+_CRYPTOSPEC_FAILUREACTION.containing_type = _CRYPTOSPEC
_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.containing_type = _SOURCESPEC
_SOURCESPEC_INPUTSPECSENTRY.fields_by_name['value'].message_type =
_CONSUMERSPEC
_SOURCESPEC_INPUTSPECSENTRY.containing_type = _SOURCESPEC
@@ -1389,6 +1519,7 @@ DESCRIPTOR.message_types_by_name['RetryDetails'] =
_RETRYDETAILS
DESCRIPTOR.message_types_by_name['FunctionDetails'] = _FUNCTIONDETAILS
DESCRIPTOR.message_types_by_name['ConsumerSpec'] = _CONSUMERSPEC
DESCRIPTOR.message_types_by_name['ProducerSpec'] = _PRODUCERSPEC
+DESCRIPTOR.message_types_by_name['CryptoSpec'] = _CRYPTOSPEC
DESCRIPTOR.message_types_by_name['SourceSpec'] = _SOURCESPEC
DESCRIPTOR.message_types_by_name['SinkSpec'] = _SINKSPEC
DESCRIPTOR.message_types_by_name['PackageLocationMetaData'] =
_PACKAGELOCATIONMETADATA
@@ -1461,6 +1592,13 @@ ProducerSpec =
_reflection.GeneratedProtocolMessageType('ProducerSpec', (_messag
})
_sym_db.RegisterMessage(ProducerSpec)
+CryptoSpec = _reflection.GeneratedProtocolMessageType('CryptoSpec',
(_message.Message,), {
+ 'DESCRIPTOR' : _CRYPTOSPEC,
+ '__module__' : 'Function_pb2'
+ # @@protoc_insertion_point(class_scope:proto.CryptoSpec)
+ })
+_sym_db.RegisterMessage(CryptoSpec)
+
SourceSpec = _reflection.GeneratedProtocolMessageType('SourceSpec',
(_message.Message,), {
'TopicsToSerDeClassNameEntry' :
_reflection.GeneratedProtocolMessageType('TopicsToSerDeClassNameEntry',
(_message.Message,), {
diff --git
a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
index d665e98..d1ba3bb 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
@@ -19,14 +19,11 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: InstanceCommunication.proto
-
-import sys
-_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
+"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
-from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
@@ -39,7 +36,9 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='InstanceCommunication.proto',
package='proto',
syntax='proto3',
-
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xf6\x03\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
\x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02
\x01(\t\x12\x13\n\x0bnumRestarts\x18\x03
\x01(\x03\x12\x13\n\x0bnumReceived\x18\x11 \x01(\x03\x12
\n\x18numSuccessfullyProcessed\x18\x05
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06
\x01(\x03\x12H\n\x14latestUserExceptions\x18\x07
\x03(\x0b\x32*.proto.FunctionStatus.Exc [...]
+
serialized_options=b'\n!org.apache.pulsar.functions.protoB\025InstanceCommunication',
+ create_key=_descriptor._internal_create_key,
+
serialized_pb=b'\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xc4\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
\x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02
\x01(\t\x12\x13\n\x0bnumRestarts\x18\x03
\x01(\x03\x12\x13\n\x0bnumReceived\x18\x11 \x01(\x03\x12
\n\x18numSuccessfullyProcessed\x18\x05
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06
\x01(\x03\x12H\n\x14latestUserExceptions\x18\x07
\x03(\x0b\x32*.proto.FunctionStatus.Excep [...]
,
dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])
@@ -52,35 +51,36 @@ _FUNCTIONSTATUS_EXCEPTIONINFORMATION =
_descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='exceptionString',
full_name='proto.FunctionStatus.ExceptionInformation.exceptionString', index=0,
number=1, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=_b("").decode('utf-8'),
+ has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='msSinceEpoch',
full_name='proto.FunctionStatus.ExceptionInformation.msSinceEpoch', index=1,
number=2, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=501,
- serialized_end=570,
+ serialized_start=707,
+ serialized_end=776,
)
_FUNCTIONSTATUS = _descriptor.Descriptor(
@@ -89,6 +89,7 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='running', full_name='proto.FunctionStatus.running', index=0,
@@ -96,105 +97,172 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='failureException',
full_name='proto.FunctionStatus.failureException', index=1,
number=2, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=_b("").decode('utf-8'),
+ has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='numRestarts', full_name='proto.FunctionStatus.numRestarts',
index=2,
number=3, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='numReceived', full_name='proto.FunctionStatus.numReceived',
index=3,
number=17, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='numSuccessfullyProcessed',
full_name='proto.FunctionStatus.numSuccessfullyProcessed', index=4,
number=5, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='numUserExceptions',
full_name='proto.FunctionStatus.numUserExceptions', index=5,
number=6, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='latestUserExceptions',
full_name='proto.FunctionStatus.latestUserExceptions', index=6,
number=7, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='numSystemExceptions',
full_name='proto.FunctionStatus.numSystemExceptions', index=7,
number=8, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='latestSystemExceptions',
full_name='proto.FunctionStatus.latestSystemExceptions', index=8,
number=9, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='numSourceExceptions',
full_name='proto.FunctionStatus.numSourceExceptions', index=9,
+ number=18, type=3, cpp_type=2, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='latestSourceExceptions',
full_name='proto.FunctionStatus.latestSourceExceptions', index=10,
+ number=19, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
- name='averageLatency', full_name='proto.FunctionStatus.averageLatency',
index=9,
+ name='numSinkExceptions',
full_name='proto.FunctionStatus.numSinkExceptions', index=11,
+ number=20, type=3, cpp_type=2, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='latestSinkExceptions',
full_name='proto.FunctionStatus.latestSinkExceptions', index=12,
+ number=21, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='averageLatency', full_name='proto.FunctionStatus.averageLatency',
index=13,
number=12, type=1, cpp_type=5, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
- name='lastInvocationTime',
full_name='proto.FunctionStatus.lastInvocationTime', index=10,
+ name='lastInvocationTime',
full_name='proto.FunctionStatus.lastInvocationTime', index=14,
number=13, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
- name='instanceId', full_name='proto.FunctionStatus.instanceId', index=11,
+ name='instanceId', full_name='proto.FunctionStatus.instanceId', index=15,
number=14, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=_b("").decode('utf-8'),
+ has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
- name='workerId', full_name='proto.FunctionStatus.workerId', index=12,
+ name='workerId', full_name='proto.FunctionStatus.workerId', index=16,
number=16, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=_b("").decode('utf-8'),
+ has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[_FUNCTIONSTATUS_EXCEPTIONINFORMATION, ],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=68,
- serialized_end=570,
+ serialized_end=776,
+)
+
+
+_FUNCTIONSTATUSLIST = _descriptor.Descriptor(
+ name='FunctionStatusList',
+ full_name='proto.FunctionStatusList',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ create_key=_descriptor._internal_create_key,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='error', full_name='proto.FunctionStatusList.error', index=0,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"".decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='functionStatusList',
full_name='proto.FunctionStatusList.functionStatusList', index=1,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=778,
+ serialized_end=864,
)
@@ -204,35 +272,36 @@ _METRICSDATA_USERMETRICSENTRY = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='key', full_name='proto.MetricsData.UserMetricsEntry.key', index=0,
number=1, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=_b("").decode('utf-8'),
+ has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='value', full_name='proto.MetricsData.UserMetricsEntry.value',
index=1,
number=2, type=1, cpp_type=5, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
- options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(),
_b('8\001')),
+ serialized_options=b'8\001',
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=987,
- serialized_end=1037,
+ serialized_start=1281,
+ serialized_end=1331,
)
_METRICSDATA = _descriptor.Descriptor(
@@ -241,6 +310,7 @@ _METRICSDATA = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='receivedTotal', full_name='proto.MetricsData.receivedTotal',
index=0,
@@ -248,98 +318,98 @@ _METRICSDATA = _descriptor.Descriptor(
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='receivedTotal_1min',
full_name='proto.MetricsData.receivedTotal_1min', index=1,
number=10, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='processedSuccessfullyTotal',
full_name='proto.MetricsData.processedSuccessfullyTotal', index=2,
number=4, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='processedSuccessfullyTotal_1min',
full_name='proto.MetricsData.processedSuccessfullyTotal_1min', index=3,
number=12, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='systemExceptionsTotal',
full_name='proto.MetricsData.systemExceptionsTotal', index=4,
number=5, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='systemExceptionsTotal_1min',
full_name='proto.MetricsData.systemExceptionsTotal_1min', index=5,
number=13, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='userExceptionsTotal',
full_name='proto.MetricsData.userExceptionsTotal', index=6,
number=6, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='userExceptionsTotal_1min',
full_name='proto.MetricsData.userExceptionsTotal_1min', index=7,
number=14, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='avgProcessLatency',
full_name='proto.MetricsData.avgProcessLatency', index=8,
number=7, type=1, cpp_type=5, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='avgProcessLatency_1min',
full_name='proto.MetricsData.avgProcessLatency_1min', index=9,
number=15, type=1, cpp_type=5, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='lastInvocation', full_name='proto.MetricsData.lastInvocation',
index=10,
number=8, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='userMetrics', full_name='proto.MetricsData.userMetrics', index=11,
number=9, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[_METRICSDATA_USERMETRICSENTRY, ],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=573,
- serialized_end=1037,
+ serialized_start=867,
+ serialized_end=1331,
)
@@ -349,6 +419,7 @@ _HEALTHCHECKRESULT = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='success', full_name='proto.HealthCheckResult.success', index=0,
@@ -356,21 +427,21 @@ _HEALTHCHECKRESULT = _descriptor.Descriptor(
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=1039,
- serialized_end=1075,
+ serialized_start=1333,
+ serialized_end=1369,
)
@@ -380,42 +451,43 @@ _METRICS_INSTANCEMETRICS = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='name', full_name='proto.Metrics.InstanceMetrics.name', index=0,
number=1, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=_b("").decode('utf-8'),
+ has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='instanceId', full_name='proto.Metrics.InstanceMetrics.instanceId',
index=1,
number=2, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='metricsData',
full_name='proto.Metrics.InstanceMetrics.metricsData', index=2,
number=3, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=1138,
- serialized_end=1230,
+ serialized_start=1432,
+ serialized_end=1524,
)
_METRICS = _descriptor.Descriptor(
@@ -424,6 +496,7 @@ _METRICS = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='metrics', full_name='proto.Metrics.metrics', index=0,
@@ -431,103 +504,113 @@ _METRICS = _descriptor.Descriptor(
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[_METRICS_INSTANCEMETRICS, ],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=1078,
- serialized_end=1230,
+ serialized_start=1372,
+ serialized_end=1524,
)
_FUNCTIONSTATUS_EXCEPTIONINFORMATION.containing_type = _FUNCTIONSTATUS
_FUNCTIONSTATUS.fields_by_name['latestUserExceptions'].message_type =
_FUNCTIONSTATUS_EXCEPTIONINFORMATION
_FUNCTIONSTATUS.fields_by_name['latestSystemExceptions'].message_type =
_FUNCTIONSTATUS_EXCEPTIONINFORMATION
+_FUNCTIONSTATUS.fields_by_name['latestSourceExceptions'].message_type =
_FUNCTIONSTATUS_EXCEPTIONINFORMATION
+_FUNCTIONSTATUS.fields_by_name['latestSinkExceptions'].message_type =
_FUNCTIONSTATUS_EXCEPTIONINFORMATION
+_FUNCTIONSTATUSLIST.fields_by_name['functionStatusList'].message_type =
_FUNCTIONSTATUS
_METRICSDATA_USERMETRICSENTRY.containing_type = _METRICSDATA
_METRICSDATA.fields_by_name['userMetrics'].message_type =
_METRICSDATA_USERMETRICSENTRY
_METRICS_INSTANCEMETRICS.fields_by_name['metricsData'].message_type =
_METRICSDATA
_METRICS_INSTANCEMETRICS.containing_type = _METRICS
_METRICS.fields_by_name['metrics'].message_type = _METRICS_INSTANCEMETRICS
DESCRIPTOR.message_types_by_name['FunctionStatus'] = _FUNCTIONSTATUS
+DESCRIPTOR.message_types_by_name['FunctionStatusList'] = _FUNCTIONSTATUSLIST
DESCRIPTOR.message_types_by_name['MetricsData'] = _METRICSDATA
DESCRIPTOR.message_types_by_name['HealthCheckResult'] = _HEALTHCHECKRESULT
DESCRIPTOR.message_types_by_name['Metrics'] = _METRICS
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
-FunctionStatus = _reflection.GeneratedProtocolMessageType('FunctionStatus',
(_message.Message,), dict(
+FunctionStatus = _reflection.GeneratedProtocolMessageType('FunctionStatus',
(_message.Message,), {
- ExceptionInformation =
_reflection.GeneratedProtocolMessageType('ExceptionInformation',
(_message.Message,), dict(
- DESCRIPTOR = _FUNCTIONSTATUS_EXCEPTIONINFORMATION,
- __module__ = 'InstanceCommunication_pb2'
+ 'ExceptionInformation' :
_reflection.GeneratedProtocolMessageType('ExceptionInformation',
(_message.Message,), {
+ 'DESCRIPTOR' : _FUNCTIONSTATUS_EXCEPTIONINFORMATION,
+ '__module__' : 'InstanceCommunication_pb2'
#
@@protoc_insertion_point(class_scope:proto.FunctionStatus.ExceptionInformation)
- ))
+ })
,
- DESCRIPTOR = _FUNCTIONSTATUS,
- __module__ = 'InstanceCommunication_pb2'
+ 'DESCRIPTOR' : _FUNCTIONSTATUS,
+ '__module__' : 'InstanceCommunication_pb2'
# @@protoc_insertion_point(class_scope:proto.FunctionStatus)
- ))
+ })
_sym_db.RegisterMessage(FunctionStatus)
_sym_db.RegisterMessage(FunctionStatus.ExceptionInformation)
-MetricsData = _reflection.GeneratedProtocolMessageType('MetricsData',
(_message.Message,), dict(
+FunctionStatusList =
_reflection.GeneratedProtocolMessageType('FunctionStatusList',
(_message.Message,), {
+ 'DESCRIPTOR' : _FUNCTIONSTATUSLIST,
+ '__module__' : 'InstanceCommunication_pb2'
+ # @@protoc_insertion_point(class_scope:proto.FunctionStatusList)
+ })
+_sym_db.RegisterMessage(FunctionStatusList)
+
+MetricsData = _reflection.GeneratedProtocolMessageType('MetricsData',
(_message.Message,), {
- UserMetricsEntry =
_reflection.GeneratedProtocolMessageType('UserMetricsEntry',
(_message.Message,), dict(
- DESCRIPTOR = _METRICSDATA_USERMETRICSENTRY,
- __module__ = 'InstanceCommunication_pb2'
+ 'UserMetricsEntry' :
_reflection.GeneratedProtocolMessageType('UserMetricsEntry',
(_message.Message,), {
+ 'DESCRIPTOR' : _METRICSDATA_USERMETRICSENTRY,
+ '__module__' : 'InstanceCommunication_pb2'
# @@protoc_insertion_point(class_scope:proto.MetricsData.UserMetricsEntry)
- ))
+ })
,
- DESCRIPTOR = _METRICSDATA,
- __module__ = 'InstanceCommunication_pb2'
+ 'DESCRIPTOR' : _METRICSDATA,
+ '__module__' : 'InstanceCommunication_pb2'
# @@protoc_insertion_point(class_scope:proto.MetricsData)
- ))
+ })
_sym_db.RegisterMessage(MetricsData)
_sym_db.RegisterMessage(MetricsData.UserMetricsEntry)
-HealthCheckResult =
_reflection.GeneratedProtocolMessageType('HealthCheckResult',
(_message.Message,), dict(
- DESCRIPTOR = _HEALTHCHECKRESULT,
- __module__ = 'InstanceCommunication_pb2'
+HealthCheckResult =
_reflection.GeneratedProtocolMessageType('HealthCheckResult',
(_message.Message,), {
+ 'DESCRIPTOR' : _HEALTHCHECKRESULT,
+ '__module__' : 'InstanceCommunication_pb2'
# @@protoc_insertion_point(class_scope:proto.HealthCheckResult)
- ))
+ })
_sym_db.RegisterMessage(HealthCheckResult)
-Metrics = _reflection.GeneratedProtocolMessageType('Metrics',
(_message.Message,), dict(
+Metrics = _reflection.GeneratedProtocolMessageType('Metrics',
(_message.Message,), {
- InstanceMetrics =
_reflection.GeneratedProtocolMessageType('InstanceMetrics',
(_message.Message,), dict(
- DESCRIPTOR = _METRICS_INSTANCEMETRICS,
- __module__ = 'InstanceCommunication_pb2'
+ 'InstanceMetrics' :
_reflection.GeneratedProtocolMessageType('InstanceMetrics',
(_message.Message,), {
+ 'DESCRIPTOR' : _METRICS_INSTANCEMETRICS,
+ '__module__' : 'InstanceCommunication_pb2'
# @@protoc_insertion_point(class_scope:proto.Metrics.InstanceMetrics)
- ))
+ })
,
- DESCRIPTOR = _METRICS,
- __module__ = 'InstanceCommunication_pb2'
+ 'DESCRIPTOR' : _METRICS,
+ '__module__' : 'InstanceCommunication_pb2'
# @@protoc_insertion_point(class_scope:proto.Metrics)
- ))
+ })
_sym_db.RegisterMessage(Metrics)
_sym_db.RegisterMessage(Metrics.InstanceMetrics)
-DESCRIPTOR.has_options = True
-DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(),
_b('\n!org.apache.pulsar.functions.protoB\025InstanceCommunication'))
-_METRICSDATA_USERMETRICSENTRY.has_options = True
-_METRICSDATA_USERMETRICSENTRY._options =
_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
+DESCRIPTOR._options = None
+_METRICSDATA_USERMETRICSENTRY._options = None
_INSTANCECONTROL = _descriptor.ServiceDescriptor(
name='InstanceControl',
full_name='proto.InstanceControl',
file=DESCRIPTOR,
index=0,
- options=None,
- serialized_start=1233,
- serialized_end=1581,
+ serialized_options=None,
+ create_key=_descriptor._internal_create_key,
+ serialized_start=1527,
+ serialized_end=1875,
methods=[
_descriptor.MethodDescriptor(
name='GetFunctionStatus',
@@ -536,7 +619,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
containing_service=None,
input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
output_type=_FUNCTIONSTATUS,
- options=None,
+ serialized_options=None,
+ create_key=_descriptor._internal_create_key,
),
_descriptor.MethodDescriptor(
name='GetAndResetMetrics',
@@ -545,7 +629,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
containing_service=None,
input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
output_type=_METRICSDATA,
- options=None,
+ serialized_options=None,
+ create_key=_descriptor._internal_create_key,
),
_descriptor.MethodDescriptor(
name='ResetMetrics',
@@ -554,7 +639,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
containing_service=None,
input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
output_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
- options=None,
+ serialized_options=None,
+ create_key=_descriptor._internal_create_key,
),
_descriptor.MethodDescriptor(
name='GetMetrics',
@@ -563,7 +649,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
containing_service=None,
input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
output_type=_METRICSDATA,
- options=None,
+ serialized_options=None,
+ create_key=_descriptor._internal_create_key,
),
_descriptor.MethodDescriptor(
name='HealthCheck',
@@ -572,7 +659,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
containing_service=None,
input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
output_type=_HEALTHCHECKRESULT,
- options=None,
+ serialized_options=None,
+ create_key=_descriptor._internal_create_key,
),
])
_sym_db.RegisterServiceDescriptor(_INSTANCECONTROL)
diff --git
a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
index 21730e1..e895f53 100644
---
a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
+++
b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
@@ -18,6 +18,7 @@
#
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
+"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import InstanceCommunication_pb2 as InstanceCommunication__pb2
@@ -25,110 +26,193 @@ from google.protobuf import empty_pb2 as
google_dot_protobuf_dot_empty__pb2
class InstanceControlStub(object):
- # missing associated documentation comment in .proto file
- pass
-
- def __init__(self, channel):
- """Constructor.
-
- Args:
- channel: A grpc.Channel.
- """
- self.GetFunctionStatus = channel.unary_unary(
- '/proto.InstanceControl/GetFunctionStatus',
-
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
-
response_deserializer=InstanceCommunication__pb2.FunctionStatus.FromString,
- )
- self.GetAndResetMetrics = channel.unary_unary(
- '/proto.InstanceControl/GetAndResetMetrics',
-
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
-
response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
- )
- self.ResetMetrics = channel.unary_unary(
- '/proto.InstanceControl/ResetMetrics',
-
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
-
response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
- )
- self.GetMetrics = channel.unary_unary(
- '/proto.InstanceControl/GetMetrics',
-
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
-
response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
- )
- self.HealthCheck = channel.unary_unary(
- '/proto.InstanceControl/HealthCheck',
-
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
-
response_deserializer=InstanceCommunication__pb2.HealthCheckResult.FromString,
- )
+ """Missing associated documentation comment in .proto file."""
+
+ def __init__(self, channel):
+ """Constructor.
+
+ Args:
+ channel: A grpc.Channel.
+ """
+ self.GetFunctionStatus = channel.unary_unary(
+ '/proto.InstanceControl/GetFunctionStatus',
+
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+
response_deserializer=InstanceCommunication__pb2.FunctionStatus.FromString,
+ )
+ self.GetAndResetMetrics = channel.unary_unary(
+ '/proto.InstanceControl/GetAndResetMetrics',
+
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+
response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
+ )
+ self.ResetMetrics = channel.unary_unary(
+ '/proto.InstanceControl/ResetMetrics',
+
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+
response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+ )
+ self.GetMetrics = channel.unary_unary(
+ '/proto.InstanceControl/GetMetrics',
+
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+
response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
+ )
+ self.HealthCheck = channel.unary_unary(
+ '/proto.InstanceControl/HealthCheck',
+
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+
response_deserializer=InstanceCommunication__pb2.HealthCheckResult.FromString,
+ )
class InstanceControlServicer(object):
- # missing associated documentation comment in .proto file
- pass
-
- def GetFunctionStatus(self, request, context):
- # missing associated documentation comment in .proto file
- pass
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
- context.set_details('Method not implemented!')
- raise NotImplementedError('Method not implemented!')
-
- def GetAndResetMetrics(self, request, context):
- # missing associated documentation comment in .proto file
- pass
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
- context.set_details('Method not implemented!')
- raise NotImplementedError('Method not implemented!')
-
- def ResetMetrics(self, request, context):
- # missing associated documentation comment in .proto file
- pass
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
- context.set_details('Method not implemented!')
- raise NotImplementedError('Method not implemented!')
-
- def GetMetrics(self, request, context):
- # missing associated documentation comment in .proto file
- pass
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
- context.set_details('Method not implemented!')
- raise NotImplementedError('Method not implemented!')
-
- def HealthCheck(self, request, context):
- # missing associated documentation comment in .proto file
- pass
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
- context.set_details('Method not implemented!')
- raise NotImplementedError('Method not implemented!')
+ """Missing associated documentation comment in .proto file."""
+
+ def GetFunctionStatus(self, request, context):
+ """Missing associated documentation comment in .proto file."""
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def GetAndResetMetrics(self, request, context):
+ """Missing associated documentation comment in .proto file."""
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def ResetMetrics(self, request, context):
+ """Missing associated documentation comment in .proto file."""
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def GetMetrics(self, request, context):
+ """Missing associated documentation comment in .proto file."""
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def HealthCheck(self, request, context):
+ """Missing associated documentation comment in .proto file."""
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
def add_InstanceControlServicer_to_server(servicer, server):
- rpc_method_handlers = {
- 'GetFunctionStatus': grpc.unary_unary_rpc_method_handler(
- servicer.GetFunctionStatus,
-
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
-
response_serializer=InstanceCommunication__pb2.FunctionStatus.SerializeToString,
- ),
- 'GetAndResetMetrics': grpc.unary_unary_rpc_method_handler(
- servicer.GetAndResetMetrics,
-
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
-
response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
- ),
- 'ResetMetrics': grpc.unary_unary_rpc_method_handler(
- servicer.ResetMetrics,
-
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
-
response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
- ),
- 'GetMetrics': grpc.unary_unary_rpc_method_handler(
- servicer.GetMetrics,
-
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
-
response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
- ),
- 'HealthCheck': grpc.unary_unary_rpc_method_handler(
- servicer.HealthCheck,
-
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
-
response_serializer=InstanceCommunication__pb2.HealthCheckResult.SerializeToString,
- ),
- }
- generic_handler = grpc.method_handlers_generic_handler(
- 'proto.InstanceControl', rpc_method_handlers)
- server.add_generic_rpc_handlers((generic_handler,))
+ rpc_method_handlers = {
+ 'GetFunctionStatus': grpc.unary_unary_rpc_method_handler(
+ servicer.GetFunctionStatus,
+
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+
response_serializer=InstanceCommunication__pb2.FunctionStatus.SerializeToString,
+ ),
+ 'GetAndResetMetrics': grpc.unary_unary_rpc_method_handler(
+ servicer.GetAndResetMetrics,
+
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+
response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
+ ),
+ 'ResetMetrics': grpc.unary_unary_rpc_method_handler(
+ servicer.ResetMetrics,
+
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+
response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+ ),
+ 'GetMetrics': grpc.unary_unary_rpc_method_handler(
+ servicer.GetMetrics,
+
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+
response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
+ ),
+ 'HealthCheck': grpc.unary_unary_rpc_method_handler(
+ servicer.HealthCheck,
+
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+
response_serializer=InstanceCommunication__pb2.HealthCheckResult.SerializeToString,
+ ),
+ }
+ generic_handler = grpc.method_handlers_generic_handler(
+ 'proto.InstanceControl', rpc_method_handlers)
+ server.add_generic_rpc_handlers((generic_handler,))
+
+
+ # This class is part of an EXPERIMENTAL API.
+class InstanceControl(object):
+ """Missing associated documentation comment in .proto file."""
+
+ @staticmethod
+ def GetFunctionStatus(request,
+ target,
+ options=(),
+ channel_credentials=None,
+ call_credentials=None,
+ insecure=False,
+ compression=None,
+ wait_for_ready=None,
+ timeout=None,
+ metadata=None):
+ return grpc.experimental.unary_unary(request, target,
'/proto.InstanceControl/GetFunctionStatus',
+ google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+ InstanceCommunication__pb2.FunctionStatus.FromString,
+ options, channel_credentials,
+ insecure, call_credentials, compression, wait_for_ready, timeout,
metadata)
+
+ @staticmethod
+ def GetAndResetMetrics(request,
+ target,
+ options=(),
+ channel_credentials=None,
+ call_credentials=None,
+ insecure=False,
+ compression=None,
+ wait_for_ready=None,
+ timeout=None,
+ metadata=None):
+ return grpc.experimental.unary_unary(request, target,
'/proto.InstanceControl/GetAndResetMetrics',
+ google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+ InstanceCommunication__pb2.MetricsData.FromString,
+ options, channel_credentials,
+ insecure, call_credentials, compression, wait_for_ready, timeout,
metadata)
+
+ @staticmethod
+ def ResetMetrics(request,
+ target,
+ options=(),
+ channel_credentials=None,
+ call_credentials=None,
+ insecure=False,
+ compression=None,
+ wait_for_ready=None,
+ timeout=None,
+ metadata=None):
+ return grpc.experimental.unary_unary(request, target,
'/proto.InstanceControl/ResetMetrics',
+ google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+ google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+ options, channel_credentials,
+ insecure, call_credentials, compression, wait_for_ready, timeout,
metadata)
+
+ @staticmethod
+ def GetMetrics(request,
+ target,
+ options=(),
+ channel_credentials=None,
+ call_credentials=None,
+ insecure=False,
+ compression=None,
+ wait_for_ready=None,
+ timeout=None,
+ metadata=None):
+ return grpc.experimental.unary_unary(request, target,
'/proto.InstanceControl/GetMetrics',
+ google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+ InstanceCommunication__pb2.MetricsData.FromString,
+ options, channel_credentials,
+ insecure, call_credentials, compression, wait_for_ready, timeout,
metadata)
+
+ @staticmethod
+ def HealthCheck(request,
+ target,
+ options=(),
+ channel_credentials=None,
+ call_credentials=None,
+ insecure=False,
+ compression=None,
+ wait_for_ready=None,
+ timeout=None,
+ metadata=None):
+ return grpc.experimental.unary_unary(request, target,
'/proto.InstanceControl/HealthCheck',
+ google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+ InstanceCommunication__pb2.HealthCheckResult.FromString,
+ options, channel_credentials,
+ insecure, call_credentials, compression, wait_for_ready, timeout,
metadata)
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py
b/pulsar-functions/instance/src/main/python/python_instance.py
index d6ff4fb..fecde7a 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -309,10 +309,18 @@ class PythonInstance(object):
len(self.instance_config.function_details.sink.topic) > 0:
Log.debug("Setting up producer for topic %s" %
self.instance_config.function_details.sink.topic)
+ batch_type = pulsar.BatchingType.Default
+ if self.instance_config.function_details.sink.producerSpec.batchBuilder
!= None and \
+
len(self.instance_config.function_details.sink.producerSpec.batchBuilder) > 0:
+ batch_builder =
self.instance_config.function_details.sink.producerSpec.batchBuilder
+ if batch_builder == "KEY_BASED":
+ batch_type = pulsar.BatchingType.KeyBased
+
self.producer = self.pulsar_client.create_producer(
str(self.instance_config.function_details.sink.topic),
block_if_queue_full=True,
batching_enabled=True,
+ batching_type=batch_type,
batching_max_publish_delay_ms=10,
compression_type=pulsar.CompressionType.LZ4,
# set send timeout to be infinity to prevent potential deadlock with
consumer
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 723cd46..d95d1d3 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -26,6 +26,7 @@ import net.jodah.failsafe.RetryPolicy;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -981,6 +982,8 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
@Cleanup Producer<byte[]> producer =
client.newProducer(Schema.BYTES)
.topic(inputTopicName)
+ .enableBatching(true)
+ .batcherBuilder(BatcherBuilder.DEFAULT)
.create();
for (int i = 0; i < numMessages; i++) {