This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f1b96d  [Functions] Call the corresponding restart according to the 
componenttype. #9502 (#9519)
1f1b96d is described below

commit 1f1b96dd7fe79937733c4e796217fc5eb6359e31
Author: hexiaocheng <[email protected]>
AuthorDate: Wed Feb 10 08:15:24 2021 +0800

    [Functions] Call the corresponding restart according to the componenttype. 
#9502 (#9519)
    
    Fixes #9502
    
    Add judgment on component type,Call the corresponding restart according to 
the componenttype
    
    
    ### Motivation
    Restart all instances of a Pulsar Source failed.
    When I call the 
"https://pulsar.apache.org/admin/v3/sources/{tenant}/{namespace}/{sourceName}/restart";
    See error "Failed to perform http post request: 
javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
    Function xxxx doesn't exist".
    The reason for the problem is that the corresponding functionadmin is not 
found according to the componenttype, which leads to the restart of sink or 
source using the functionadmin of function
---
 .../functions/worker/FunctionRuntimeManager.java   | 27 +++++++++++++++++++---
 1 file changed, 24 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index c95475a..045d43a 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.auth.FunctionAuthProvider;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
+import 
org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType;
 import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
@@ -419,7 +420,16 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                             .type(MediaType.APPLICATION_JSON)
                             .entity(new ErrorData(fullFunctionName + " has not 
been assigned yet")).build());
                 }
-                this.functionAdmin.functions().restartFunction(tenant, 
namespace, functionName);
+
+                ComponentType componentType = 
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType();
+
+                if (ComponentType.SOURCE == componentType) {
+                    this.functionAdmin.sources().restartSource(tenant, 
namespace, functionName);
+                } else if (ComponentType.SINK == componentType) {
+                    this.functionAdmin.sinks().restartSink(tenant, namespace, 
functionName);
+                } else {
+                    this.functionAdmin.functions().restartFunction(tenant, 
namespace, functionName);
+                }
             }
         } else {
             for (Assignment assignment : assignments) {
@@ -442,8 +452,19 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                         }
                         continue;
                     }
-                    this.functionAdmin.functions().restartFunction(tenant, 
namespace, functionName,
-                                assignment.getInstance().getInstanceId());
+
+                    ComponentType componentType = 
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType();
+
+                    if (ComponentType.SOURCE == componentType) {
+                        this.functionAdmin.sources().restartSource(tenant, 
namespace, functionName,
+                            assignment.getInstance().getInstanceId());
+                    } else if (ComponentType.SINK == componentType) {
+                        this.functionAdmin.sinks().restartSink(tenant, 
namespace, functionName,
+                            assignment.getInstance().getInstanceId());
+                    } else {
+                        this.functionAdmin.functions().restartFunction(tenant, 
namespace, functionName,
+                            assignment.getInstance().getInstanceId());
+                    }
                 }
             }
         }

Reply via email to