This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 08270dd99f7 [fix][fn] Record Pulsar Function processing time properly 
for asynchronous functions (#23811)
08270dd99f7 is described below

commit 08270dd99f7c166316175c8f437b976f9dd69e44
Author: Lee hong <[email protected]>
AuthorDate: Wed Jan 29 00:03:11 2025 +0800

    [fix][fn] Record Pulsar Function processing time properly for asynchronous 
functions (#23811)
    
    Co-authored-by: Zixuan Liu <[email protected]>
---
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    | 95 ++++++++++++++++++++++
 .../functions/instance/JavaExecutionResult.java    |  6 +-
 .../pulsar/functions/instance/JavaInstance.java    | 12 +--
 .../functions/instance/JavaInstanceRunnable.java   | 12 +--
 .../instance/stats/ComponentStatsManager.java      |  3 +-
 .../instance/stats/FunctionStatsManager.java       | 11 +--
 .../functions/instance/stats/SinkStatsManager.java |  6 +-
 .../instance/stats/SourceStatsManager.java         |  6 +-
 .../src/main/resources/findbugsExclude.xml         |  5 ++
 .../instance/JavaInstanceRunnableTest.java         | 23 ++++++
 10 files changed, 142 insertions(+), 37 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 74c2a93b84e..aef75e5fc7e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.compaction.PublishingOrderCompactor;
 import org.apache.pulsar.functions.api.Context;
+import 
org.apache.pulsar.functions.api.examples.JavaNativeAsyncExclamationFunction;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -296,6 +297,100 @@ public class PulsarFunctionE2ETest extends 
AbstractPulsarE2ETest {
         producer.close();
     }
 
+    @Test(timeOut = 20000)
+    public void testPulsarFunctionAsyncStatTime() throws Exception {
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopic = "persistent://" + replNamespace + 
"/my-topic1";
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String functionName = "JavaNativeAsyncExclamationFunction";
+        final String subscriptionName = "test-sub";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
+
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespacePortion);
+        functionConfig.setName(functionName);
+        functionConfig.setParallelism(1);
+        functionConfig.setSubName(subscriptionName);
+        functionConfig.setInputSpecs(Collections.singletonMap(sourceTopic,
+                ConsumerConfig.builder().poolMessages(true).build()));
+        functionConfig.setAutoAck(true);
+        
functionConfig.setClassName(JavaNativeAsyncExclamationFunction.class.getName());
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setOutput(sinkTopic);
+        functionConfig.setCleanupSubscription(true);
+        
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+
+        admin.functions().createFunctionWithUrl(functionConfig,
+                
PulsarFunctionE2ETest.class.getProtectionDomain().getCodeSource().getLocation().toURI().toString());
+
+        // create a producer that creates a topic at broker
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+        Consumer<String> consumer =
+                
pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName(subscriptionName).subscribe();
+
+        retryStrategically((test) -> {
+            try {
+                return 
admin.topics().getStats(sourceTopic).getSubscriptions().size() == 1;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+        retryStrategically((test) -> {
+            try {
+                return 
admin.topics().getStats(sinkTopic).getSubscriptions().size() == 1;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+        // validate pulsar sink consumer has started on the topic
+        
assertEquals(admin.topics().getStats(sourceTopic).getSubscriptions().size(), 1);
+        
assertEquals(admin.topics().getStats(sinkTopic).getSubscriptions().size(), 1);
+
+        int cntMsg = 5;
+        for (int i = 0; i < cntMsg; i++) {
+            producer.newMessage().value("it is the " + i + "th message , it 
will spend 500ms").send();
+        }
+        Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+            SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).getSubscriptions().get(subscriptionName);
+            assertEquals(subStats.getUnackedMessages(), 0);
+        });
+        int count = 0;
+        while (true) {
+            Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            consumer.acknowledge(message);
+            count++;
+        }
+        Assert.assertEquals(count, cntMsg);
+
+        String prometheusMetrics = 
TestPulsarFunctionUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+        log.info("prometheus metrics: {}", prometheusMetrics);
+        Map<String, TestPulsarFunctionUtils.Metric> statsMetrics =
+                TestPulsarFunctionUtils.parseMetrics(prometheusMetrics);
+
+        
assertEquals(statsMetrics.get("pulsar_function_process_latency_ms").value, 
500.0, 100.0);
+        admin.functions().deleteFunction(tenant, namespacePortion, 
functionName);
+
+        retryStrategically((test) -> {
+            try {
+                return 
admin.topics().getStats(sourceTopic).getSubscriptions().size() == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+
+        // make sure subscriptions are cleanup
+        
assertEquals(admin.topics().getStats(sourceTopic).getSubscriptions().size(), 0);
+
+        tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
+    }
+
     @Test(timeOut = 20000)
     public void testPulsarFunctionStats() throws Exception {
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java
index 5856600196b..9ca9aa2a879 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java
@@ -29,9 +29,5 @@ import lombok.Data;
 public class JavaExecutionResult {
     private Throwable userException;
     private Object result;
-
-    public void reset() {
-        setUserException(null);
-        setResult(null);
-    }
+    private final long startTime = System.nanoTime();
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 5946be9fe5b..c5f82898f82 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -47,6 +47,7 @@ public class JavaInstance implements AutoCloseable {
     public static class AsyncFuncRequest {
         private final Record record;
         private final CompletableFuture processResult;
+        private final JavaExecutionResult result;
     }
 
     @Getter(AccessLevel.PACKAGE)
@@ -136,7 +137,7 @@ public class JavaInstance implements AutoCloseable {
                 if (asyncPreserveInputOrderForOutputMessages) {
                     // Function is in format: Function<I, CompletableFuture<O>>
                     AsyncFuncRequest request = new AsyncFuncRequest(
-                            record, (CompletableFuture) output
+                            record, (CompletableFuture) output, executionResult
                     );
                     pendingAsyncRequests.put(request);
                 } else {
@@ -148,13 +149,12 @@ public class JavaInstance implements AutoCloseable {
                             
processAsyncResultsInInputOrder(asyncResultConsumer);
                         } else {
                             try {
-                                JavaExecutionResult execResult = new 
JavaExecutionResult();
                                 if (cause != null) {
-                                    
execResult.setUserException(FutureUtil.unwrapCompletionException(cause));
+                                    
executionResult.setUserException(FutureUtil.unwrapCompletionException(cause));
                                 } else {
-                                    execResult.setResult(res);
+                                    executionResult.setResult(res);
                                 }
-                                asyncResultConsumer.accept(record, execResult);
+                                asyncResultConsumer.accept(record, 
executionResult);
                             } finally {
                                 asyncRequestsConcurrencyLimiter.release();
                             }
@@ -187,7 +187,7 @@ public class JavaInstance implements AutoCloseable {
         while (asyncResult != null && asyncResult.getProcessResult().isDone()) 
{
             pendingAsyncRequests.remove(asyncResult);
 
-            JavaExecutionResult execResult = new JavaExecutionResult();
+            JavaExecutionResult execResult = asyncResult.getResult();
             try {
                 Object result = asyncResult.getProcessResult().get();
                 execResult.setResult(result);
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 4f811c14704..cfb7e9536a3 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -334,8 +334,6 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
                 // set last invocation time
                 stats.setLastInvocation(System.currentTimeMillis());
 
-                // start time for process latency stat
-                stats.processTimeStart();
 
                 // process the message
                 
Thread.currentThread().setContextClassLoader(functionClassLoader);
@@ -346,9 +344,6 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
                         asyncErrorHandler);
                 
Thread.currentThread().setContextClassLoader(instanceClassLoader);
 
-                // register end time
-                stats.processTimeEnd();
-
                 if (result != null) {
                     // process the synchronous results
                     handleResult(currentRecord, result);
@@ -448,6 +443,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
             // increment total successfully processed
             stats.incrTotalProcessedSuccessfully();
         }
+        // handle endTime here
+        stats.processTimeEnd(result.getStartTime());
     }
 
     private void sendOutputMessage(Record srcRecord, Object output) throws 
Exception {
@@ -631,6 +628,11 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
         return "";
     }
 
+    @VisibleForTesting
+    void setStats(ComponentStatsManager stats) {
+        this.stats = stats;
+    }
+
     public InstanceCommunication.MetricsData getAndResetMetrics() {
         if (isInitialized) {
             statsLock.writeLock().lock();
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
index 6da3c082f78..17321735256 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -100,9 +100,8 @@ public abstract class ComponentStatsManager implements 
AutoCloseable {
 
     public abstract void setLastInvocation(long ts);
 
-    public abstract void processTimeStart();
 
-    public abstract void processTimeEnd();
+    public abstract void processTimeEnd(long startTime);
 
     public abstract double getTotalProcessedSuccessfully();
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index 8737c8a4fa9..0009fcea667 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -336,20 +336,13 @@ public class FunctionStatsManager extends 
ComponentStatsManager {
         statlastInvocationChild.set(ts);
     }
 
-    private Long processTimeStart;
 
-    @Override
-    public void processTimeStart() {
-        processTimeStart = System.nanoTime();
-    }
 
     @Override
-    public void processTimeEnd() {
-        if (processTimeStart != null) {
-            double endTimeMs = ((double) System.nanoTime() - processTimeStart) 
/ 1.0E6D;
+    public void processTimeEnd(long startTime) {
+            double endTimeMs = ((double) System.nanoTime() - startTime) / 
1.0E6D;
             statProcessLatencyChild.observe(endTimeMs);
             statProcessLatency1minChild.observe(endTimeMs);
-        }
     }
 
     @Override
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
index c515ce6bc87..4fae7f9c292 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
@@ -279,13 +279,9 @@ public class SinkStatsManager extends 
ComponentStatsManager {
         statlastInvocationChild.set(ts);
     }
 
-    @Override
-    public void processTimeStart() {
-        //no-op
-    }
 
     @Override
-    public void processTimeEnd() {
+    public void processTimeEnd(long startTime) {
         //no-op
     }
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
index 1f7e159c4dc..b68e1d610f7 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
@@ -279,13 +279,9 @@ public class SourceStatsManager extends 
ComponentStatsManager {
         statlastInvocationChild.set(ts);
     }
 
-    @Override
-    public void processTimeStart() {
-        //no-op
-    }
 
     @Override
-    public void processTimeEnd() {
+    public void processTimeEnd(long startTime) {
         //no-op
     }
 
diff --git a/pulsar-functions/instance/src/main/resources/findbugsExclude.xml 
b/pulsar-functions/instance/src/main/resources/findbugsExclude.xml
index 40e3e911123..ffe23993eb7 100644
--- a/pulsar-functions/instance/src/main/resources/findbugsExclude.xml
+++ b/pulsar-functions/instance/src/main/resources/findbugsExclude.xml
@@ -557,4 +557,9 @@
     <Method name="setSourceInputSpecs"/>
     <Bug pattern="EI_EXPOSE_REP2"/>
   </Match>
+  <Match>
+    <Class 
name="org.apache.pulsar.functions.instance.JavaInstance$AsyncFuncRequest"/>
+    <Method name="getResult"/>
+    <Bug pattern="EI_EXPOSE_REP"/>
+  </Match>
 </FindBugsFilter>
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index c83648132d4..385d78e6717 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -24,7 +24,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+
 import com.fasterxml.jackson.annotation.JsonIgnore;
+
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.time.Duration;
@@ -36,6 +38,7 @@ import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
+
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
@@ -50,6 +53,7 @@ import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
+import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
@@ -61,6 +65,7 @@ import org.apache.pulsar.io.core.Source;
 import org.apache.pulsar.io.core.SourceContext;
 import org.awaitility.Awaitility;
 import org.jetbrains.annotations.NotNull;
+import org.mockito.ArgumentCaptor;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -173,6 +178,24 @@ public class JavaInstanceRunnableTest {
         }
     }
 
+    @Test
+    public void testFunctionAsyncTime() throws Exception {
+        FunctionDetails functionDetails = FunctionDetails.newBuilder()
+                .setAutoAck(true)
+                
.setProcessingGuarantees(org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.MANUAL)
+                .build();
+        JavaInstanceRunnable javaInstanceRunnable = 
createRunnable(functionDetails);
+        FunctionStatsManager manager = mock(FunctionStatsManager.class);
+        javaInstanceRunnable.setStats(manager);
+        JavaExecutionResult javaExecutionResult = new JavaExecutionResult();
+        Thread.sleep(500);
+        Record record = mock(Record.class);
+        javaInstanceRunnable.handleResult(record, javaExecutionResult);
+        ArgumentCaptor<Long> timeCaptor = ArgumentCaptor.forClass(Long.class);
+        verify(manager).processTimeEnd(timeCaptor.capture());
+        Assert.assertEquals(timeCaptor.getValue(), 
javaExecutionResult.getStartTime());
+    }
+
     @Test
     public void testFunctionResultNull() throws Exception {
         JavaExecutionResult javaExecutionResult = new JavaExecutionResult();

Reply via email to