This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a3eb556  Allow the option to make producers thread local (#7764)
a3eb556 is described below

commit a3eb55653ac9cc36ffcd444275cbf2a9e5f6b298
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Mon Aug 10 07:20:53 2020 -0700

    Allow the option to make producers thread local (#7764)
    
    If a function has many threads calling 
context.newOutputMessage().send(), there can be a lot of contention due to the 
large synchronization blocks in ProducerImpl. Allowing functions to use 
thread-local producers avoids this synchronization, leading to 
increased performance.
    This PR makes the use of thread-local producers configurable for 
functions and sources.
    
    Co-authored-by: Sanjeev Kulkarni <[email protected]>
---
 .../pulsar/common/functions/ProducerConfig.java    |  1 +
 .../pulsar/functions/instance/ContextImpl.java     | 40 +++++++++----
 .../instance/src/main/python/Function_pb2.py       | 67 ++++++++++++----------
 .../proto/src/main/proto/Function.proto            |  1 +
 .../functions/utils/FunctionConfigUtils.java       |  4 ++
 .../pulsar/functions/utils/SourceConfigUtils.java  |  4 ++
 .../functions/utils/FunctionConfigUtilsTest.java   |  2 +
 .../functions/utils/SourceConfigUtilsTest.java     |  1 +
 8 files changed, 80 insertions(+), 40 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
index b28370e..8d3dd66 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
@@ -35,4 +35,5 @@ import lombok.NoArgsConstructor;
 public class ProducerConfig {
     private Integer maxPendingMessages;
     private Integer maxPendingMessagesAcrossPartitions;
+    private Boolean useThreadLocalProducers;
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 763d9f7..e3f169e 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -68,6 +68,7 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
 
     private PulsarClient client;
     private Map<String, Producer<?>> publishProducers;
+    private ThreadLocal<Map<String, Producer<?>>> tlPublishProducers;
     private ProducerBuilderImpl<?> producerBuilder;
 
     private final TopicSchema topicSchema;
@@ -102,12 +103,12 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
         this.config = config;
         this.logger = logger;
         this.client = client;
-        this.publishProducers = new HashMap<>();
         this.topicSchema = new TopicSchema(client);
         this.statsManager = statsManager;
 
         this.producerBuilder = (ProducerBuilderImpl<?>) 
client.newProducer().blockIfQueueFull(true).enableBatching(true)
                 .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
+        boolean useThreadLocalProducers = false;
         if (config.getFunctionDetails().getSink().getProducerSpec() != null) {
             if 
(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages()
 != 0) {
                 
this.producerBuilder.maxPendingMessages(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages());
@@ -115,6 +116,12 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
             if 
(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions()
 != 0) {
                 
this.producerBuilder.maxPendingMessagesAcrossPartitions(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions());
             }
+            useThreadLocalProducers = 
config.getFunctionDetails().getSink().getProducerSpec().getUseThreadLocalProducers();
+        }
+        if (useThreadLocalProducers) {
+            tlPublishProducers = new ThreadLocal<>();
+        } else {
+            publishProducers = new HashMap<>();
         }
 
         if (config.getFunctionDetails().getUserConfig().isEmpty()) {
@@ -413,7 +420,17 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
     }
 
     private <O> Producer<O> getProducer(String topicName, Schema<O> schema) 
throws PulsarClientException {
-        Producer<O> producer = (Producer<O>) publishProducers.get(topicName);
+        Producer<O> producer;
+        if (tlPublishProducers != null) {
+            Map<String, Producer<?>> producerMap = tlPublishProducers.get();
+            if (producerMap == null) {
+                producerMap = new HashMap<>();
+                tlPublishProducers.set(producerMap);
+            }
+            producer = (Producer<O>) producerMap.get(topicName);
+        } else {
+            producer = (Producer<O>) publishProducers.get(topicName);
+        }
 
         if (producer == null) {
 
@@ -438,16 +455,19 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
                             this.config.getInstanceId()))
                     .create();
 
-            Producer<O> existingProducer = (Producer<O>) 
publishProducers.putIfAbsent(topicName, newProducer);
-
-            if (existingProducer != null) {
-                // The value in the map was not updated after the concurrent 
put
-                newProducer.close();
-                producer = existingProducer;
+            if (tlPublishProducers != null) {
+                tlPublishProducers.get().put(topicName, newProducer);
             } else {
-                producer = newProducer;
+                Producer<O> existingProducer = (Producer<O>) 
publishProducers.putIfAbsent(topicName, newProducer);
+
+                if (existingProducer != null) {
+                    // The value in the map was not updated after the 
concurrent put
+                    newProducer.close();
+                    producer = existingProducer;
+                } else {
+                    producer = newProducer;
+                }
             }
-
         }
         return producer;
     }
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py 
b/pulsar-functions/instance/src/main/python/Function_pb2.py
index 203809d..3ee58e9 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -38,7 +38,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   syntax='proto3',
   serialized_options=b'\n!org.apache.pulsar.functions.protoB\010Function',
   create_key=_descriptor._internal_create_key,
-  
serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 
\x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 
\x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 
\x01(\t\"\xe7\x05\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\ [...]
+  
serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 
\x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 
\x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 
\x01(\t\"\xe7\x05\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\ [...]
 )
 
 _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -66,8 +66,8 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=3174,
-  serialized_end=3253,
+  serialized_start=3207,
+  serialized_end=3286,
 )
 _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
 
@@ -97,8 +97,8 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=3255,
-  serialized_end=3315,
+  serialized_start=3288,
+  serialized_end=3348,
 )
 _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
 
@@ -123,8 +123,8 @@ _SUBSCRIPTIONPOSITION = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=3317,
-  serialized_end=3365,
+  serialized_start=3350,
+  serialized_end=3398,
 )
 _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONPOSITION)
 
