nobound opened a new issue #10517:
URL: https://github.com/apache/pulsar/issues/10517
**Describe the bug**
I am trying to apply a Pulsar function to all input topics whose names start with the
prefix "test-topic-".
Based on the description of the "--topics-pattern" option at
https://pulsar.apache.org/docs/en/pulsar-admin/, this seems to be possible.
However, it does not work. Is this a functional bug or a documentation issue?
**To Reproduce**
Steps to reproduce the behavior:
1. Create a simple Pulsar function (basic_function.py)
`
from pulsar import Function

class BasicFunction(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        # Log every incoming message to the function's log topic
        logger = context.get_logger()
        logger.info("input {}".format(input))
        return input
`
2. Create the Pulsar function with pulsar-admin
`
test_topic_out="test-topic-x-out"
test_topic_pattern="test-topic-*"
# Quote the variables so the shell does not glob-expand the "*"
$pulsar_admin functions \
  create \
  --tenant public \
  --namespace default \
  --name BaseFunction \
  --py basic_function.py \
  --classname basic_function.BasicFunction \
  --output "$test_topic_out" \
  --topics-pattern "$test_topic_pattern" \
  --log-topic persistent://public/default/log
`
3. Check the Pulsar function logs
`
[2021-05-08 18:58:34 +0000] [INFO] python_instance_main.py: Starting Python
instance with Namespace(client_auth_params=None, client_auth_plugin=None,
cluster_name='standalone', dependency_repository=None,
expected_healthcheck_interval=30, extra_dependency_repository=None,
function_details='{"tenant":"public","namespace":"default","name":"BaseFunction","className":"basic_function.BasicFunction","logTopic":"persistent://public/default/log","runtime":"PYTHON","autoAck":true,"parallelism":1,"source":{"inputSpecs":{"test-topic-*":{"isRegexPattern":true}},"cleanupSubscription":true},"sink":{"topic":"test-topic-x-out","forwardSourceMessageProperty":true},"resources":{"cpu":1.0,"ram":"1073741824","disk":"10737418240"},"componentType":"FUNCTION"}',
function_id='a6000222-dfd9-4d1e-9f6b-01840f05f951',
function_version='f2393128-cdfb-47a7-b550-3d5ae457cdc7',
hostname_verification_enabled=None, install_usercode_dependencies=None,
instance_id='0', logging_config_file='/pulsar/conf/functions-logging/logging_config.ini',
logging_directory='logs//functions',
logging_file='BaseFunction', max_buffered_tuples='1024', metrics_port=37303,
port=34149, pulsar_serviceurl='pulsar://localhost:6650',
py='/pulsar/download/pulsar_functions/public/default/BaseFunction/0/basic_function.py',
secrets_provider='secretsprovider.ClearTextSecretsProvider',
secrets_provider_config=None, state_storage_serviceurl='bk://127.0.0.1:4181',
tls_allow_insecure_connection=None, tls_trust_cert_path=None, use_tls=None)
[2021-05-08 18:58:34 +0000] [INFO] log.py: Setting up producer for log topic
persistent://public/default/log
[2021-05-08 18:58:34 +0000] [INFO] server.py: Serving InstanceCommunication
on port 34149
`
4. Publish messages to test-topic-x
`
import pulsar

publish_topic = 'test-topic-x'
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(publish_topic)
for i in range(5):
    # Messages are sent as UTF-8 encoded bytes
    out = ('hello-pulsar-%d' % i).encode('utf-8')
    producer.send(out)
    print("sent message {}".format(out))
client.close()
`
5. Nothing is received when reading from persistent://public/default/log
`
import pulsar

client = pulsar.Client('pulsar://127.0.0.1:6650')
consumer = client.subscribe(topic='persistent://public/default/log',
                            subscription_name='log',
                            consumer_type=pulsar.ConsumerType.Shared)
try:
    while True:
        msg = consumer.receive()
        try:
            print("Received message '{}' id='{}'".format(msg.value(),
                                                         msg.message_id()))
            # Acknowledge successful processing of the message
            consumer.acknowledge(msg)
        except Exception:
            # Message failed to be processed
            consumer.negative_acknowledge(msg)
finally:
    # Reached when the loop is interrupted (e.g. Ctrl+C)
    client.close()
`
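One thing worth checking: the function details in the logs from step 3 show `"isRegexPattern":true` for the input spec, so the value of `--topics-pattern` may be interpreted as a regular expression rather than a shell-style glob. The sketch below uses Python's `re` module only to illustrate how the two readings differ. It is an assumption that Pulsar requires the pattern to match the whole topic name, and Pulsar may also match against the fully qualified name (for example `persistent://public/default/test-topic-x`), which this sketch does not cover. The `regex_style` pattern and the `matches` helper are hypothetical and have not been verified against Pulsar.

```python
import re

# Pattern exactly as passed to --topics-pattern in step 2.
glob_style = "test-topic-*"
# Hypothetical regex-style alternative (an assumption, not verified against Pulsar).
regex_style = "test-topic-.*"

topic = "test-topic-x"


def matches(pattern, name):
    """Return True if the whole name matches the regular expression."""
    return re.fullmatch(pattern, name) is not None


# Read as a regex, "-*" means "zero or more hyphens", so the trailing "x" is not covered.
print(matches(glob_style, topic))   # False
# ".*" means "any characters", so the trailing "x" is covered.
print(matches(regex_style, topic))  # True
```

If the regex reading is what applies here, the pattern from step 2 would only match names such as `test-topic` or `test-topic--`, which would explain why nothing is consumed from `test-topic-x`.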
**Expected behavior**
I expect the function's log messages to appear on persistent://public/default/log.