nobound opened a new issue #10517:
URL: https://github.com/apache/pulsar/issues/10517


   **Describe the bug**
     
   I am trying to apply a Pulsar function to all input topics whose names 
start with the prefix "test-topic-".
   According to the description of the `--topics-pattern` option at 
https://pulsar.apache.org/docs/en/pulsar-admin/, this should be possible.
   However, it does not work. Is this a functional bug or a documentation issue?
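
One possible explanation, offered as an assumption rather than a confirmed diagnosis: the function details logged in step 3 show `"isRegexPattern":true`, so the value is presumably interpreted as a regular expression rather than a shell-style glob. Under regex rules, `test-topic-*` means "test-topic" followed by zero or more hyphens, which does not match `test-topic-x`. The sketch below uses Python's `re` module only to illustrate that difference. Pulsar itself uses Java regular expressions, and whether it matches against the short or the fully qualified topic name is not established here. The helper `matches` is hypothetical and not part of any Pulsar API.

```python
import re


def matches(pattern: str, topic: str) -> bool:
    """Hypothetical helper: True if the whole topic name matches the regex."""
    return re.fullmatch(pattern, topic) is not None


topic = "test-topic-x"
glob_style = "test-topic-*"    # as a regex: "test-topic" plus zero or more "-"
regex_style = "test-topic-.*"  # as a regex: "test-topic-" plus any characters

print(matches(glob_style, topic))   # False
print(matches(regex_style, topic))  # True
```

If this is indeed the cause, a regex-style pattern such as `test-topic-.*` (possibly with the full `persistent://public/default/` prefix) may be worth trying. This has not been verified against a running broker.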
   
   
   **To Reproduce**
   Steps to reproduce the behavior:
   
   1. Create a simple Pulsar function (basic_function.py)
   `
        from pulsar import Function

        class BasicFunction(Function):
            def __init__(self):
                pass

            def process(self, input, context):
                # Log each input message, then pass it through unchanged
                logger = context.get_logger()
                logger.info("input {}".format(input))
                return input
   `
    
   2. Create the Pulsar function with pulsar-admin
   `
        test_topic_out="test-topic-x-out"
        test_topic_pattern="test-topic-*"
        
        $pulsar_admin functions \
         create \
         --tenant public \
         --namespace default \
         --name BaseFunction \
         --py basic_function.py \
         --classname basic_function.BasicFunction \
         --output "$test_topic_out" \
         --topics-pattern "$test_topic_pattern" \
         --log-topic persistent://public/default/log
   `
   
   3. Check the Pulsar function logs
   `
   [2021-05-08 18:58:34 +0000] [INFO] python_instance_main.py: Starting Python 
instance with Namespace(client_auth_params=None, client_auth_plugin=None, 
cluster_name='standalone', dependency_repository=None, 
expected_healthcheck_interval=30, extra_dependency_repository=None, 
function_details='{"tenant":"public","namespace":"default","name":"BaseFunction","className":"basic_function.BasicFunction","logTopic":"persistent://public/default/log","runtime":"PYTHON","autoAck":true,"parallelism":1,"source":{"inputSpecs":{"test-topic-*":{"isRegexPattern":true}},"cleanupSubscription":true},"sink":{"topic":"test-topic-x-out","forwardSourceMessageProperty":true},"resources":{"cpu":1.0,"ram":"1073741824","disk":"10737418240"},"componentType":"FUNCTION"}',
 function_id='a6000222-dfd9-4d1e-9f6b-01840f05f951', 
function_version='f2393128-cdfb-47a7-b550-3d5ae457cdc7', 
hostname_verification_enabled=None, install_usercode_dependencies=None, 
instance_id='0', 
logging_config_file='/pulsar/conf/functions-logging/logging_config.ini', 
logging_directory='logs//functions', 
logging_file='BaseFunction', max_buffered_tuples='1024', metrics_port=37303, 
port=34149, pulsar_serviceurl='pulsar://localhost:6650', 
py='/pulsar/download/pulsar_functions/public/default/BaseFunction/0/basic_function.py',
 secrets_provider='secretsprovider.ClearTextSecretsProvider', 
secrets_provider_config=None, state_storage_serviceurl='bk://127.0.0.1:4181', 
tls_allow_insecure_connection=None, tls_trust_cert_path=None, use_tls=None)
   [2021-05-08 18:58:34 +0000] [INFO] log.py: Setting up producer for log topic 
persistent://public/default/log
   [2021-05-08 18:58:34 +0000] [INFO] server.py: Serving InstanceCommunication 
on port 34149
   `
   
   4. Publish messages to test-topic-x
   `
        import pulsar
        import sys
   
        publish_topic = 'test-topic-x'
        client = pulsar.Client('pulsar://localhost:6650')
        producer = client.create_producer(publish_topic)
   
        for i in range(5):
            out = ('hello-pulsar-%d' % i).encode('utf-8') 
            producer.send(out)
            print("sent message {}".format(out))
   
        client.close()
   `
   
   5. Read from persistent://public/default/log: nothing is received
   `    
        import pulsar

        client = pulsar.Client('pulsar://127.0.0.1:6650')
        consumer = client.subscribe(topic='persistent://public/default/log',
                                    subscription_name='log',
                                    consumer_type=pulsar.ConsumerType.Shared)

        while True:
            msg = consumer.receive()
            try:
                print("Received message '{}' id='{}'".format(msg.value(),
                                                             msg.message_id()))
                # Acknowledge successful processing of the message
                consumer.acknowledge(msg)
            except Exception:
                # Message failed to be processed
                consumer.negative_acknowledge(msg)

        client.close()
   `
   
   **Expected behavior**
   I expect the function's log messages to appear on 
persistent://public/default/log 
   
   



