srkukarni commented on a change in pull request #3088: Improve and correct status for function, sources, sinks
URL: https://github.com/apache/pulsar/pull/3088#discussion_r237945124
##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
##########
@@ -604,42 +609,70 @@ public FunctionStats getFunctionStats(String tenant, String namespace, String fu
}
if (assignment == null) {
- InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
- = InstanceCommunication.FunctionStatus.newBuilder();
- functionStatusBuilder.setRunning(false);
- functionStatusBuilder.setFailureException("Function has not been scheduled");
- return functionStatusBuilder.build();
+ FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData
+ = new FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
+ functionInstanceStatusData.setRunning(false);
+ functionInstanceStatusData.setError("Function has not been scheduled");
+ return functionInstanceStatusData;
}
final String assignedWorkerId = assignment.getWorkerId();
final String workerId = this.workerConfig.getWorkerId();
- InstanceCommunication.FunctionStatus functionStatus = null;
// If I am running worker
if (assignedWorkerId.equals(workerId)) {
FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
+ FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData
+ = new FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
if (runtimeSpawner != null) {
try {
- InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus
- .newBuilder(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get());
- functionStatusBuilder.setWorkerId(assignedWorkerId);
- functionStatus = functionStatusBuilder.build();
+ InstanceCommunication.FunctionStatus status = functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get();
Review comment:
Do we want to consolidate the logic that converts proto messages into
user-facing responses into a dedicated class, as we already do with
FunctionConfigUtils and similar utilities?
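
To make the suggestion concrete, here is a rough sketch of what such a consolidated converter could look like. Everything in it is an assumption for illustration only: FunctionStatusUtils, ProtoFunctionStatus, and the simplified FunctionInstanceStatusData below are hypothetical stand-ins, not the actual Pulsar or proto-generated classes, and the workerId field on the response object is assumed rather than taken from the diff. Only setRunning, setError, and the "Function has not been scheduled" message come from the hunk above.

```java
// Hypothetical stand-in for the proto-generated InstanceCommunication.FunctionStatus.
final class ProtoFunctionStatus {
    private final boolean running;
    private final String failureException;

    ProtoFunctionStatus(boolean running, String failureException) {
        this.running = running;
        this.failureException = failureException;
    }

    boolean getRunning() { return running; }
    String getFailureException() { return failureException; }
}

// Simplified stand-in for FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.
// The workerId field is an assumption, not something shown in the diff.
final class FunctionInstanceStatusData {
    private boolean running;
    private String error;
    private String workerId;

    void setRunning(boolean running) { this.running = running; }
    void setError(String error) { this.error = error; }
    void setWorkerId(String workerId) { this.workerId = workerId; }

    boolean isRunning() { return running; }
    String getError() { return error; }
    String getWorkerId() { return workerId; }
}

// Hypothetical utility that keeps every proto-to-response conversion in one place,
// so callers such as FunctionRuntimeManager do not build responses inline.
final class FunctionStatusUtils {
    private FunctionStatusUtils() {
        // static helpers only
    }

    // Response returned when no assignment exists for the instance.
    static FunctionInstanceStatusData notScheduled() {
        FunctionInstanceStatusData data = new FunctionInstanceStatusData();
        data.setRunning(false);
        data.setError("Function has not been scheduled");
        return data;
    }

    // Copies the fields of the proto status into the user-facing response
    // and records which worker reported it.
    static FunctionInstanceStatusData fromProto(ProtoFunctionStatus status, String workerId) {
        FunctionInstanceStatusData data = new FunctionInstanceStatusData();
        data.setRunning(status.getRunning());
        data.setError(status.getFailureException());
        data.setWorkerId(workerId);
        return data;
    }
}
```

With a helper along these lines, the assignment == null branch would reduce to a single call such as FunctionStatusUtils.notScheduled(), and the local-worker branch to a call to fromProto with the status returned by the runtime spawner.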