This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1f1b96d [Functions] Call the corresponding restart according to the
componenttype. #9502 (#9519)
1f1b96d is described below
commit 1f1b96dd7fe79937733c4e796217fc5eb6359e31
Author: hexiaocheng <[email protected]>
AuthorDate: Wed Feb 10 08:15:24 2021 +0800
[Functions] Call the corresponding restart according to the componenttype.
#9502 (#9519)
Fixes #9502
Add a check on the component type and call the restart method that
corresponds to it (source, sink, or function).
### Motivation
Restarting all instances of a Pulsar Source fails.
When I call
"https://pulsar.apache.org/admin/v3/sources/{tenant}/{namespace}/{sourceName}/restart",
I see the error "Failed to perform http post request:
javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
Function xxxx doesn't exist".
The cause is that the restart path does not select the admin client that
matches the component type, so a restart of a sink or source is always sent
through the function admin API (functionAdmin.functions()) instead of
functionAdmin.sinks() or functionAdmin.sources().
---
.../functions/worker/FunctionRuntimeManager.java | 27 +++++++++++++++++++---
1 file changed, 24 insertions(+), 3 deletions(-)
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index c95475a..045d43a 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.Assignment;
+import
org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType;
import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
@@ -419,7 +420,16 @@ public class FunctionRuntimeManager implements
AutoCloseable{
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(fullFunctionName + " has not
been assigned yet")).build());
}
- this.functionAdmin.functions().restartFunction(tenant,
namespace, functionName);
+
+ ComponentType componentType =
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType();
+
+ if (ComponentType.SOURCE == componentType) {
+ this.functionAdmin.sources().restartSource(tenant,
namespace, functionName);
+ } else if (ComponentType.SINK == componentType) {
+ this.functionAdmin.sinks().restartSink(tenant, namespace,
functionName);
+ } else {
+ this.functionAdmin.functions().restartFunction(tenant,
namespace, functionName);
+ }
}
} else {
for (Assignment assignment : assignments) {
@@ -442,8 +452,19 @@ public class FunctionRuntimeManager implements
AutoCloseable{
}
continue;
}
- this.functionAdmin.functions().restartFunction(tenant,
namespace, functionName,
- assignment.getInstance().getInstanceId());
+
+ ComponentType componentType =
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType();
+
+ if (ComponentType.SOURCE == componentType) {
+ this.functionAdmin.sources().restartSource(tenant,
namespace, functionName,
+ assignment.getInstance().getInstanceId());
+ } else if (ComponentType.SINK == componentType) {
+ this.functionAdmin.sinks().restartSink(tenant,
namespace, functionName,
+ assignment.getInstance().getInstanceId());
+ } else {
+ this.functionAdmin.functions().restartFunction(tenant,
namespace, functionName,
+ assignment.getInstance().getInstanceId());
+ }
}
}
}