This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 22d7a6c Added ability to specify producer config for functions and
sources (#7721)
22d7a6c is described below
commit 22d7a6cbcc36c79be64d3f39f707139ab43889c6
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Tue Aug 4 18:34:58 2020 -0700
Added ability to specify producer config for functions and sources (#7721)
* Added ability to specify producer config for functions and sources
* Fixed test
* Fix test
* Add generated function proto
* Add header
* Address comments
Co-authored-by: Sanjeev Kulkarni <[email protected]>
---
.../pulsar/common/functions/FunctionConfig.java | 3 +
.../pulsar/common/functions/ProducerConfig.java | 35 +-
.../org/apache/pulsar/common/io/SourceConfig.java | 3 +
.../pulsar/functions/instance/ContextImpl.java | 8 +
.../functions/instance/JavaInstanceRunnable.java | 4 +
.../apache/pulsar/functions/sink/PulsarSink.java | 8 +
.../pulsar/functions/sink/PulsarSinkConfig.java | 2 +
.../instance/src/main/python/Function_pb2.py | 610 ++++++++++++++++-----
.../proto/src/main/proto/Function.proto | 6 +
.../functions/utils/FunctionConfigUtils.java | 25 +-
.../pulsar/functions/utils/SourceConfigUtils.java | 22 +
.../functions/utils/FunctionConfigUtilsTest.java | 13 +-
.../functions/utils/SourceConfigUtilsTest.java | 6 +
13 files changed, 589 insertions(+), 156 deletions(-)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index 8c680e8..fa925c4 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -77,6 +77,9 @@ public class FunctionConfig {
private String output;
+ // Any configuration that needs to be applied to producers
+ private ProducerConfig producerConfig;
+
/**
* Represents either a builtin schema type (e.g. 'avro', 'json', etc.) or
the class name for a Schema
* implementation.
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
similarity index 59%
copy from
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
copy to
pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
index 4e47812..b28370e 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
@@ -16,24 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.sink;
+package org.apache.pulsar.common.functions;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.apache.pulsar.common.functions.FunctionConfig;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
-import java.util.Map;
-
-@Getter
-@Setter
-@ToString
-public class PulsarSinkConfig {
- private FunctionConfig.ProcessingGuarantees processingGuarantees;
- private String topic;
- private String serdeClassName;
- private String schemaType;
- private Map<String, String> schemaProperties;
- private String typeClassName;
- private boolean forwardSourceMessageProperty;
+/**
+ * Configuration of the producer inside the function.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@EqualsAndHashCode
+public class ProducerConfig {
+ private Integer maxPendingMessages;
+ private Integer maxPendingMessagesAcrossPartitions;
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
index b3a5634..31a8634 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
@@ -24,6 +24,7 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
/**
@@ -41,6 +42,8 @@ public class SourceConfig {
private String topicName;
+ private ProducerConfig producerConfig;
+
private String serdeClassName;
private String schemaType;
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index c61aee0..763d9f7 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -108,6 +108,14 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
this.producerBuilder = (ProducerBuilderImpl<?>)
client.newProducer().blockIfQueueFull(true).enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
+ if (config.getFunctionDetails().getSink().getProducerSpec() != null) {
+ if
(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages()
!= 0) {
+
this.producerBuilder.maxPendingMessages(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages());
+ }
+ if
(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions()
!= 0) {
+
this.producerBuilder.maxPendingMessagesAcrossPartitions(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions());
+ }
+ }
if (config.getFunctionDetails().getUserConfig().isEmpty()) {
userConfigs = new HashMap<>();
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 29cb71f..45a8174 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -812,6 +812,10 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
pulsarSinkConfig.setSchemaProperties(sinkSpec.getSchemaPropertiesMap());
+ if
(this.instanceConfig.getFunctionDetails().getSink().getProducerSpec() != null) {
+
pulsarSinkConfig.setProducerSpec(this.instanceConfig.getFunctionDetails().getSink().getProducerSpec());
+ }
+
object = new PulsarSink(this.client, pulsarSinkConfig,
this.properties, this.stats, this.functionClassLoader);
}
} else {
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 9aa8bc1..00d12c1 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -103,6 +103,14 @@ public class PulsarSink<T> implements Sink<T> {
if (producerName != null) {
builder.producerName(producerName);
}
+ if (pulsarSinkConfig.getProducerSpec() != null) {
+ if (pulsarSinkConfig.getProducerSpec().getMaxPendingMessages()
!= 0) {
+
builder.maxPendingMessages(pulsarSinkConfig.getProducerSpec().getMaxPendingMessages());
+ }
+ if
(pulsarSinkConfig.getProducerSpec().getMaxPendingMessagesAcrossPartitions() !=
0) {
+
builder.maxPendingMessagesAcrossPartitions(pulsarSinkConfig.getProducerSpec().getMaxPendingMessagesAcrossPartitions());
+ }
+ }
return builder.properties(properties).create();
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
index 4e47812..a4ba7e3 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
@@ -22,6 +22,7 @@ import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.functions.proto.Function;
import java.util.Map;
@@ -36,4 +37,5 @@ public class PulsarSinkConfig {
private Map<String, String> schemaProperties;
private String typeClassName;
private boolean forwardSourceMessageProperty;
+ private Function.ProducerSpec producerSpec;
}
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py
b/pulsar-functions/instance/src/main/python/Function_pb2.py
index 108ed79..203809d 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -17,7 +17,6 @@
# under the License.
#
-# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: Function.proto
@@ -38,7 +37,8 @@ DESCRIPTOR = _descriptor.FileDescriptor(
package='proto',
syntax='proto3',
serialized_options=b'\n!org.apache.pulsar.functions.protoB\010Function',
-
serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
\x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03
\x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01
\x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02
\x01(\t\"\xa3\x05\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\ [...]
+ create_key=_descriptor._internal_create_key,
+
serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
\x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03
\x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01
\x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02
\x01(\t\"\xe7\x05\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\ [...]
)
_PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -46,24 +46,28 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
full_name='proto.ProcessingGuarantees',
filename=None,
file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
values=[
_descriptor.EnumValueDescriptor(
name='ATLEAST_ONCE', index=0, number=0,
serialized_options=None,
- type=None),
+ type=None,
+ create_key=_descriptor._internal_create_key),
_descriptor.EnumValueDescriptor(
name='ATMOST_ONCE', index=1, number=1,
serialized_options=None,
- type=None),
+ type=None,
+ create_key=_descriptor._internal_create_key),
_descriptor.EnumValueDescriptor(
name='EFFECTIVELY_ONCE', index=2, number=2,
serialized_options=None,
- type=None),
+ type=None,
+ create_key=_descriptor._internal_create_key),
],
containing_type=None,
serialized_options=None,
- serialized_start=2429,
- serialized_end=2508,
+ serialized_start=3174,
+ serialized_end=3253,
)
_sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
@@ -73,20 +77,28 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
full_name='proto.SubscriptionType',
filename=None,
file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
values=[
_descriptor.EnumValueDescriptor(
name='SHARED', index=0, number=0,
serialized_options=None,
- type=None),
+ type=None,
+ create_key=_descriptor._internal_create_key),
_descriptor.EnumValueDescriptor(
name='FAILOVER', index=1, number=1,
serialized_options=None,
- type=None),
+ type=None,
+ create_key=_descriptor._internal_create_key),
+ _descriptor.EnumValueDescriptor(
+ name='KEY_SHARED', index=2, number=2,
+ serialized_options=None,
+ type=None,
+ create_key=_descriptor._internal_create_key),
],
containing_type=None,
serialized_options=None,
- serialized_start=2510,
- serialized_end=2554,
+ serialized_start=3255,
+ serialized_end=3315,
)
_sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
@@ -96,20 +108,23 @@ _SUBSCRIPTIONPOSITION = _descriptor.EnumDescriptor(
full_name='proto.SubscriptionPosition',
filename=None,
file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
values=[
_descriptor.EnumValueDescriptor(
name='LATEST', index=0, number=0,
serialized_options=None,
- type=None),
+ type=None,
+ create_key=_descriptor._internal_create_key),
_descriptor.EnumValueDescriptor(
name='EARLIEST', index=1, number=1,
serialized_options=None,
- type=None),
+ type=None,
+ create_key=_descriptor._internal_create_key),
],
containing_type=None,
serialized_options=None,
- serialized_start=2556,
- serialized_end=2604,
+ serialized_start=3317,
+ serialized_end=3365,
)
_sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONPOSITION)
@@ -119,20 +134,23 @@ _FUNCTIONSTATE = _descriptor.EnumDescriptor(
full_name='proto.FunctionState',
filename=None,
file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
values=[
_descriptor.EnumValueDescriptor(
name='RUNNING', index=0, number=0,
serialized_options=None,
- type=None),
+ type=None,
+ create_key=_descriptor._internal_create_key),
_descriptor.EnumValueDescriptor(
name='STOPPED', index=1, number=1,
serialized_options=None,
- type=None),
+ type=None,
+ create_key=_descriptor._internal_create_key),
],
containing_type=None,
serialized_options=None,
- serialized_start=2606,
- serialized_end=2647,
+ serialized_start=3367,
+ serialized_end=3408,
)
_sym_db.RegisterEnumDescriptor(_FUNCTIONSTATE)
@@ -142,6 +160,7 @@ ATMOST_ONCE = 1
EFFECTIVELY_ONCE = 2
SHARED = 0
FAILOVER = 1
+KEY_SHARED = 2
LATEST = 0
EARLIEST = 1
RUNNING = 0
@@ -153,24 +172,28 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor(
full_name='proto.FunctionDetails.Runtime',
filename=None,
file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
values=[
_descriptor.EnumValueDescriptor(
name='JAVA', index=0, number=0,
serialized_options=None,
- type=None),
+ type=None,
+ create_key=_descriptor._internal_create_key),
_descriptor.EnumValueDescriptor(
name='PYTHON', index=1, number=1,
serialized_options=None,
- type=None),
+ type=None,
+ create_key=_descriptor._internal_create_key),
_descriptor.EnumValueDescriptor(
name='GO', index=2, number=3,
serialized_options=None,
- type=None),
+ type=None,
+ create_key=_descriptor._internal_create_key),
],
containing_type=None,
serialized_options=None,
- serialized_start=717,
- serialized_end=756,
+ serialized_start=785,
+ serialized_end=824,
)
_sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)
@@ -179,28 +202,33 @@ _FUNCTIONDETAILS_COMPONENTTYPE =
_descriptor.EnumDescriptor(
full_name='proto.FunctionDetails.ComponentType',
filename=None,
file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
values=[
_descriptor.EnumValueDescriptor(
name='UNKNOWN', index=0, number=0,
serialized_options=None,
- type=None),
+ type=None,
+ create_key=_descriptor._internal_create_key),
_descriptor.EnumValueDescriptor(
name='FUNCTION', index=1, number=1,
serialized_options=None,
- type=None),
+ type=None,
+ create_key=_descriptor._internal_create_key),
_descriptor.EnumValueDescriptor(
name='SOURCE', index=2, number=2,
serialized_options=None,
- type=None),
+ type=None,
+ create_key=_descriptor._internal_create_key),
_descriptor.EnumValueDescriptor(
name='SINK', index=3, number=3,
serialized_options=None,
- type=None),
+ type=None,
+ create_key=_descriptor._internal_create_key),
],
containing_type=None,
serialized_options=None,
- serialized_start=758,
- serialized_end=822,
+ serialized_start=826,
+ serialized_end=890,
)
_sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_COMPONENTTYPE)
@@ -211,6 +239,7 @@ _RESOURCES = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='cpu', full_name='proto.Resources.cpu', index=0,
@@ -218,21 +247,21 @@ _RESOURCES = _descriptor.Descriptor(
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='ram', full_name='proto.Resources.ram', index=1,
number=2, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='disk', full_name='proto.Resources.disk', index=2,
number=3, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -256,6 +285,7 @@ _RETRYDETAILS = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='maxMessageRetries',
full_name='proto.RetryDetails.maxMessageRetries', index=0,
@@ -263,14 +293,14 @@ _RETRYDETAILS = _descriptor.Descriptor(
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='deadLetterTopic', full_name='proto.RetryDetails.deadLetterTopic',
index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -294,6 +324,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='tenant', full_name='proto.FunctionDetails.tenant', index=0,
@@ -301,133 +332,154 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='namespace', full_name='proto.FunctionDetails.namespace', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='name', full_name='proto.FunctionDetails.name', index=2,
number=3, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='className', full_name='proto.FunctionDetails.className', index=3,
number=4, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='logTopic', full_name='proto.FunctionDetails.logTopic', index=4,
number=5, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='processingGuarantees',
full_name='proto.FunctionDetails.processingGuarantees', index=5,
number=6, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='userConfig', full_name='proto.FunctionDetails.userConfig', index=6,
number=7, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='secretsMap', full_name='proto.FunctionDetails.secretsMap', index=7,
number=16, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='runtime', full_name='proto.FunctionDetails.runtime', index=8,
number=8, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='autoAck', full_name='proto.FunctionDetails.autoAck', index=9,
number=9, type=8, cpp_type=7, label=1,
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='parallelism', full_name='proto.FunctionDetails.parallelism',
index=10,
number=10, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='source', full_name='proto.FunctionDetails.source', index=11,
number=11, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='sink', full_name='proto.FunctionDetails.sink', index=12,
number=12, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='resources', full_name='proto.FunctionDetails.resources', index=13,
number=13, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='packageUrl', full_name='proto.FunctionDetails.packageUrl',
index=14,
number=14, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='retryDetails', full_name='proto.FunctionDetails.retryDetails',
index=15,
number=15, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='runtimeFlags', full_name='proto.FunctionDetails.runtimeFlags',
index=16,
number=17, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='componentType', full_name='proto.FunctionDetails.componentType',
index=17,
number=18, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='customRuntimeOptions',
full_name='proto.FunctionDetails.customRuntimeOptions', index=18,
number=19, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='builtin', full_name='proto.FunctionDetails.builtin', index=19,
+ number=20, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"".decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='retainOrdering', full_name='proto.FunctionDetails.retainOrdering',
index=20,
+ number=21, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='retainKeyOrdering',
full_name='proto.FunctionDetails.retainKeyOrdering', index=21,
+ number=22, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -443,7 +495,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
oneofs=[
],
serialized_start=147,
- serialized_end=822,
+ serialized_end=890,
)
@@ -453,6 +505,7 @@ _CONSUMERSPEC_RECEIVERQUEUESIZE = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='value', full_name='proto.ConsumerSpec.ReceiverQueueSize.value',
index=0,
@@ -460,7 +513,7 @@ _CONSUMERSPEC_RECEIVERQUEUESIZE = _descriptor.Descriptor(
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -473,8 +526,84 @@ _CONSUMERSPEC_RECEIVERQUEUESIZE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=975,
- serialized_end=1009,
+ serialized_start=1185,
+ serialized_end=1219,
+)
+
+_CONSUMERSPEC_SCHEMAPROPERTIESENTRY = _descriptor.Descriptor(
+ name='SchemaPropertiesEntry',
+ full_name='proto.ConsumerSpec.SchemaPropertiesEntry',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ create_key=_descriptor._internal_create_key,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='key', full_name='proto.ConsumerSpec.SchemaPropertiesEntry.key',
index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"".decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='value',
full_name='proto.ConsumerSpec.SchemaPropertiesEntry.value', index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"".decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=b'8\001',
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1221,
+ serialized_end=1276,
+)
+
+_CONSUMERSPEC_CONSUMERPROPERTIESENTRY = _descriptor.Descriptor(
+ name='ConsumerPropertiesEntry',
+ full_name='proto.ConsumerSpec.ConsumerPropertiesEntry',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ create_key=_descriptor._internal_create_key,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='key', full_name='proto.ConsumerSpec.ConsumerPropertiesEntry.key',
index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"".decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='value',
full_name='proto.ConsumerSpec.ConsumerPropertiesEntry.value', index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"".decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=b'8\001',
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1278,
+ serialized_end=1335,
)
_CONSUMERSPEC = _descriptor.Descriptor(
@@ -483,6 +612,7 @@ _CONSUMERSPEC = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='schemaType', full_name='proto.ConsumerSpec.schemaType', index=0,
@@ -490,32 +620,85 @@ _CONSUMERSPEC = _descriptor.Descriptor(
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='serdeClassName', full_name='proto.ConsumerSpec.serdeClassName',
index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='isRegexPattern', full_name='proto.ConsumerSpec.isRegexPattern',
index=2,
number=3, type=8, cpp_type=7, label=1,
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='receiverQueueSize',
full_name='proto.ConsumerSpec.receiverQueueSize', index=3,
number=4, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='schemaProperties',
full_name='proto.ConsumerSpec.schemaProperties', index=4,
+ number=5, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='consumerProperties',
full_name='proto.ConsumerSpec.consumerProperties', index=5,
+ number=6, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ ],
+ extensions=[
+ ],
+ nested_types=[_CONSUMERSPEC_RECEIVERQUEUESIZE,
_CONSUMERSPEC_SCHEMAPROPERTIESENTRY, _CONSUMERSPEC_CONSUMERPROPERTIESENTRY, ],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=893,
+ serialized_end=1335,
+)
+
+
+_PRODUCERSPEC = _descriptor.Descriptor(
+ name='ProducerSpec',
+ full_name='proto.ProducerSpec',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ create_key=_descriptor._internal_create_key,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='maxPendingMessages',
full_name='proto.ProducerSpec.maxPendingMessages', index=0,
+ number=1, type=5, cpp_type=1, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='maxPendingMessagesAcrossPartitions',
full_name='proto.ProducerSpec.maxPendingMessagesAcrossPartitions', index=1,
+ number=2, type=5, cpp_type=1, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
- nested_types=[_CONSUMERSPEC_RECEIVERQUEUESIZE, ],
+ nested_types=[],
enum_types=[
],
serialized_options=None,
@@ -524,8 +707,8 @@ _CONSUMERSPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=825,
- serialized_end=1009,
+ serialized_start=1337,
+ serialized_end=1423,
)
@@ -535,6 +718,7 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY =
_descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='key',
full_name='proto.SourceSpec.TopicsToSerDeClassNameEntry.key', index=0,
@@ -542,14 +726,14 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY =
_descriptor.Descriptor(
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='value',
full_name='proto.SourceSpec.TopicsToSerDeClassNameEntry.value', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -562,8 +746,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY =
_descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1451,
- serialized_end=1512,
+ serialized_start=1903,
+ serialized_end=1964,
)
_SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
@@ -572,6 +756,7 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='key', full_name='proto.SourceSpec.InputSpecsEntry.key', index=0,
@@ -579,14 +764,14 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='value', full_name='proto.SourceSpec.InputSpecsEntry.value',
index=1,
number=2, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -599,8 +784,8 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1514,
- serialized_end=1584,
+ serialized_start=1966,
+ serialized_end=2036,
)
_SOURCESPEC = _descriptor.Descriptor(
@@ -609,6 +794,7 @@ _SOURCESPEC = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='className', full_name='proto.SourceSpec.className', index=0,
@@ -616,84 +802,91 @@ _SOURCESPEC = _descriptor.Descriptor(
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='configs', full_name='proto.SourceSpec.configs', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='typeClassName', full_name='proto.SourceSpec.typeClassName',
index=2,
number=5, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='subscriptionType', full_name='proto.SourceSpec.subscriptionType',
index=3,
number=3, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='topicsToSerDeClassName',
full_name='proto.SourceSpec.topicsToSerDeClassName', index=4,
number=4, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=b'\030\001', file=DESCRIPTOR),
+ serialized_options=b'\030\001', file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='inputSpecs', full_name='proto.SourceSpec.inputSpecs', index=5,
number=10, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='timeoutMs', full_name='proto.SourceSpec.timeoutMs', index=6,
number=6, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='topicsPattern', full_name='proto.SourceSpec.topicsPattern',
index=7,
number=7, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=b'\030\001', file=DESCRIPTOR),
+ serialized_options=b'\030\001', file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='builtin', full_name='proto.SourceSpec.builtin', index=8,
number=8, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='subscriptionName', full_name='proto.SourceSpec.subscriptionName',
index=9,
number=9, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='cleanupSubscription',
full_name='proto.SourceSpec.cleanupSubscription', index=10,
number=11, type=8, cpp_type=7, label=1,
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='subscriptionPosition',
full_name='proto.SourceSpec.subscriptionPosition', index=11,
number=12, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='negativeAckRedeliveryDelayMs',
full_name='proto.SourceSpec.negativeAckRedeliveryDelayMs', index=12,
+ number=13, type=4, cpp_type=4, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -706,17 +899,94 @@ _SOURCESPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1012,
- serialized_end=1584,
+ serialized_start=1426,
+ serialized_end=2036,
)
+_SINKSPEC_SCHEMAPROPERTIESENTRY = _descriptor.Descriptor(
+ name='SchemaPropertiesEntry',
+ full_name='proto.SinkSpec.SchemaPropertiesEntry',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ create_key=_descriptor._internal_create_key,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='key', full_name='proto.SinkSpec.SchemaPropertiesEntry.key',
index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"".decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='value', full_name='proto.SinkSpec.SchemaPropertiesEntry.value',
index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"".decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=b'8\001',
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1221,
+ serialized_end=1276,
+)
+
+_SINKSPEC_CONSUMERPROPERTIESENTRY = _descriptor.Descriptor(
+ name='ConsumerPropertiesEntry',
+ full_name='proto.SinkSpec.ConsumerPropertiesEntry',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ create_key=_descriptor._internal_create_key,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='key', full_name='proto.SinkSpec.ConsumerPropertiesEntry.key',
index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"".decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='value', full_name='proto.SinkSpec.ConsumerPropertiesEntry.value',
index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"".decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=b'8\001',
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1278,
+ serialized_end=1335,
+)
+
_SINKSPEC = _descriptor.Descriptor(
name='SinkSpec',
full_name='proto.SinkSpec',
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='className', full_name='proto.SinkSpec.className', index=0,
@@ -724,60 +994,81 @@ _SINKSPEC = _descriptor.Descriptor(
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='configs', full_name='proto.SinkSpec.configs', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='typeClassName', full_name='proto.SinkSpec.typeClassName', index=2,
number=5, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='topic', full_name='proto.SinkSpec.topic', index=3,
number=3, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='producerSpec', full_name='proto.SinkSpec.producerSpec', index=4,
+ number=11, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
- name='serDeClassName', full_name='proto.SinkSpec.serDeClassName',
index=4,
+ name='serDeClassName', full_name='proto.SinkSpec.serDeClassName',
index=5,
number=4, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
- name='builtin', full_name='proto.SinkSpec.builtin', index=5,
+ name='builtin', full_name='proto.SinkSpec.builtin', index=6,
number=6, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
- name='schemaType', full_name='proto.SinkSpec.schemaType', index=6,
+ name='schemaType', full_name='proto.SinkSpec.schemaType', index=7,
number=7, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
- name='forwardSourceMessageProperty',
full_name='proto.SinkSpec.forwardSourceMessageProperty', index=7,
+ name='forwardSourceMessageProperty',
full_name='proto.SinkSpec.forwardSourceMessageProperty', index=8,
number=8, type=8, cpp_type=7, label=1,
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='schemaProperties', full_name='proto.SinkSpec.schemaProperties',
index=9,
+ number=9, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='consumerProperties',
full_name='proto.SinkSpec.consumerProperties', index=10,
+ number=10, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
- nested_types=[],
+ nested_types=[_SINKSPEC_SCHEMAPROPERTIESENTRY,
_SINKSPEC_CONSUMERPROPERTIESENTRY, ],
enum_types=[
],
serialized_options=None,
@@ -786,8 +1077,8 @@ _SINKSPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1587,
- serialized_end=1770,
+ serialized_start=2039,
+ serialized_end=2515,
)
@@ -797,6 +1088,7 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='packagePath',
full_name='proto.PackageLocationMetaData.packagePath', index=0,
@@ -804,14 +1096,14 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='originalFileName',
full_name='proto.PackageLocationMetaData.originalFileName', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -824,8 +1116,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1772,
- serialized_end=1844,
+ serialized_start=2517,
+ serialized_end=2589,
)
@@ -835,6 +1127,7 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY =
_descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='key', full_name='proto.FunctionMetaData.InstanceStatesEntry.key',
index=0,
@@ -842,14 +1135,14 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY =
_descriptor.Descriptor(
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='value',
full_name='proto.FunctionMetaData.InstanceStatesEntry.value', index=1,
number=2, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -862,8 +1155,8 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY =
_descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=2140,
- serialized_end=2215,
+ serialized_start=2885,
+ serialized_end=2960,
)
_FUNCTIONMETADATA = _descriptor.Descriptor(
@@ -872,6 +1165,7 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='functionDetails',
full_name='proto.FunctionMetaData.functionDetails', index=0,
@@ -879,42 +1173,42 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='packageLocation',
full_name='proto.FunctionMetaData.packageLocation', index=1,
number=2, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='version', full_name='proto.FunctionMetaData.version', index=2,
number=3, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='createTime', full_name='proto.FunctionMetaData.createTime',
index=3,
number=4, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='instanceStates',
full_name='proto.FunctionMetaData.instanceStates', index=4,
number=5, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='functionAuthSpec',
full_name='proto.FunctionMetaData.functionAuthSpec', index=5,
number=6, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -927,8 +1221,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1847,
- serialized_end=2215,
+ serialized_start=2592,
+ serialized_end=2960,
)
@@ -938,6 +1232,7 @@ _FUNCTIONAUTHENTICATIONSPEC = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='data', full_name='proto.FunctionAuthenticationSpec.data', index=0,
@@ -945,14 +1240,14 @@ _FUNCTIONAUTHENTICATIONSPEC = _descriptor.Descriptor(
has_default_value=False, default_value=b"",
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='provider', full_name='proto.FunctionAuthenticationSpec.provider',
index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -965,8 +1260,8 @@ _FUNCTIONAUTHENTICATIONSPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=2217,
- serialized_end=2277,
+ serialized_start=2962,
+ serialized_end=3022,
)
@@ -976,6 +1271,7 @@ _INSTANCE = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='functionMetaData', full_name='proto.Instance.functionMetaData',
index=0,
@@ -983,14 +1279,14 @@ _INSTANCE = _descriptor.Descriptor(
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='instanceId', full_name='proto.Instance.instanceId', index=1,
number=2, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -1003,8 +1299,8 @@ _INSTANCE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=2279,
- serialized_end=2360,
+ serialized_start=3024,
+ serialized_end=3105,
)
@@ -1014,6 +1310,7 @@ _ASSIGNMENT = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='instance', full_name='proto.Assignment.instance', index=0,
@@ -1021,14 +1318,14 @@ _ASSIGNMENT = _descriptor.Descriptor(
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='workerId', full_name='proto.Assignment.workerId', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR,
create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -1041,8 +1338,8 @@ _ASSIGNMENT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=2362,
- serialized_end=2427,
+ serialized_start=3107,
+ serialized_end=3172,
)
_FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type =
_PROCESSINGGUARANTEES
@@ -1055,7 +1352,11 @@
_FUNCTIONDETAILS.fields_by_name['componentType'].enum_type = _FUNCTIONDETAILS_CO
_FUNCTIONDETAILS_RUNTIME.containing_type = _FUNCTIONDETAILS
_FUNCTIONDETAILS_COMPONENTTYPE.containing_type = _FUNCTIONDETAILS
_CONSUMERSPEC_RECEIVERQUEUESIZE.containing_type = _CONSUMERSPEC
+_CONSUMERSPEC_SCHEMAPROPERTIESENTRY.containing_type = _CONSUMERSPEC
+_CONSUMERSPEC_CONSUMERPROPERTIESENTRY.containing_type = _CONSUMERSPEC
_CONSUMERSPEC.fields_by_name['receiverQueueSize'].message_type =
_CONSUMERSPEC_RECEIVERQUEUESIZE
+_CONSUMERSPEC.fields_by_name['schemaProperties'].message_type =
_CONSUMERSPEC_SCHEMAPROPERTIESENTRY
+_CONSUMERSPEC.fields_by_name['consumerProperties'].message_type =
_CONSUMERSPEC_CONSUMERPROPERTIESENTRY
_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.containing_type = _SOURCESPEC
_SOURCESPEC_INPUTSPECSENTRY.fields_by_name['value'].message_type =
_CONSUMERSPEC
_SOURCESPEC_INPUTSPECSENTRY.containing_type = _SOURCESPEC
@@ -1063,6 +1364,11 @@ _SOURCESPEC.fields_by_name['subscriptionType'].enum_type
= _SUBSCRIPTIONTYPE
_SOURCESPEC.fields_by_name['topicsToSerDeClassName'].message_type =
_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY
_SOURCESPEC.fields_by_name['inputSpecs'].message_type =
_SOURCESPEC_INPUTSPECSENTRY
_SOURCESPEC.fields_by_name['subscriptionPosition'].enum_type =
_SUBSCRIPTIONPOSITION
+_SINKSPEC_SCHEMAPROPERTIESENTRY.containing_type = _SINKSPEC
+_SINKSPEC_CONSUMERPROPERTIESENTRY.containing_type = _SINKSPEC
+_SINKSPEC.fields_by_name['producerSpec'].message_type = _PRODUCERSPEC
+_SINKSPEC.fields_by_name['schemaProperties'].message_type =
_SINKSPEC_SCHEMAPROPERTIESENTRY
+_SINKSPEC.fields_by_name['consumerProperties'].message_type =
_SINKSPEC_CONSUMERPROPERTIESENTRY
_FUNCTIONMETADATA_INSTANCESTATESENTRY.fields_by_name['value'].enum_type =
_FUNCTIONSTATE
_FUNCTIONMETADATA_INSTANCESTATESENTRY.containing_type = _FUNCTIONMETADATA
_FUNCTIONMETADATA.fields_by_name['functionDetails'].message_type =
_FUNCTIONDETAILS
@@ -1075,6 +1381,7 @@ DESCRIPTOR.message_types_by_name['Resources'] = _RESOURCES
DESCRIPTOR.message_types_by_name['RetryDetails'] = _RETRYDETAILS
DESCRIPTOR.message_types_by_name['FunctionDetails'] = _FUNCTIONDETAILS
DESCRIPTOR.message_types_by_name['ConsumerSpec'] = _CONSUMERSPEC
+DESCRIPTOR.message_types_by_name['ProducerSpec'] = _PRODUCERSPEC
DESCRIPTOR.message_types_by_name['SourceSpec'] = _SOURCESPEC
DESCRIPTOR.message_types_by_name['SinkSpec'] = _SINKSPEC
DESCRIPTOR.message_types_by_name['PackageLocationMetaData'] =
_PACKAGELOCATIONMETADATA
@@ -1117,12 +1424,35 @@ ConsumerSpec =
_reflection.GeneratedProtocolMessageType('ConsumerSpec', (_messag
#
@@protoc_insertion_point(class_scope:proto.ConsumerSpec.ReceiverQueueSize)
})
,
+
+ 'SchemaPropertiesEntry' :
_reflection.GeneratedProtocolMessageType('SchemaPropertiesEntry',
(_message.Message,), {
+ 'DESCRIPTOR' : _CONSUMERSPEC_SCHEMAPROPERTIESENTRY,
+ '__module__' : 'Function_pb2'
+ #
@@protoc_insertion_point(class_scope:proto.ConsumerSpec.SchemaPropertiesEntry)
+ })
+ ,
+
+ 'ConsumerPropertiesEntry' :
_reflection.GeneratedProtocolMessageType('ConsumerPropertiesEntry',
(_message.Message,), {
+ 'DESCRIPTOR' : _CONSUMERSPEC_CONSUMERPROPERTIESENTRY,
+ '__module__' : 'Function_pb2'
+ #
@@protoc_insertion_point(class_scope:proto.ConsumerSpec.ConsumerPropertiesEntry)
+ })
+ ,
'DESCRIPTOR' : _CONSUMERSPEC,
'__module__' : 'Function_pb2'
# @@protoc_insertion_point(class_scope:proto.ConsumerSpec)
})
_sym_db.RegisterMessage(ConsumerSpec)
_sym_db.RegisterMessage(ConsumerSpec.ReceiverQueueSize)
+_sym_db.RegisterMessage(ConsumerSpec.SchemaPropertiesEntry)
+_sym_db.RegisterMessage(ConsumerSpec.ConsumerPropertiesEntry)
+
+ProducerSpec = _reflection.GeneratedProtocolMessageType('ProducerSpec',
(_message.Message,), {
+ 'DESCRIPTOR' : _PRODUCERSPEC,
+ '__module__' : 'Function_pb2'
+ # @@protoc_insertion_point(class_scope:proto.ProducerSpec)
+ })
+_sym_db.RegisterMessage(ProducerSpec)
SourceSpec = _reflection.GeneratedProtocolMessageType('SourceSpec',
(_message.Message,), {
@@ -1148,11 +1478,27 @@
_sym_db.RegisterMessage(SourceSpec.TopicsToSerDeClassNameEntry)
_sym_db.RegisterMessage(SourceSpec.InputSpecsEntry)
SinkSpec = _reflection.GeneratedProtocolMessageType('SinkSpec',
(_message.Message,), {
+
+ 'SchemaPropertiesEntry' :
_reflection.GeneratedProtocolMessageType('SchemaPropertiesEntry',
(_message.Message,), {
+ 'DESCRIPTOR' : _SINKSPEC_SCHEMAPROPERTIESENTRY,
+ '__module__' : 'Function_pb2'
+ #
@@protoc_insertion_point(class_scope:proto.SinkSpec.SchemaPropertiesEntry)
+ })
+ ,
+
+ 'ConsumerPropertiesEntry' :
_reflection.GeneratedProtocolMessageType('ConsumerPropertiesEntry',
(_message.Message,), {
+ 'DESCRIPTOR' : _SINKSPEC_CONSUMERPROPERTIESENTRY,
+ '__module__' : 'Function_pb2'
+ #
@@protoc_insertion_point(class_scope:proto.SinkSpec.ConsumerPropertiesEntry)
+ })
+ ,
'DESCRIPTOR' : _SINKSPEC,
'__module__' : 'Function_pb2'
# @@protoc_insertion_point(class_scope:proto.SinkSpec)
})
_sym_db.RegisterMessage(SinkSpec)
+_sym_db.RegisterMessage(SinkSpec.SchemaPropertiesEntry)
+_sym_db.RegisterMessage(SinkSpec.ConsumerPropertiesEntry)
PackageLocationMetaData =
_reflection.GeneratedProtocolMessageType('PackageLocationMetaData',
(_message.Message,), {
'DESCRIPTOR' : _PACKAGELOCATIONMETADATA,
@@ -1199,9 +1545,13 @@ _sym_db.RegisterMessage(Assignment)
DESCRIPTOR._options = None
+_CONSUMERSPEC_SCHEMAPROPERTIESENTRY._options = None
+_CONSUMERSPEC_CONSUMERPROPERTIESENTRY._options = None
_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY._options = None
_SOURCESPEC_INPUTSPECSENTRY._options = None
_SOURCESPEC.fields_by_name['topicsToSerDeClassName']._options = None
_SOURCESPEC.fields_by_name['topicsPattern']._options = None
+_SINKSPEC_SCHEMAPROPERTIESENTRY._options = None
+_SINKSPEC_CONSUMERPROPERTIESENTRY._options = None
_FUNCTIONMETADATA_INSTANCESTATESENTRY._options = None
# @@protoc_insertion_point(module_scope)
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto
b/pulsar-functions/proto/src/main/proto/Function.proto
index f39fdfd..68cc936 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -101,6 +101,11 @@ message ConsumerSpec {
map<string, string> consumerProperties = 6;
}
+message ProducerSpec { // producer settings; referenced from SinkSpec.producerSpec
+ int32 maxPendingMessages = 1; // the Java converters treat 0 as "not set" when mapping back to ProducerConfig
+ int32 maxPendingMessagesAcrossPartitions = 2; // the Java converters treat 0 as "not set" when mapping back to ProducerConfig
+}
+
message SourceSpec {
string className = 1;
// map in json format
@@ -138,6 +143,7 @@ message SinkSpec {
// configs used only when functions output to sink
string topic = 3;
+ ProducerSpec producerSpec = 11;
string serDeClassName = 4;
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 342ba27..1b0e4fd 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -25,10 +25,7 @@ import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
-import org.apache.pulsar.common.functions.ConsumerConfig;
-import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.functions.Resources;
-import org.apache.pulsar.common.functions.WindowConfig;
+import org.apache.pulsar.common.functions.*;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.proto.Function;
@@ -198,6 +195,16 @@ public class FunctionConfigUtils {
if (typeArgs != null) {
sinkSpecBuilder.setTypeClassName(typeArgs[1].getName());
}
+ if (functionConfig.getProducerConfig() != null) { // producer settings are optional on FunctionConfig
+ Function.ProducerSpec.Builder pbldr =
Function.ProducerSpec.newBuilder();
+ if (functionConfig.getProducerConfig().getMaxPendingMessages() !=
null) { // value is compared against null, so an unset limit is left out of the spec
+
pbldr.setMaxPendingMessages(functionConfig.getProducerConfig().getMaxPendingMessages());
+ }
+ if
(functionConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions() !=
null) { // same null guard for the across-partitions limit
+
pbldr.setMaxPendingMessagesAcrossPartitions(functionConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions());
+ }
+ sinkSpecBuilder.setProducerSpec(pbldr.build()); // attach the (possibly empty) spec to the sink
+ }
functionDetailsBuilder.setSink(sinkSpecBuilder);
if (functionConfig.getTenant() != null) {
@@ -343,6 +350,16 @@ public class FunctionConfigUtils {
if (!isEmpty(functionDetails.getSink().getSchemaType())) {
functionConfig.setOutputSchemaType(functionDetails.getSink().getSchemaType());
}
+ if (functionDetails.getSink().hasProducerSpec()) { // generated getters never return null, so test field presence instead of comparing to null
+ ProducerConfig producerConfig = new ProducerConfig();
+ if (functionDetails.getSink().getProducerSpec().getMaxPendingMessages() != 0) { // 0 is the proto3 default, i.e. the limit was not configured
+ producerConfig.setMaxPendingMessages(functionDetails.getSink().getProducerSpec().getMaxPendingMessages());
+ }
+ if (functionDetails.getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) { // same "0 means unset" rule
+ producerConfig.setMaxPendingMessagesAcrossPartitions(functionDetails.getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions());
+ }
+ functionConfig.setProducerConfig(producerConfig);
+ }
if (!isEmpty(functionDetails.getLogTopic())) {
functionConfig.setLogTopic(functionDetails.getLogTopic());
}
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index a246e53..d912206 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -28,6 +28,7 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.common.io.ConnectorDefinition;
@@ -146,6 +147,17 @@ public class SourceConfigUtils {
sinkSpecBuilder.setTypeClassName(sourceDetails.getTypeArg());
}
+ if (sourceConfig.getProducerConfig() != null) { // producer settings are optional on SourceConfig
+ Function.ProducerSpec.Builder pbldr =
Function.ProducerSpec.newBuilder();
+ if (sourceConfig.getProducerConfig().getMaxPendingMessages() !=
null) { // value is compared against null, so an unset limit is left out of the spec
+
pbldr.setMaxPendingMessages(sourceConfig.getProducerConfig().getMaxPendingMessages());
+ }
+ if
(sourceConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions() !=
null) { // same null guard for the across-partitions limit
+
pbldr.setMaxPendingMessagesAcrossPartitions(sourceConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions());
+ }
+ sinkSpecBuilder.setProducerSpec(pbldr.build()); // attach the (possibly empty) spec to the sink
+ }
+
functionDetailsBuilder.setSink(sinkSpecBuilder);
// use default resources if resources not set
@@ -215,6 +227,16 @@ public class SourceConfigUtils {
if (!StringUtils.isEmpty(sinkSpec.getSerDeClassName())) {
sourceConfig.setSerdeClassName(sinkSpec.getSerDeClassName());
}
+ if (sinkSpec.hasProducerSpec()) { // generated getters never return null, so test field presence instead of comparing to null
+ ProducerConfig producerConfig = new ProducerConfig();
+ if (sinkSpec.getProducerSpec().getMaxPendingMessages() != 0) { // 0 is the proto3 default, i.e. the limit was not configured
+ producerConfig.setMaxPendingMessages(sinkSpec.getProducerSpec().getMaxPendingMessages());
+ }
+ if (sinkSpec.getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) { // same "0 means unset" rule
+ producerConfig.setMaxPendingMessagesAcrossPartitions(sinkSpec.getProducerSpec().getMaxPendingMessagesAcrossPartitions());
+ }
+ sourceConfig.setProducerConfig(producerConfig);
+ }
if (functionDetails.hasResources()) {
Resources resources = new Resources();
resources.setCpu(functionDetails.getResources().getCpu());
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 9d8c0bc..e604980 100644
---
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -20,10 +20,7 @@ package org.apache.pulsar.functions.utils;
import com.google.gson.Gson;
import org.apache.pulsar.client.impl.schema.JSONSchema;
-import org.apache.pulsar.common.functions.ConsumerConfig;
-import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.functions.Resources;
-import org.apache.pulsar.common.functions.WindowConfig;
+import org.apache.pulsar.common.functions.*;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
@@ -66,6 +63,10 @@ public class FunctionConfigUtilsTest {
functionConfig.setAutoAck(true);
functionConfig.setTimeoutMs(2000l);
functionConfig.setRuntimeFlags("-DKerberos");
+ ProducerConfig producerConfig = new ProducerConfig();
+ producerConfig.setMaxPendingMessages(100);
+ producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
+ functionConfig.setProducerConfig(producerConfig);
Function.FunctionDetails functionDetails =
FunctionConfigUtils.convert(functionConfig, null);
FunctionConfig convertedConfig =
FunctionConfigUtils.convertFromDetails(functionDetails);
@@ -101,6 +102,10 @@ public class FunctionConfigUtilsTest {
functionConfig.setAutoAck(true);
functionConfig.setTimeoutMs(2000l);
functionConfig.setWindowConfig(new
WindowConfig().setWindowLengthCount(10));
+ ProducerConfig producerConfig = new ProducerConfig();
+ producerConfig.setMaxPendingMessages(100);
+ producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
+ functionConfig.setProducerConfig(producerConfig);
Function.FunctionDetails functionDetails =
FunctionConfigUtils.convert(functionConfig, null);
FunctionConfig convertedConfig =
FunctionConfigUtils.convertFromDetails(functionDetails);
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index d517026..65f29ef 100644
---
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -23,6 +23,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.common.io.ConnectorDefinition;
@@ -349,6 +350,11 @@ public class SourceConfigUtilsTest extends
PowerMockTestCase {
configs.put("bootstrapServers", "server-1,server-2");
configs.put("consumerConfigProperties", consumerConfigs);
+ ProducerConfig producerConfig = new ProducerConfig();
+ producerConfig.setMaxPendingMessages(100);
+ producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
+ sourceConfig.setProducerConfig(producerConfig);
+
sourceConfig.setConfigs(configs);
return sourceConfig;
}