lhotari commented on code in PR #23811:
URL: https://github.com/apache/pulsar/pull/23811#discussion_r1903143495
##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -346,10 +348,10 @@ public void run() {
asyncErrorHandler);
Thread.currentThread().setContextClassLoader(instanceClassLoader);
- // register end time
- stats.processTimeEnd();
if (result != null) {
+ // register end time
+ stats.processTimeEnd(result.getStartTimeNanos());
Review Comment:
Remove this. This should be handled in the handleResult method instead.
##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java:
##########
@@ -29,6 +29,7 @@
public class JavaExecutionResult {
private Throwable userException;
private Object result;
+ private double startTimeNanos;
Review Comment:
```suggestion
private final long startTimeNanos = System.nanoTime();
```
It should be a `long` field and it should be made final.
##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java:
##########
@@ -117,6 +117,7 @@ public JavaExecutionResult handleMessage(Record<?> record,
Object input,
}
JavaExecutionResult executionResult = new JavaExecutionResult();
+ executionResult.setStartTimeNanos(System.nanoTime());
Review Comment:
You will also need to pass the `JavaExecutionResult` instance into a field
`javaExecutionResult` of the `AsyncFuncRequest` so that the same instance is
being used for the `asyncPreserveInputOrderForOutputMessages` case on line 139.
Replace the instantiation of `JavaExecutionResult` in
`processAsyncResultsInInputOrder` method on line 191 with this instance that is
carried in the `javaExecutionResult` of the `AsyncFuncRequest` instance.
Also replace the instance on line 152 with this same `executionResult` on
line 152 so that the starting time will be handled for the non
`asyncPreserveInputOrderForOutputMessages` case of async processing.
With these changes, the solution would have the basics in place. There would
also need to be a way to test all of this.
For testing, it might be useful to have a `java.util.function.LongSupplier`
function for getting the nanoTime. In production code, this would default to
`() -> System.nanoTime()`, but in tests, this could be used to mock the "clock"
for getting nanoTime. Adding testability would cause some additional
refactorings.
##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java:
##########
@@ -117,6 +117,7 @@ public JavaExecutionResult handleMessage(Record<?> record,
Object input,
}
JavaExecutionResult executionResult = new JavaExecutionResult();
+ executionResult.setStartTimeNanos(System.nanoTime());
Review Comment:
This is obsolete since startTimeNanos is set when the instance gets
constructed.
##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java:
##########
@@ -100,9 +102,7 @@ public ComponentStatsManager(FunctionCollectorRegistry
collectorRegistry,
public abstract void setLastInvocation(long ts);
- public abstract void processTimeStart();
-
- public abstract void processTimeEnd();
+ public abstract void processTimeEnd(double processTimeStart);
Review Comment:
the parameter should be a `long` instead of `double`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]