This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7cc54882edc78c4067903b796ffb5fc0e2cb6283
Author: hexiaocheng <[email protected]>
AuthorDate: Wed Feb 10 08:15:24 2021 +0800

    [Functions] Call the corresponding restart according to the componenttype. 
#9502 (#9519)
    
    Fixes #9502
    
    Add judgment on component type,Call the corresponding restart according to 
the componenttype
    
    
    ### Motivation
    Restart all instances of a Pulsar Source failed.
    When I call the 
"https://pulsar.apache.org/admin/v3/sources/{tenant}/{namespace}/{sourceName}/restart";
    See error "Failed to perform http post request: 
javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
    Function xxxx doesn't exist".
    The reason for the problem is that the corresponding functionadmin is not 
found according to the componenttype, which leads to the restart of sink or 
source using the functionadmin of function
    
    (cherry picked from commit 1f1b96dd7fe79937733c4e796217fc5eb6359e31)
---
 .../functions/worker/FunctionRuntimeManager.java   | 27 +++++++++++++++++++---
 1 file changed, 24 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index bb07115..bb39109 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.functions.auth.FunctionAuthProvider;
 import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
+import 
org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType;
 import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
@@ -419,7 +420,16 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                             .type(MediaType.APPLICATION_JSON)
                             .entity(new ErrorData(fullFunctionName + " has not 
been assigned yet")).build());
                 }
-                this.functionAdmin.functions().restartFunction(tenant, 
namespace, functionName);
+
+                ComponentType componentType = 
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType();
+
+                if (ComponentType.SOURCE == componentType) {
+                    this.functionAdmin.sources().restartSource(tenant, 
namespace, functionName);
+                } else if (ComponentType.SINK == componentType) {
+                    this.functionAdmin.sinks().restartSink(tenant, namespace, 
functionName);
+                } else {
+                    this.functionAdmin.functions().restartFunction(tenant, 
namespace, functionName);
+                }
             }
         } else {
             for (Assignment assignment : assignments) {
@@ -442,8 +452,19 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                         }
                         continue;
                     }
-                    this.functionAdmin.functions().restartFunction(tenant, 
namespace, functionName,
-                                assignment.getInstance().getInstanceId());
+
+                    ComponentType componentType = 
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType();
+
+                    if (ComponentType.SOURCE == componentType) {
+                        this.functionAdmin.sources().restartSource(tenant, 
namespace, functionName,
+                            assignment.getInstance().getInstanceId());
+                    } else if (ComponentType.SINK == componentType) {
+                        this.functionAdmin.sinks().restartSink(tenant, 
namespace, functionName,
+                            assignment.getInstance().getInstanceId());
+                    } else {
+                        this.functionAdmin.functions().restartFunction(tenant, 
namespace, functionName,
+                            assignment.getInstance().getInstanceId());
+                    }
                 }
             }
         }

Reply via email to