This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4cf71659b744cb779441fff1c91044c72e199b19
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Jun 25 04:26:32 2021 +0300

    [Issue 11075] Use the subscription name defined in function details (#11076)
    
    Fixes #11075
    
    ### Motivation
    
    See #11075: Python functions don't use the given subscription name for the
    input topic consumer.
    
    ### Modifications
    
    Use `function_details.source.subscriptionName` as the subscription name if
    it's non-blank; otherwise fall back to the default `tenant/namespace/name`.
    
    (cherry picked from commit 05ca2b546b9f68ce9d4a8166b92b40239c5cbe0f)
---
 pulsar-functions/instance/src/main/python/python_instance.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index fecde7a..54e4a34 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -136,9 +136,12 @@ class PythonInstance(object):
     if self.instance_config.function_details.source.subscriptionType == 
Function_pb2.SubscriptionType.Value("FAILOVER"):
       mode = pulsar._pulsar.ConsumerType.Failover
 
-    subscription_name = str(self.instance_config.function_details.tenant) + 
"/" + \
-                        str(self.instance_config.function_details.namespace) + 
"/" + \
-                        str(self.instance_config.function_details.name)
+    subscription_name = 
self.instance_config.function_details.source.subscriptionName    
+
+    if not (subscription_name and subscription_name.strip()):
+      subscription_name = str(self.instance_config.function_details.tenant) + 
"/" + \
+                          str(self.instance_config.function_details.namespace) 
+ "/" + \
+                          str(self.instance_config.function_details.name)
 
     properties = util.get_properties(util.getFullyQualifiedFunctionName(
                         self.instance_config.function_details.tenant,

Reply via email to