@@ -149,8 +149,8 @@ _FUNCTIONSTATE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=3367,
-  serialized_end=3408,
+  serialized_start=3400,
+  serialized_end=3441,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONSTATE)
 
@@ -695,6 +695,13 @@ _PRODUCERSPEC = _descriptor.Descriptor(
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='useThreadLocalProducers', 
full_name='proto.ProducerSpec.useThreadLocalProducers', index=2,
+      number=3, type=8, cpp_type=7, label=1,
+      has_default_value=False, default_value=False,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
@@ -708,7 +715,7 @@ _PRODUCERSPEC = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=1337,
-  serialized_end=1423,
+  serialized_end=1456,
 )
 
 
@@ -746,8 +753,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1903,
-  serialized_end=1964,
+  serialized_start=1936,
+  serialized_end=1997,
 )
 
 _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
@@ -784,8 +791,8 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1966,
-  serialized_end=2036,
+  serialized_start=1999,
+  serialized_end=2069,
 )
 
 _SOURCESPEC = _descriptor.Descriptor(
@@ -899,8 +906,8 @@ _SOURCESPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1426,
-  serialized_end=2036,
+  serialized_start=1459,
+  serialized_end=2069,
 )
 
 
@@ -1077,8 +1084,8 @@ _SINKSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2039,
-  serialized_end=2515,
+  serialized_start=2072,
+  serialized_end=2548,
 )
 
 
@@ -1116,8 +1123,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2517,
-  serialized_end=2589,
+  serialized_start=2550,
+  serialized_end=2622,
 )
 
 
@@ -1155,8 +1162,8 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2885,
-  serialized_end=2960,
+  serialized_start=2918,
+  serialized_end=2993,
 )
 
 _FUNCTIONMETADATA = _descriptor.Descriptor(
@@ -1221,8 +1228,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2592,
-  serialized_end=2960,
+  serialized_start=2625,
+  serialized_end=2993,
 )
 
 
@@ -1260,8 +1267,8 @@ _FUNCTIONAUTHENTICATIONSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2962,
-  serialized_end=3022,
+  serialized_start=2995,
+  serialized_end=3055,
 )
 
 
@@ -1299,8 +1306,8 @@ _INSTANCE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=3024,
-  serialized_end=3105,
+  serialized_start=3057,
+  serialized_end=3138,
 )
 
 
@@ -1338,8 +1345,8 @@ _ASSIGNMENT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=3107,
-  serialized_end=3172,
+  serialized_start=3140,
+  serialized_end=3205,
 )
 
 _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = 
