jiangpengcheng commented on PR #23628:
URL: https://github.com/apache/pulsar/pull/23628#issuecomment-2500747511

   @lhotari  ok, I thought we need to add integration tests to the CI
   
   I just verified it with below steps:
   
   1. checkout pulsar to previous version 
(eddf395631811a731fe9c0284b44fd2f6efd2026)
   2. update the `LoggingWindowFunction` like below:
   ```
   package org.apache.pulsar.functions.api.examples.window;
   
   import java.util.Collection;
   import org.apache.pulsar.functions.api.Record;
   import org.apache.pulsar.functions.api.WindowContext;
   import org.apache.pulsar.functions.api.WindowFunction;
   import org.slf4j.Logger;
   
   /**
    * A function that demonstrates how to redirect logging to a topic.
    * In this particular example, for every input string, the function
    * does some logging. If --logTopic topic is specified, these log statements
    * end up in that specified pulsar topic.
    */
   public class LoggingWindowFunction implements WindowFunction<String, Void> {
   
       @Override
       public Void process(Collection<Record<String>> inputs, WindowContext 
context) throws Exception {
           Logger log = context.getLogger();
           log.info("tenant: {}, namespace: {}, instanceId: {}, replicas: {}", 
context.getTenant(), context.getNamespace(),
                   context.getInstanceId(), context.getNumInstances());
           for (Record<String> record : inputs) {
               log.info(record + "-window-log");
           }
           return null;
       }
   }
   ```
   3. build the `pulsar-functions/java-examples` project, it generated a `--jar 
pulsar-functions/java-examples/target/pulsar-functions-api-examples.jar`
   4. checkout pulsar to master (7909d2dfdb4aad8053c133ce6a00d5dddf0b9db8)
   5. start the pulsar locally:
   ```
   ./bin/pulsar standalone
   ```
   6. create the window function
   ```
   ./bin/pulsar-admin functions create --tenant public --namespace default 
--name window-function --className 
org.apache.pulsar.functions.api.examples.window.LoggingWindowFunction --inputs 
persistent://public/default/test-window-input --output 
persistent://public/default/test-window-output --log-topic 
persistent://public/default/test-window-logs --jar 
pulsar-functions/java-examples/target/pulsar-functions-api-examples.jar --cpu 
0.1 --window-length-count 10 --sliding-interval-count 5
   ```
   7. produce 5 messages to the input topic:
   ```
   ./bin/pulsar-client produce -m "test-message" --value-schema string -n 5 
persistent://public/default/test-window-input
   ```
   8. consume from the log topic:
   ```
   ./bin/pulsar-client consume -n 0 -s mysub --subscription-position Earliest 
persistent://public/default/test-window-logs
   ```
   9. the function works correctly and I got expected messages from the log 
topic:
   
   ```
   ----- got message -----
   key:[null], properties:[fqn=public/default/window-function, instance=0, 
loglevel=INFO], content:Window Config: WindowConfig(windowLengthCount=10, 
windowLengthDurationMs=null, slidingIntervalCount=5, 
slidingIntervalDurationMs=null, lateDataTopic=null, maxLagMs=null, 
watermarkEmitIntervalMs=null, timestampExtractorClassName=null, 
actualWindowFunctionClassName=org.apache.pulsar.functions.api.examples.window.LoggingWindowFunction,
 processingGuarantees=ATLEAST_ONCE)
   ----- got message -----
   key:[null], properties:[fqn=public/default/window-function, instance=0, 
loglevel=INFO], content:tenant: public, namespace: default, instanceId: 0, 
replicas: 1
   ----- got message -----
   key:[null], properties:[fqn=public/default/window-function, instance=0, 
loglevel=INFO], 
content:org.apache.pulsar.functions.source.PulsarFunctionRecord@32afdbf6-window-log
   ----- got message -----
   key:[null], properties:[fqn=public/default/window-function, instance=0, 
loglevel=INFO], 
content:org.apache.pulsar.functions.source.PulsarFunctionRecord@467cda10-window-log
   ----- got message -----
   key:[null], properties:[fqn=public/default/window-function, instance=0, 
loglevel=INFO], 
content:org.apache.pulsar.functions.source.PulsarFunctionRecord@3e25cfb6-window-log
   ----- got message -----
   key:[null], properties:[fqn=public/default/window-function, instance=0, 
loglevel=INFO], 
content:org.apache.pulsar.functions.source.PulsarFunctionRecord@3997d10c-window-log
   ----- got message -----
   key:[null], properties:[fqn=public/default/window-function, instance=0, 
loglevel=INFO], 
content:org.apache.pulsar.functions.source.PulsarFunctionRecord@752bcda3-window-log
   ----- got message -----
   key:[null], properties:[fqn=public/default/window-function, instance=0, 
loglevel=INFO], content:[persistent://public/default/test-window-input] 
[public/default/window-function] [ef1db] Prefetched messages: 0 --- Consume 
throughput received: 0.08 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.08 ack/s 
--- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
   ----- got message -----
   key:[null], properties:[fqn=public/default/window-function, instance=0, 
loglevel=INFO], content:[persistent://public/default/test-window-logs] 
[standalone-23-1] --- Publish throughput: 0.12 msg/s --- 0.00 Mbit/s --- 
Latency: med: 117.000 ms - 95pct: 117.000 ms - 99pct: 117.000 ms - 99.9pct: 
117.000 ms - max: 117.000 ms --- BatchSize: med: 6.000 - 95pct: 6.000 - 99pct: 
6.000 - 99.9pct: 6.000 - max: 6.000 --- MsgSize: med: 437.000 bytes - 95pct: 
437.000 bytes - 99pct: 437.000 bytes - 99.9pct: 437.000 bytes - max: 437.000 
bytes --- Ack received rate: 0.12 ack/s --- Failed messages: 0 --- Pending 
messages: 1
   2024-11-26T12:54:54,980+0000 [pulsar-timer-16-1] INFO  
org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - 
[persistent://public/default/test-window-logs] [mysub] [9415c] Prefetched 
messages: 0 --- Consume throughput received: 0.15 msgs/s --- 0.00 Mbit/s --- 
Ack sent rate: 0.15 ack/s --- Failed messages: 0 --- batch messages: 0 
---Failed acks: 0
   ```
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to