This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7cc54882edc78c4067903b796ffb5fc0e2cb6283 Author: hexiaocheng <[email protected]> AuthorDate: Wed Feb 10 08:15:24 2021 +0800 [Functions] Call the corresponding restart according to the componenttype. #9502 (#9519) Fixes #9502 Add a check on the component type, and call the restart method that corresponds to that component type. ### Motivation Restarting all instances of a Pulsar Source failed. When I call "https://pulsar.apache.org/admin/v3/sources/{tenant}/{namespace}/{sourceName}/restart", I see the error "Failed to perform http post request: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error Function xxxx doesn't exist". The cause of the problem is that the admin client is not selected according to the component type, so a sink or source is restarted through the function admin API (functionAdmin.functions()) instead of its own sink or source admin API. (cherry picked from commit 1f1b96dd7fe79937733c4e796217fc5eb6359e31) --- .../functions/worker/FunctionRuntimeManager.java | 27 +++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index bb07115..bb39109 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -38,6 +38,7 @@ import org.apache.pulsar.functions.auth.FunctionAuthProvider; import org.apache.pulsar.common.functions.AuthenticationConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.Assignment; +import org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType; import org.apache.pulsar.functions.runtime.RuntimeCustomizer; import org.apache.pulsar.functions.runtime.RuntimeFactory; import 
org.apache.pulsar.functions.runtime.RuntimeSpawner; @@ -419,7 +420,16 @@ public class FunctionRuntimeManager implements AutoCloseable{ .type(MediaType.APPLICATION_JSON) .entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build()); } - this.functionAdmin.functions().restartFunction(tenant, namespace, functionName); + + ComponentType componentType = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType(); + + if (ComponentType.SOURCE == componentType) { + this.functionAdmin.sources().restartSource(tenant, namespace, functionName); + } else if (ComponentType.SINK == componentType) { + this.functionAdmin.sinks().restartSink(tenant, namespace, functionName); + } else { + this.functionAdmin.functions().restartFunction(tenant, namespace, functionName); + } } } else { for (Assignment assignment : assignments) { @@ -442,8 +452,19 @@ public class FunctionRuntimeManager implements AutoCloseable{ } continue; } - this.functionAdmin.functions().restartFunction(tenant, namespace, functionName, - assignment.getInstance().getInstanceId()); + + ComponentType componentType = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType(); + + if (ComponentType.SOURCE == componentType) { + this.functionAdmin.sources().restartSource(tenant, namespace, functionName, + assignment.getInstance().getInstanceId()); + } else if (ComponentType.SINK == componentType) { + this.functionAdmin.sinks().restartSink(tenant, namespace, functionName, + assignment.getInstance().getInstanceId()); + } else { + this.functionAdmin.functions().restartFunction(tenant, namespace, functionName, + assignment.getInstance().getInstanceId()); + } } } }
