This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 08270dd99f7 [fix][fn] Record Pulsar Function processing time properly
for asynchronous functions (#23811)
08270dd99f7 is described below
commit 08270dd99f7c166316175c8f437b976f9dd69e44
Author: Lee hong <[email protected]>
AuthorDate: Wed Jan 29 00:03:11 2025 +0800
[fix][fn] Record Pulsar Function processing time properly for asynchronous
functions (#23811)
Co-authored-by: Zixuan Liu <[email protected]>
---
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 95 ++++++++++++++++++++++
.../functions/instance/JavaExecutionResult.java | 6 +-
.../pulsar/functions/instance/JavaInstance.java | 12 +--
.../functions/instance/JavaInstanceRunnable.java | 12 +--
.../instance/stats/ComponentStatsManager.java | 3 +-
.../instance/stats/FunctionStatsManager.java | 11 +--
.../functions/instance/stats/SinkStatsManager.java | 6 +-
.../instance/stats/SourceStatsManager.java | 6 +-
.../src/main/resources/findbugsExclude.xml | 5 ++
.../instance/JavaInstanceRunnableTest.java | 23 ++++++
10 files changed, 142 insertions(+), 37 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 74c2a93b84e..aef75e5fc7e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.compaction.PublishingOrderCompactor;
import org.apache.pulsar.functions.api.Context;
+import
org.apache.pulsar.functions.api.examples.JavaNativeAsyncExclamationFunction;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -296,6 +297,100 @@ public class PulsarFunctionE2ETest extends
AbstractPulsarE2ETest {
producer.close();
}
+ @Test(timeOut = 20000)
+ public void testPulsarFunctionAsyncStatTime() throws Exception {
+ final String namespacePortion = "io";
+ final String replNamespace = tenant + "/" + namespacePortion;
+ final String sourceTopic = "persistent://" + replNamespace +
"/my-topic1";
+ final String sinkTopic = "persistent://" + replNamespace + "/output";
+ final String functionName = "JavaNativeAsyncExclamationFunction";
+ final String subscriptionName = "test-sub";
+ admin.namespaces().createNamespace(replNamespace);
+ Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+ admin.namespaces().setNamespaceReplicationClusters(replNamespace,
clusters);
+
+ FunctionConfig functionConfig = new FunctionConfig();
+ functionConfig.setTenant(tenant);
+ functionConfig.setNamespace(namespacePortion);
+ functionConfig.setName(functionName);
+ functionConfig.setParallelism(1);
+ functionConfig.setSubName(subscriptionName);
+ functionConfig.setInputSpecs(Collections.singletonMap(sourceTopic,
+ ConsumerConfig.builder().poolMessages(true).build()));
+ functionConfig.setAutoAck(true);
+
functionConfig.setClassName(JavaNativeAsyncExclamationFunction.class.getName());
+ functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+ functionConfig.setOutput(sinkTopic);
+ functionConfig.setCleanupSubscription(true);
+
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+
+ admin.functions().createFunctionWithUrl(functionConfig,
+
PulsarFunctionE2ETest.class.getProtectionDomain().getCodeSource().getLocation().toURI().toString());
+
+ // create a producer that creates a topic at broker
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+ Consumer<String> consumer =
+
pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName(subscriptionName).subscribe();
+
+ retryStrategically((test) -> {
+ try {
+ return
admin.topics().getStats(sourceTopic).getSubscriptions().size() == 1;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 50, 150);
+ retryStrategically((test) -> {
+ try {
+ return
admin.topics().getStats(sinkTopic).getSubscriptions().size() == 1;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 50, 150);
+ // validate pulsar sink consumer has started on the topic
+
assertEquals(admin.topics().getStats(sourceTopic).getSubscriptions().size(), 1);
+
assertEquals(admin.topics().getStats(sinkTopic).getSubscriptions().size(), 1);
+
+ int cntMsg = 5;
+ for (int i = 0; i < cntMsg; i++) {
+ producer.newMessage().value("it is the " + i + "th message , it
will spend 500ms").send();
+ }
+ Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+ SubscriptionStats subStats =
admin.topics().getStats(sourceTopic).getSubscriptions().get(subscriptionName);
+ assertEquals(subStats.getUnackedMessages(), 0);
+ });
+ int count = 0;
+ while (true) {
+ Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ consumer.acknowledge(message);
+ count++;
+ }
+ Assert.assertEquals(count, cntMsg);
+
+ String prometheusMetrics =
TestPulsarFunctionUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+ log.info("prometheus metrics: {}", prometheusMetrics);
+ Map<String, TestPulsarFunctionUtils.Metric> statsMetrics =
+ TestPulsarFunctionUtils.parseMetrics(prometheusMetrics);
+
+
assertEquals(statsMetrics.get("pulsar_function_process_latency_ms").value,
500.0, 100.0);
+ admin.functions().deleteFunction(tenant, namespacePortion,
functionName);
+
+ retryStrategically((test) -> {
+ try {
+ return
admin.topics().getStats(sourceTopic).getSubscriptions().size() == 0;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 50, 150);
+
+ // make sure subscriptions are cleanup
+
assertEquals(admin.topics().getStats(sourceTopic).getSubscriptions().size(), 0);
+
+ tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
+ }
+
@Test(timeOut = 20000)
public void testPulsarFunctionStats() throws Exception {
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java
index 5856600196b..9ca9aa2a879 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java
@@ -29,9 +29,5 @@ import lombok.Data;
public class JavaExecutionResult {
private Throwable userException;
private Object result;
-
- public void reset() {
- setUserException(null);
- setResult(null);
- }
+ private final long startTime = System.nanoTime();
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 5946be9fe5b..c5f82898f82 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -47,6 +47,7 @@ public class JavaInstance implements AutoCloseable {
public static class AsyncFuncRequest {
private final Record record;
private final CompletableFuture processResult;
+ private final JavaExecutionResult result;
}
@Getter(AccessLevel.PACKAGE)
@@ -136,7 +137,7 @@ public class JavaInstance implements AutoCloseable {
if (asyncPreserveInputOrderForOutputMessages) {
// Function is in format: Function<I, CompletableFuture<O>>
AsyncFuncRequest request = new AsyncFuncRequest(
- record, (CompletableFuture) output
+ record, (CompletableFuture) output, executionResult
);
pendingAsyncRequests.put(request);
} else {
@@ -148,13 +149,12 @@ public class JavaInstance implements AutoCloseable {
processAsyncResultsInInputOrder(asyncResultConsumer);
} else {
try {
- JavaExecutionResult execResult = new
JavaExecutionResult();
if (cause != null) {
-
execResult.setUserException(FutureUtil.unwrapCompletionException(cause));
+
executionResult.setUserException(FutureUtil.unwrapCompletionException(cause));
} else {
- execResult.setResult(res);
+ executionResult.setResult(res);
}
- asyncResultConsumer.accept(record, execResult);
+ asyncResultConsumer.accept(record,
executionResult);
} finally {
asyncRequestsConcurrencyLimiter.release();
}
@@ -187,7 +187,7 @@ public class JavaInstance implements AutoCloseable {
while (asyncResult != null && asyncResult.getProcessResult().isDone())
{
pendingAsyncRequests.remove(asyncResult);
- JavaExecutionResult execResult = new JavaExecutionResult();
+ JavaExecutionResult execResult = asyncResult.getResult();
try {
Object result = asyncResult.getProcessResult().get();
execResult.setResult(result);
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 4f811c14704..cfb7e9536a3 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -334,8 +334,6 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
// set last invocation time
stats.setLastInvocation(System.currentTimeMillis());
- // start time for process latency stat
- stats.processTimeStart();
// process the message
Thread.currentThread().setContextClassLoader(functionClassLoader);
@@ -346,9 +344,6 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
asyncErrorHandler);
Thread.currentThread().setContextClassLoader(instanceClassLoader);
- // register end time
- stats.processTimeEnd();
-
if (result != null) {
// process the synchronous results
handleResult(currentRecord, result);
@@ -448,6 +443,8 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
// increment total successfully processed
stats.incrTotalProcessedSuccessfully();
}
+ // handle endTime here
+ stats.processTimeEnd(result.getStartTime());
}
private void sendOutputMessage(Record srcRecord, Object output) throws
Exception {
@@ -631,6 +628,11 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
return "";
}
+ @VisibleForTesting
+ void setStats(ComponentStatsManager stats) {
+ this.stats = stats;
+ }
+
public InstanceCommunication.MetricsData getAndResetMetrics() {
if (isInitialized) {
statsLock.writeLock().lock();
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
index 6da3c082f78..17321735256 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -100,9 +100,8 @@ public abstract class ComponentStatsManager implements
AutoCloseable {
public abstract void setLastInvocation(long ts);
- public abstract void processTimeStart();
- public abstract void processTimeEnd();
+ public abstract void processTimeEnd(long startTime);
public abstract double getTotalProcessedSuccessfully();
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index 8737c8a4fa9..0009fcea667 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -336,20 +336,13 @@ public class FunctionStatsManager extends
ComponentStatsManager {
statlastInvocationChild.set(ts);
}
- private Long processTimeStart;
- @Override
- public void processTimeStart() {
- processTimeStart = System.nanoTime();
- }
@Override
- public void processTimeEnd() {
- if (processTimeStart != null) {
- double endTimeMs = ((double) System.nanoTime() - processTimeStart)
/ 1.0E6D;
+ public void processTimeEnd(long startTime) {
+ double endTimeMs = ((double) System.nanoTime() - startTime) /
1.0E6D;
statProcessLatencyChild.observe(endTimeMs);
statProcessLatency1minChild.observe(endTimeMs);
- }
}
@Override
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
index c515ce6bc87..4fae7f9c292 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
@@ -279,13 +279,9 @@ public class SinkStatsManager extends
ComponentStatsManager {
statlastInvocationChild.set(ts);
}
- @Override
- public void processTimeStart() {
- //no-op
- }
@Override
- public void processTimeEnd() {
+ public void processTimeEnd(long startTime) {
//no-op
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
index 1f7e159c4dc..b68e1d610f7 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
@@ -279,13 +279,9 @@ public class SourceStatsManager extends
ComponentStatsManager {
statlastInvocationChild.set(ts);
}
- @Override
- public void processTimeStart() {
- //no-op
- }
@Override
- public void processTimeEnd() {
+ public void processTimeEnd(long startTime) {
//no-op
}
diff --git a/pulsar-functions/instance/src/main/resources/findbugsExclude.xml
b/pulsar-functions/instance/src/main/resources/findbugsExclude.xml
index 40e3e911123..ffe23993eb7 100644
--- a/pulsar-functions/instance/src/main/resources/findbugsExclude.xml
+++ b/pulsar-functions/instance/src/main/resources/findbugsExclude.xml
@@ -557,4 +557,9 @@
<Method name="setSourceInputSpecs"/>
<Bug pattern="EI_EXPOSE_REP2"/>
</Match>
+ <Match>
+ <Class
name="org.apache.pulsar.functions.instance.JavaInstance$AsyncFuncRequest"/>
+ <Method name="getResult"/>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
</FindBugsFilter>
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index c83648132d4..385d78e6717 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -24,7 +24,9 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+
import com.fasterxml.jackson.annotation.JsonIgnore;
+
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
@@ -36,6 +38,7 @@ import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
+
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@@ -50,6 +53,7 @@ import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
+import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
@@ -61,6 +65,7 @@ import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.awaitility.Awaitility;
import org.jetbrains.annotations.NotNull;
+import org.mockito.ArgumentCaptor;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -173,6 +178,24 @@ public class JavaInstanceRunnableTest {
}
}
+ @Test
+ public void testFunctionAsyncTime() throws Exception {
+ FunctionDetails functionDetails = FunctionDetails.newBuilder()
+ .setAutoAck(true)
+
.setProcessingGuarantees(org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.MANUAL)
+ .build();
+ JavaInstanceRunnable javaInstanceRunnable =
createRunnable(functionDetails);
+ FunctionStatsManager manager = mock(FunctionStatsManager.class);
+ javaInstanceRunnable.setStats(manager);
+ JavaExecutionResult javaExecutionResult = new JavaExecutionResult();
+ Thread.sleep(500);
+ Record record = mock(Record.class);
+ javaInstanceRunnable.handleResult(record, javaExecutionResult);
+ ArgumentCaptor<Long> timeCaptor = ArgumentCaptor.forClass(Long.class);
+ verify(manager).processTimeEnd(timeCaptor.capture());
+ Assert.assertEquals(timeCaptor.getValue(),
javaExecutionResult.getStartTime());
+ }
+
@Test
public void testFunctionResultNull() throws Exception {
JavaExecutionResult javaExecutionResult = new JavaExecutionResult();