_PROCESSINGGUARANTEES
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index 68cc936..3edcf5d 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -104,6 +104,7 @@ message ConsumerSpec {
 message ProducerSpec {
     int32 maxPendingMessages = 1;
     int32 maxPendingMessagesAcrossPartitions = 2;
+    bool useThreadLocalProducers = 3;
 }
 
 message SourceSpec {
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 1b0e4fd..2a9a2bf 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -203,6 +203,9 @@ public class FunctionConfigUtils {
             if 
(functionConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions() != 
null) {
                 
pbldr.setMaxPendingMessagesAcrossPartitions(functionConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions());
             }
+            if 
(functionConfig.getProducerConfig().getUseThreadLocalProducers() != null) {
+                
pbldr.setUseThreadLocalProducers(functionConfig.getProducerConfig().getUseThreadLocalProducers());
+            }
             sinkSpecBuilder.setProducerSpec(pbldr.build());
         }
         functionDetailsBuilder.setSink(sinkSpecBuilder);
@@ -358,6 +361,7 @@ public class FunctionConfigUtils {
             if 
(functionDetails.getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions()
 != 0) {
                 
producerConfig.setMaxPendingMessagesAcrossPartitions(functionDetails.getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions());
             }
+            
producerConfig.setUseThreadLocalProducers(functionDetails.getSink().getProducerSpec().getUseThreadLocalProducers());
             functionConfig.setProducerConfig(producerConfig);
         }
         if (!isEmpty(functionDetails.getLogTopic())) {
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index d912206..0d31c9d 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -155,6 +155,9 @@ public class SourceConfigUtils {
             if 
(sourceConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions() != 
null) {
                 
pbldr.setMaxPendingMessagesAcrossPartitions(sourceConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions());
             }
+            if (sourceConfig.getProducerConfig().getUseThreadLocalProducers() 
!= null) {
+                
pbldr.setUseThreadLocalProducers(sourceConfig.getProducerConfig().getUseThreadLocalProducers());
+            }
             sinkSpecBuilder.setProducerSpec(pbldr.build());
         }
 
@@ -235,6 +238,7 @@ public class SourceConfigUtils {
             if 
(sinkSpec.getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) {
                 
producerConfig.setMaxPendingMessagesAcrossPartitions(sinkSpec.getProducerSpec().getMaxPendingMessagesAcrossPartitions());
             }
+            
producerConfig.setUseThreadLocalProducers(sinkSpec.getProducerSpec().getUseThreadLocalProducers());
             sourceConfig.setProducerConfig(producerConfig);
         }
         if (functionDetails.hasResources()) {
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index e604980..4aa2dc0 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -66,6 +66,7 @@ public class FunctionConfigUtilsTest {
         ProducerConfig producerConfig = new ProducerConfig();
         producerConfig.setMaxPendingMessages(100);
         producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
+        producerConfig.setUseThreadLocalProducers(true);
         functionConfig.setProducerConfig(producerConfig);
         Function.FunctionDetails functionDetails = 
FunctionConfigUtils.convert(functionConfig, null);
         FunctionConfig convertedConfig = 
FunctionConfigUtils.convertFromDetails(functionDetails);
@@ -105,6 +106,7 @@ public class FunctionConfigUtilsTest {
         ProducerConfig producerConfig = new ProducerConfig();
         producerConfig.setMaxPendingMessages(100);
         producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
+        producerConfig.setUseThreadLocalProducers(true);
         functionConfig.setProducerConfig(producerConfig);
         Function.FunctionDetails functionDetails = 
FunctionConfigUtils.convert(functionConfig, null);
         FunctionConfig convertedConfig = 
FunctionConfigUtils.convertFromDetails(functionDetails);
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 65f29ef..520b416 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -353,6 +353,7 @@ public class SourceConfigUtilsTest extends 
PowerMockTestCase {
         ProducerConfig producerConfig = new ProducerConfig();
         producerConfig.setMaxPendingMessages(100);
         producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
+        producerConfig.setUseThreadLocalProducers(true);
         sourceConfig.setProducerConfig(producerConfig);
 
         sourceConfig.setConfigs(configs);

Reply via email to