jerrypeng closed pull request #3102:  fixing and refactoring function status
URL: https://github.com/apache/pulsar/pull/3102
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, since GitHub would otherwise hide it once the pull request is merged:

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index fa35ca14f2..90766d364d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -669,9 +669,8 @@ public void testPulsarFunctionStatus() throws Exception {
             }
         }, 5, 200);
 
-        FunctionRuntimeManager functionRuntimeManager = 
functionsWorkerService.getFunctionRuntimeManager();
-        FunctionStatus functionStatus = 
functionRuntimeManager.getFunctionStatus(tenant, namespacePortion,
-                functionName, null);
+        FunctionStatus functionStatus = 
admin.functions().getFunctionStatus(tenant, namespacePortion,
+                functionName);
 
         int numInstances = functionStatus.getNumInstances();
         assertEquals(numInstances, 1);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index f9e0c75829..152c1db622 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -18,26 +18,10 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.net.URI;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.UriBuilder;
-
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
 import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -46,24 +30,35 @@
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.ErrorData;
-import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.FunctionStats;
-import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
-import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.runtime.*;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
+import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.Reflections;
 
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriBuilder;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
 /**
  * This class managers all aspects of functions assignments and running of 
function assignments for this worker
  */
@@ -590,207 +585,6 @@ public FunctionStats getFunctionStats(String tenant, String namespace, String fu
         return functionStats.calculateOverall();
     }
 
-    /**
-     * Get status of a function instance.  If this worker is not running the 
function instance,
-     * @param tenant the tenant the function belongs to
-     * @param namespace the namespace the function belongs to
-     * @param functionName the function name
-     * @param instanceId the function instance id
-     * @return the function status
-     */
-    public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
getFunctionInstanceStatus(
-            String tenant, String namespace,
-            String functionName, int instanceId, URI uri) {
-        Assignment assignment;
-        if (runtimeFactory.externallyManaged()) {
-            assignment = this.findAssignment(tenant, namespace, functionName, 
-1);
-        } else {
-            assignment = this.findAssignment(tenant, namespace, functionName, 
instanceId);
-        }
-
-        if (assignment == null) {
-            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
-                    = new 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
-            functionInstanceStatusData.setRunning(false);
-            functionInstanceStatusData.setError("Function has not been 
scheduled");
-            return functionInstanceStatusData;
-        }
-
-        final String assignedWorkerId = assignment.getWorkerId();
-        final String workerId = this.workerConfig.getWorkerId();
-
-        // If I am running worker
-        if (assignedWorkerId.equals(workerId)) {
-            FunctionRuntimeInfo functionRuntimeInfo = 
this.getFunctionRuntimeInfo(
-                    
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
-            RuntimeSpawner runtimeSpawner = 
functionRuntimeInfo.getRuntimeSpawner();
-            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
-                    = new 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
-            if (runtimeSpawner != null) {
-                try {
-                    InstanceCommunication.FunctionStatus status = 
functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get();
-                    functionInstanceStatusData.setRunning(status.getRunning());
-                    
functionInstanceStatusData.setError(status.getFailureException());
-                    
functionInstanceStatusData.setNumRestarts(status.getNumRestarts());
-                    
functionInstanceStatusData.setNumReceived(status.getNumReceived());
-                    
functionInstanceStatusData.setNumSuccessfullyProcessed(status.getNumSuccessfullyProcessed());
-                    
functionInstanceStatusData.setNumUserExceptions(status.getNumUserExceptions());
-
-                    List<ExceptionInformation> userExceptionInformationList = 
new LinkedList<>();
-                    for 
(InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : 
status.getLatestUserExceptionsList()) {
-                        ExceptionInformation exceptionInformation
-                                = new ExceptionInformation();
-                        
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
-                        
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
-                        userExceptionInformationList.add(exceptionInformation);
-                    }
-                    
functionInstanceStatusData.setLatestUserExceptions(userExceptionInformationList);
-
-                    
functionInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
-                    List<ExceptionInformation> systemExceptionInformationList 
= new LinkedList<>();
-                    for 
(InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : 
status.getLatestSystemExceptionsList()) {
-                        ExceptionInformation exceptionInformation
-                                = new ExceptionInformation();
-                        
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
-                        
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
-                        
systemExceptionInformationList.add(exceptionInformation);
-                    }
-                    
functionInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
-
-                    
functionInstanceStatusData.setAverageLatency(status.getAverageLatency());
-                    
functionInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
-                    functionInstanceStatusData.setWorkerId(assignedWorkerId);
-
-
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new RuntimeException(e);
-                }
-            } else {
-                functionInstanceStatusData.setRunning(false);
-                if (functionRuntimeInfo.getStartupException() != null) {
-                    
functionInstanceStatusData.setError(functionRuntimeInfo.getStartupException().getMessage());
-                }
-                functionInstanceStatusData.setWorkerId(assignedWorkerId);
-            }
-            return functionInstanceStatusData;
-        } else {
-            // query other worker
-
-            List<WorkerInfo> workerInfoList = 
this.membershipManager.getCurrentMembership();
-            WorkerInfo workerInfo = null;
-            for (WorkerInfo entry: workerInfoList) {
-                if (assignment.getWorkerId().equals(entry.getWorkerId())) {
-                    workerInfo = entry;
-                }
-            }
-            if (workerInfo == null) {
-                
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
-                        = new 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
-                functionInstanceStatusData.setRunning(false);
-                functionInstanceStatusData.setError("Function has not been 
scheduled");
-                return functionInstanceStatusData;
-            }
-
-            if (uri == null) {
-                throw new 
WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
-            } else {
-                URI redirect = 
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
-                throw new 
WebApplicationException(Response.temporaryRedirect(redirect).build());
-            }
-        }
-    }
-
-    /**
-     * Get statuses of all function instances.
-     * @param tenant the tenant the function belongs to
-     * @param namespace the namespace the function belongs to
-     * @param functionName the function name
-     * @return a list of function statuses
-     * @throws PulsarAdminException 
-     */
-    public FunctionStatus getFunctionStatus(String tenant, String namespace,
-                                            String functionName, URI uri)
-            throws PulsarAdminException {
-
-        Collection<Assignment> assignments = 
this.findFunctionAssignments(tenant, namespace, functionName);
-
-        FunctionStatus functionStatus = new FunctionStatus();
-        if (assignments.isEmpty()) {
-            Function.FunctionMetaData functionMetaData = 
workerService.getFunctionMetaDataManager().getFunctionMetaData(tenant, 
namespace, functionName);
-            
functionStatus.setNumInstances(functionMetaData.getFunctionDetails().getParallelism());
-            functionStatus.setNumRunning(0);
-
-            return functionStatus;
-        }
-
-        // TODO refactor the code for externally managed.
-        if (runtimeFactory.externallyManaged()) {
-            Assignment assignment = assignments.iterator().next();
-            boolean isOwner = 
this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
-            if (isOwner) {
-                int parallelism = 
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getParallelism();
-                for (int i = 0; i < parallelism; ++i) {
-                    
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
-                            = getFunctionInstanceStatus(tenant, namespace, 
functionName, i, null);
-                    FunctionStatus.FunctionInstanceStatus 
functionInstanceStatus
-                            = new FunctionStatus.FunctionInstanceStatus();
-                    functionInstanceStatus.setInstanceId(i);
-                    
functionInstanceStatus.setStatus(functionInstanceStatusData);
-                    functionStatus.addInstance(functionInstanceStatus);
-                }
-            } else {
-                // find the hostname/port of the worker who is the owner
-
-                List<WorkerInfo> workerInfoList = 
this.membershipManager.getCurrentMembership();
-                WorkerInfo workerInfo = null;
-                for (WorkerInfo entry: workerInfoList) {
-                    if (assignment.getWorkerId().equals(entry.getWorkerId())) {
-                        workerInfo = entry;
-                    }
-                }
-                if (workerInfo == null) {
-                    Function.FunctionMetaData functionMetaData = 
workerService.getFunctionMetaDataManager().getFunctionMetaData(tenant, 
namespace, functionName);
-                    
functionStatus.setNumInstances(functionMetaData.getFunctionDetails().getParallelism());
-                    functionStatus.setNumRunning(0);
-                    return functionStatus;
-                }
-
-                if (uri == null) {
-                    throw new 
WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
-                } else {
-                    URI redirect = 
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
-                    throw new 
WebApplicationException(Response.temporaryRedirect(redirect).build());
-                }
-            }
-        } else {
-            for (Assignment assignment : assignments) {
-                boolean isOwner = 
this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
-                
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData;
-                if (isOwner) {
-                    functionInstanceStatusData = 
getFunctionInstanceStatus(tenant, namespace, functionName, 
assignment.getInstance().getInstanceId(), null);
-                } else {
-                    functionInstanceStatusData = 
this.functionAdmin.functions().getFunctionStatus(
-                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
-                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
-                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
-                            assignment.getInstance().getInstanceId());
-                }
-
-                FunctionStatus.FunctionInstanceStatus instanceStatus = new 
FunctionStatus.FunctionInstanceStatus();
-                
instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
-                instanceStatus.setStatus(functionInstanceStatusData);
-                functionStatus.addInstance(instanceStatus);
-            }
-        }
-        functionStatus.setNumInstances(functionStatus.instances.size());
-        functionStatus.getInstances().forEach(functionInstanceStatus -> {
-            if (functionInstanceStatus.getStatus().isRunning()) {
-                functionStatus.numRunning++;
-            }
-        });
-        return functionStatus;
-    }
-
     /**
      * Process an assignment update from the assignment topic
      * @param newAssignment the assignment
@@ -990,8 +784,11 @@ public void close() throws Exception {
         }
     }
 
-    private FunctionRuntimeInfo getFunctionRuntimeInfo(String 
fullyQualifiedInstanceId) {
-        return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
+    public synchronized FunctionRuntimeInfo getFunctionRuntimeInfo(String 
fullyQualifiedInstanceId) {
+        return getFunctionRuntimeInfoInternal(fullyQualifiedInstanceId);
     }
 
+    private FunctionRuntimeInfo getFunctionRuntimeInfoInternal(String 
fullyQualifiedInstanceId) {
+        return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplBase.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
similarity index 90%
rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplBase.java
rename to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index bab81e4026..8f13dc8067 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplBase.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -54,6 +54,7 @@
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
+import javax.ws.rs.core.UriBuilder;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -71,9 +72,9 @@
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.join;
 import static org.apache.pulsar.functions.utils.Utils.*;
-import static 
org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase.ComponentType.FUNCTION;
-import static 
org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase.ComponentType.SINK;
-import static 
org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase.ComponentType.SOURCE;
+import static 
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.FUNCTION;
+import static 
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.SINK;
+import static 
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.SOURCE;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -84,26 +85,30 @@
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.FunctionState;
+import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
-import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.functions.utils.StateUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.Utils;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -116,7 +121,7 @@
 import net.jodah.typetools.TypeResolver;
 
 @Slf4j
-public class FunctionsImplBase {
+public abstract class ComponentImpl {
 
     public enum ComponentType {
         FUNCTION("Function"),
@@ -136,15 +141,137 @@ public String toString() {
     }
 
     private final AtomicReference<StorageClient> storageClient = new 
AtomicReference<>();
-    private final Supplier<WorkerService> workerServiceSupplier;
-    private final ComponentType componentType;
+    protected final Supplier<WorkerService> workerServiceSupplier;
+    protected final ComponentType componentType;
 
-    public FunctionsImplBase(Supplier<WorkerService> workerServiceSupplier, 
ComponentType componentType) {
+    public ComponentImpl(Supplier<WorkerService> workerServiceSupplier, 
ComponentType componentType) {
         this.workerServiceSupplier = workerServiceSupplier;
         this.componentType = componentType;
     }
 
-    private WorkerService worker() {
+    protected abstract class GetStatus<S, T> {
+
+        public abstract T notScheduledInstance();
+
+        public abstract T 
fromFunctionStatusProto(InstanceCommunication.FunctionStatus status, String 
assignedWorkerId);
+
+        public abstract T notRunning(String assignedWorkerId, String error);
+
+        public T getComponentInstanceStatus(String tenant, String namespace,
+                                            String name, int instanceId, URI 
uri) {
+
+            Function.Assignment assignment;
+            if 
(worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
+                assignment = 
worker().getFunctionRuntimeManager().findFunctionAssignment(tenant, namespace, 
name, -1);
+            } else {
+                assignment = 
worker().getFunctionRuntimeManager().findFunctionAssignment(tenant, namespace, 
name, instanceId);
+            }
+
+            if (assignment == null) {
+                return notScheduledInstance();
+            }
+
+            final String assignedWorkerId = assignment.getWorkerId();
+            final String workerId = worker().getWorkerConfig().getWorkerId();
+
+            // If I am running worker
+            if (assignedWorkerId.equals(workerId)) {
+                FunctionRuntimeInfo functionRuntimeInfo = 
worker().getFunctionRuntimeManager().getFunctionRuntimeInfo(
+                        
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
+                if (functionRuntimeInfo == null) {
+                    log.error("{} in get {} Status does not exist @ 
/{}/{}/{}", componentType, componentType, tenant, namespace, name);
+                    throw new RestException(Status.NOT_FOUND, 
String.format("%s %s doesn't exist", componentType, name));
+                }
+                RuntimeSpawner runtimeSpawner = 
functionRuntimeInfo.getRuntimeSpawner();
+
+                if (runtimeSpawner != null) {
+                    try {
+                        return fromFunctionStatusProto(
+                                
functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get(),
+                                assignedWorkerId);
+                    } catch (InterruptedException | ExecutionException e) {
+                        throw new RuntimeException(e);
+                    }
+                } else {
+                    return notRunning(assignedWorkerId, 
functionRuntimeInfo.getStartupException().getMessage());
+                }
+            } else {
+                // query other worker
+
+                List<WorkerInfo> workerInfoList = 
worker().getMembershipManager().getCurrentMembership();
+                WorkerInfo workerInfo = null;
+                for (WorkerInfo entry : workerInfoList) {
+                    if (assignment.getWorkerId().equals(entry.getWorkerId())) {
+                        workerInfo = entry;
+                    }
+                }
+                if (workerInfo == null) {
+                    return notScheduledInstance();
+                }
+
+                if (uri == null) {
+                    throw new 
WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+                } else {
+                    URI redirect = 
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+                    throw new 
WebApplicationException(Response.temporaryRedirect(redirect).build());
+                }
+            }
+        }
+
+        public abstract S getStatus(String tenant, String namespace,
+                                    String name, 
Collection<Function.Assignment> assignments, URI uri) throws 
PulsarAdminException;
+
+        public abstract S getStatusExternal(String tenant, String namespace,
+                                            String name, int parallelism);
+
+        public abstract S emptyStatus(int parallelism);
+
+        public S getComponentStatus(String tenant, String namespace,
+                                    String name, URI uri) {
+
+            Function.FunctionMetaData functionMetaData = 
worker().getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace, 
name);
+
+            Collection<Function.Assignment> assignments = 
worker().getFunctionRuntimeManager().findFunctionAssignments(tenant, namespace, 
name);
+
+            // TODO refactor the code for externally managed.
+            if 
(worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
+                Function.Assignment assignment = assignments.iterator().next();
+                boolean isOwner = 
worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
+                if (isOwner) {
+                    return getStatusExternal(tenant, namespace, name, 
functionMetaData.getFunctionDetails().getParallelism());
+                } else {
+
+                    // find the hostname/port of the worker who is the owner
+
+                    List<WorkerInfo> workerInfoList = 
worker().getMembershipManager().getCurrentMembership();
+                    WorkerInfo workerInfo = null;
+                    for (WorkerInfo entry: workerInfoList) {
+                        if 
(assignment.getWorkerId().equals(entry.getWorkerId())) {
+                            workerInfo = entry;
+                        }
+                    }
+                    if (workerInfo == null) {
+                        return 
emptyStatus(functionMetaData.getFunctionDetails().getParallelism());
+                    }
+
+                    if (uri == null) {
+                        throw new 
WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+                    } else {
+                        URI redirect = 
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+                        throw new 
WebApplicationException(Response.temporaryRedirect(redirect).build());
+                    }
+                }
+            } else {
+                try {
+                    return getStatus(tenant, namespace, name, assignments, 
uri);
+                } catch (PulsarAdminException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    protected WorkerService worker() {
         try {
             return checkNotNull(workerServiceSupplier.get());
         } catch (Throwable t) {
@@ -153,7 +280,7 @@ private WorkerService worker() {
         }
     }
 
-    private boolean isWorkerServiceAvailable() {
+    boolean isWorkerServiceAvailable() {
         WorkerService workerService = workerServiceSupplier.get();
         if (workerService == null) {
             return false;
@@ -719,96 +846,6 @@ public Response stopFunctionInstances(final String tenant, 
final String namespac
         }
     }
 
-    public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
getFunctionInstanceStatus(final String tenant, final String namespace, final 
String componentName,
-                                                                               
                       final String instanceId, URI uri) throws IOException {
-
-        if (!isWorkerServiceAvailable()) {
-            throw new RestException(Status.SERVICE_UNAVAILABLE, "Function 
worker service is not done initializing. Please try again in a little while.");
-        }
-
-        // validate parameters
-        try {
-            validateGetFunctionInstanceRequestParams(tenant, namespace, 
componentName, componentType, instanceId);
-        } catch (IllegalArgumentException e) {
-            log.error("Invalid get {} Status request @ /{}/{}/{}", 
componentType, tenant, namespace, componentName, e);
-            throw new RestException(Status.BAD_REQUEST, e.getMessage());
-        }
-
-        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, 
componentName)) {
-            log.error("{} in get {} Status does not exist @ /{}/{}/{}", 
componentType, componentType, tenant, namespace, componentName);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
-
-        }
-        FunctionMetaData functionMetaData = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
-        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
-            log.error("{}/{}/{} is not a {}", tenant, namespace, 
componentName, componentType);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
-        }
-        int instanceIdInt = Integer.parseInt(instanceId);
-        if (instanceIdInt < 0 || instanceIdInt >= 
functionMetaData.getFunctionDetails().getParallelism()) {
-            log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", 
componentType, tenant, namespace, componentName);
-            throw new RestException(Status.BAD_REQUEST, String.format("%s %s 
doesn't have instance with id %s", componentType, componentName, instanceId));
-        }
-
-        FunctionRuntimeManager functionRuntimeManager = 
worker().getFunctionRuntimeManager();
-
-        FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData;
-        try {
-            functionInstanceStatusData = 
functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, 
componentName,
-                    Integer.parseInt(instanceId), uri);
-        } catch (WebApplicationException we) {
-            throw we;
-        } catch (Exception e) {
-            log.error("{}/{}/{} Got Exception Getting Status", tenant, 
namespace, componentName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
-        }
-
-        return functionInstanceStatusData;
-    }
-
-    public FunctionStatus getFunctionStatus(final String tenant, final String 
namespace, final String componentName,
-                                            URI uri)
-            throws IOException {
-
-        if (!isWorkerServiceAvailable()) {
-            throw new RestException(Status.SERVICE_UNAVAILABLE, "Function 
worker service is not done initializing. Please try again in a little while.");
-        }
-
-        // validate parameters
-        try {
-            validateGetFunctionRequestParams(tenant, namespace, componentName, 
componentType);
-        } catch (IllegalArgumentException e) {
-            log.error("Invalid get {} Status request @ /{}/{}/{}", 
componentType, tenant, namespace, componentName, e);
-            throw new RestException(Status.BAD_REQUEST, e.getMessage());
-        }
-
-        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, 
componentName)) {
-            log.error("{} in get {} Status does not exist @ /{}/{}/{}", 
componentType, componentType, tenant, namespace, componentName);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
-        }
-
-        FunctionMetaData functionMetaData = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
-        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
-            log.error("{}/{}/{} is not a {}", tenant, namespace, 
componentName, componentType);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
-        }
-
-        FunctionRuntimeManager functionRuntimeManager = 
worker().getFunctionRuntimeManager();
-        FunctionStatus functionStatus;
-        try {
-            functionStatus = functionRuntimeManager.getFunctionStatus(tenant, 
namespace, componentName, uri);
-        } catch (WebApplicationException we) {
-            throw we;
-        } catch (Exception e) {
-            log.error("{}/{}/{} Got Exception Getting Status", tenant, 
namespace, componentName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
-        }
-
-        return functionStatus;
-    }
-
     public FunctionStats getFunctionStats(final String tenant, final String 
namespace, final String componentName,
                                           URI uri) throws IOException {
         if (!isWorkerServiceAvailable()) {
@@ -1176,7 +1213,7 @@ private void validateListFunctionRequestParams(String 
tenant, String namespace)
         }
     }
 
-    private void validateGetFunctionInstanceRequestParams(String tenant, 
String namespace, String componentName,
+    protected void validateGetFunctionInstanceRequestParams(String tenant, 
String namespace, String componentName,
                                                           ComponentType 
componentType, String instanceId) throws IllegalArgumentException {
         validateGetFunctionRequestParams(tenant, namespace, componentName, 
componentType);
         if (instanceId == null) {
@@ -1184,7 +1221,7 @@ private void 
validateGetFunctionInstanceRequestParams(String tenant, String name
         }
     }
 
-    private void validateGetFunctionRequestParams(String tenant, String 
namespace, String subject, ComponentType componentType)
+    protected void validateGetFunctionRequestParams(String tenant, String 
namespace, String subject, ComponentType componentType)
             throws IllegalArgumentException {
 
         if (tenant == null) {
@@ -1591,4 +1628,42 @@ public ComponentType 
calculateSubjectType(FunctionMetaData functionMetaData) {
         return SINK;
     }
 
+    protected void componentStatusRequestValidate (final String tenant, final 
String namespace, final String componentName) {
+        if (!isWorkerServiceAvailable()) {
+            throw new RestException(Status.SERVICE_UNAVAILABLE, "Function 
worker service is not done initializing. Please try again in a little while.");
+        }
+
+        // validate parameters
+        try {
+            validateGetFunctionRequestParams(tenant, namespace, componentName, 
componentType);
+        } catch (IllegalArgumentException e) {
+            log.error("Invalid get {} Status request @ /{}/{}/{}", 
componentType, tenant, namespace, componentName, e);
+            throw new RestException(Status.BAD_REQUEST, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, 
componentName)) {
+            log.error("{} in get {} Status does not exist @ /{}/{}/{}", 
componentType, componentType, tenant, namespace, componentName);
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
+        }
+
+        FunctionMetaData functionMetaData = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, 
componentName, componentType);
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
+        }
+    }
+
+    protected void componentInstanceStatusRequestValidate (final String 
tenant, final String namespace,
+                                                           final String 
componentName, int instanceId) {
+        componentStatusRequestValidate(tenant, namespace, componentName);
+
+        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
+        FunctionMetaData functionMetaData = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        int parallelism = 
functionMetaData.getFunctionDetails().getParallelism();
+        if (instanceId < 0 || instanceId >= parallelism) {
+            log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", 
componentType, tenant, namespace, componentName);
+            throw new RestException(Response.Status.BAD_REQUEST, 
String.format("%s %s doesn't have instance with id %s", componentType, 
componentName, instanceId));
+        }
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index eb0e696965..355cf11379 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -19,14 +19,224 @@
 package org.apache.pulsar.functions.worker.rest.api;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ExceptionInformation;
+import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.RestException;
 
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.function.Supplier;
 
 @Slf4j
-public class FunctionsImpl extends FunctionsImplBase {
+public class FunctionsImpl extends ComponentImpl {
+
+    private class GetFunctionStatus extends GetStatus<FunctionStatus, 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> {
+
+        @Override
+        public 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
notScheduledInstance() {
+            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
+                    = new 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
+            functionInstanceStatusData.setRunning(false);
+            functionInstanceStatusData.setError("Function has not been 
scheduled");
+            return functionInstanceStatusData;
+        }
+
+        @Override
+        public 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
fromFunctionStatusProto(
+                InstanceCommunication.FunctionStatus status,
+                String assignedWorkerId) {
+            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
+                    = new 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
+            functionInstanceStatusData.setRunning(status.getRunning());
+            functionInstanceStatusData.setError(status.getFailureException());
+            functionInstanceStatusData.setNumRestarts(status.getNumRestarts());
+            functionInstanceStatusData.setNumReceived(status.getNumReceived());
+            
functionInstanceStatusData.setNumSuccessfullyProcessed(status.getNumSuccessfullyProcessed());
+            
functionInstanceStatusData.setNumUserExceptions(status.getNumUserExceptions());
+
+            List<ExceptionInformation> userExceptionInformationList = new 
LinkedList<>();
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation 
exceptionEntry : status.getLatestUserExceptionsList()) {
+                ExceptionInformation exceptionInformation
+                        = new ExceptionInformation();
+                
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+                
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                userExceptionInformationList.add(exceptionInformation);
+            }
+            
functionInstanceStatusData.setLatestUserExceptions(userExceptionInformationList);
+
+            
functionInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
+            List<ExceptionInformation> systemExceptionInformationList = new 
LinkedList<>();
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation 
exceptionEntry : status.getLatestSystemExceptionsList()) {
+                ExceptionInformation exceptionInformation
+                        = new ExceptionInformation();
+                
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+                
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                systemExceptionInformationList.add(exceptionInformation);
+            }
+            
functionInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
+
+            
functionInstanceStatusData.setAverageLatency(status.getAverageLatency());
+            
functionInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
+            functionInstanceStatusData.setWorkerId(assignedWorkerId);
+
+            return functionInstanceStatusData;
+        }
+
+        @Override
+        public 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
notRunning(String assignedWorkerId, String error) {
+            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
+                    = new 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
+            functionInstanceStatusData.setRunning(false);
+            if (error != null) {
+                functionInstanceStatusData.setError(error);
+            }
+            functionInstanceStatusData.setWorkerId(assignedWorkerId);
+
+            return functionInstanceStatusData;
+        }
+
+        @Override
+        public FunctionStatus getStatus(String tenant, String namespace, 
String name, Collection<Function.Assignment>
+                assignments, URI uri) throws PulsarAdminException {
+            FunctionStatus functionStatus = new FunctionStatus();
+            for (Function.Assignment assignment : assignments) {
+                boolean isOwner = 
worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
+                
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData;
+                if (isOwner) {
+                    functionInstanceStatusData = 
getComponentInstanceStatus(tenant, namespace, name, assignment
+                            .getInstance().getInstanceId(), null);
+                } else {
+                    functionInstanceStatusData = 
worker().getFunctionAdmin().functions().getFunctionStatus(
+                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
+                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
+                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
+                            assignment.getInstance().getInstanceId());
+                }
+
+                FunctionStatus.FunctionInstanceStatus instanceStatus = new 
FunctionStatus.FunctionInstanceStatus();
+                
instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
+                instanceStatus.setStatus(functionInstanceStatusData);
+                functionStatus.addInstance(instanceStatus);
+            }
+
+            functionStatus.setNumInstances(functionStatus.instances.size());
+            functionStatus.getInstances().forEach(functionInstanceStatus -> {
+                if (functionInstanceStatus.getStatus().isRunning()) {
+                    functionStatus.numRunning++;
+                }
+            });
+            return functionStatus;
+        }
+
+        @Override
+        public FunctionStatus getStatusExternal(String tenant, String 
namespace, String name, int parallelism) {
+            FunctionStatus functionStatus = new FunctionStatus();
+            for (int i = 0; i < parallelism; ++i) {
+                
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
+                        = getComponentInstanceStatus(tenant, namespace, name, 
i, null);
+                FunctionStatus.FunctionInstanceStatus functionInstanceStatus
+                        = new FunctionStatus.FunctionInstanceStatus();
+                functionInstanceStatus.setInstanceId(i);
+                functionInstanceStatus.setStatus(functionInstanceStatusData);
+                functionStatus.addInstance(functionInstanceStatus);
+            }
+
+            functionStatus.setNumInstances(functionStatus.instances.size());
+            functionStatus.getInstances().forEach(functionInstanceStatus -> {
+                if (functionInstanceStatus.getStatus().isRunning()) {
+                    functionStatus.numRunning++;
+                }
+            });
+            return functionStatus;
+        }
+
+        @Override
+        public FunctionStatus emptyStatus(int parallelism) {
+            FunctionStatus functionStatus = new FunctionStatus();
+            functionStatus.setNumInstances(parallelism);
+            functionStatus.setNumRunning(0);
+            for (int i = 0; i < parallelism; i++) {
+                FunctionStatus.FunctionInstanceStatus functionInstanceStatus = 
new FunctionStatus.FunctionInstanceStatus();
+                functionInstanceStatus.setInstanceId(i);
+                
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
+                        = new 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
+                functionInstanceStatusData.setRunning(false);
+                functionInstanceStatusData.setError("Function has not been 
scheduled");
+                functionInstanceStatus.setStatus(functionInstanceStatusData);
+
+                functionStatus.addInstance(functionInstanceStatus);
+            }
+
+            return functionStatus;
+        }
+    }
 
     public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
         super(workerServiceSupplier, ComponentType.FUNCTION);
     }
+
+    /**
+     * Get status of a function instance.  If this worker is not running the 
function instance, the request is presumably redirected to the owning worker (TODO confirm).
+     * @param tenant the tenant the function belongs to
+     * @param namespace the namespace the function belongs to
+     * @param componentName the function name
+     * @param instanceId the function instance id
+     * @return the function status
+     */
+    public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
getFunctionInstanceStatus(final String tenant, final String namespace, final 
String componentName,
+                                                                               
                       final String instanceId, URI uri) throws IOException {
+
+        // validate parameters
+        componentInstanceStatusRequestValidate(tenant, namespace, 
componentName, Integer.parseInt(instanceId));
+
+        FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData;
+        try {
+            functionInstanceStatusData = new 
GetFunctionStatus().getComponentInstanceStatus(tenant, namespace, componentName,
+                    Integer.parseInt(instanceId), uri);
+        } catch (WebApplicationException we) {
+            throw we;
+        } catch (Exception e) {
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, 
namespace, componentName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
+
+        return functionInstanceStatusData;
+    }
+
+    /**
+     * Get statuses of all function instances.
+     * @param tenant the tenant the function belongs to
+     * @param namespace the namespace the function belongs to
+     * @param componentName the function name
+     * @return the function status, aggregated over all instances
+     * @throws RestException if validation or status retrieval fails
+     */
+    public FunctionStatus getFunctionStatus(final String tenant, final String 
namespace, final String componentName,
+                                            URI uri) {
+
+        // validate parameters
+        componentStatusRequestValidate(tenant, namespace, componentName);
+
+        FunctionStatus functionStatus;
+        try {
+            functionStatus = new 
GetFunctionStatus().getComponentStatus(tenant, namespace, componentName, uri);
+        } catch (WebApplicationException we) {
+            throw we;
+        } catch (Exception e) {
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, 
namespace, componentName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
+
+        return functionStatus;
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index b7202f515d..4ecba12964 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -18,63 +18,205 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
-import org.apache.pulsar.common.policies.data.FunctionStatus;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SinkStatus;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.RestException;
 
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.function.Supplier;
 
-public class SinkImpl extends FunctionsImplBase {
-    public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
-        super(workerServiceSupplier, ComponentType.SINK);
-    }
+@Slf4j
+public class SinkImpl extends ComponentImpl {
 
+    private class GetSinkStatus extends GetStatus<SinkStatus, 
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData> {
 
-    public SinkStatus getSinkStatus(final String tenant, final String 
namespace,
-                                        final String componentName, URI uri) 
throws IOException {
+        @Override
+        public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
notScheduledInstance() {
+            SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
sinkInstanceStatusData
+                    = new 
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
+            sinkInstanceStatusData.setRunning(false);
+            sinkInstanceStatusData.setError("Sink has not been scheduled");
+            return sinkInstanceStatusData;
+        }
 
-        FunctionStatus functionStatus = getFunctionStatus(tenant, namespace, 
componentName, uri);
+        @Override
+        public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
fromFunctionStatusProto(
+                InstanceCommunication.FunctionStatus status,
+                String assignedWorkerId) {
+            SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
sinkInstanceStatusData
+                    = new 
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
+            sinkInstanceStatusData.setRunning(status.getRunning());
+            sinkInstanceStatusData.setError(status.getFailureException());
+            sinkInstanceStatusData.setNumRestarts(status.getNumRestarts());
+            sinkInstanceStatusData.setNumReceived(status.getNumReceived());
 
-        SinkStatus sinkStatus = new SinkStatus();
+            List<ExceptionInformation> userExceptionInformationList = new 
LinkedList<>();
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation 
exceptionEntry : status.getLatestUserExceptionsList()) {
+                ExceptionInformation exceptionInformation
+                        = new ExceptionInformation();
+                
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+                
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                userExceptionInformationList.add(exceptionInformation);
+            }
 
-        sinkStatus.setNumInstances(functionStatus.getNumInstances());
-        sinkStatus.setNumRunning(functionStatus.getNumRunning());
-        functionStatus.getInstances().forEach(functionInstanceStatus -> {
-            SinkStatus.SinkInstanceStatus sinkInstanceStatus = new 
SinkStatus.SinkInstanceStatus();
-            
sinkInstanceStatus.setInstanceId(functionInstanceStatus.getInstanceId());
-            
sinkInstanceStatus.setStatus(fromFunctionInstanceStatus(functionInstanceStatus.getStatus()));
-            sinkStatus.addInstance(sinkInstanceStatus);
-        });
-        return sinkStatus;
+            
sinkInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
+            List<ExceptionInformation> systemExceptionInformationList = new 
LinkedList<>();
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation 
exceptionEntry : status.getLatestSystemExceptionsList()) {
+                ExceptionInformation exceptionInformation
+                        = new ExceptionInformation();
+                
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+                
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                systemExceptionInformationList.add(exceptionInformation);
+            }
+            
sinkInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
+
+            
sinkInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
+            sinkInstanceStatusData.setWorkerId(assignedWorkerId);
+
+            return sinkInstanceStatusData;
+        }
+
+        @Override
+        public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
notRunning(String assignedWorkerId, String error) {
+            SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
sinkInstanceStatusData
+                    = new 
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
+            sinkInstanceStatusData.setRunning(false);
+            if (error != null) {
+                sinkInstanceStatusData.setError(error);
+            }
+            sinkInstanceStatusData.setWorkerId(assignedWorkerId);
+
+            return sinkInstanceStatusData;
+        }
+
+        @Override
+        public SinkStatus getStatus(String tenant, String namespace, String 
name, Collection<Function.Assignment> assignments, URI uri) throws 
PulsarAdminException {
+            SinkStatus sinkStatus = new SinkStatus();
+            for (Function.Assignment assignment : assignments) {
+                boolean isOwner = 
worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
+                SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
sinkInstanceStatusData;
+                if (isOwner) {
+                    sinkInstanceStatusData = 
getComponentInstanceStatus(tenant, namespace, name, 
assignment.getInstance().getInstanceId(), null);
+                } else {
+                    sinkInstanceStatusData = 
worker().getFunctionAdmin().sink().getSinkStatus(
+                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
+                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
+                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
+                            assignment.getInstance().getInstanceId());
+                }
+
+                SinkStatus.SinkInstanceStatus instanceStatus = new 
SinkStatus.SinkInstanceStatus();
+                
instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
+                instanceStatus.setStatus(sinkInstanceStatusData);
+                sinkStatus.addInstance(instanceStatus);
+            }
+
+            sinkStatus.setNumInstances(sinkStatus.instances.size());
+            sinkStatus.getInstances().forEach(sinkInstanceStatus -> {
+                if (sinkInstanceStatus.getStatus().isRunning()) {
+                    sinkStatus.numRunning++;
+                }
+            });
+            return sinkStatus;
+        }
+
+        @Override
+        public SinkStatus getStatusExternal (String tenant, String namespace, 
String name, int parallelism) {
+            SinkStatus sinkStatus = new SinkStatus();
+            for (int i = 0; i < parallelism; ++i) {
+                SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
sinkInstanceStatusData
+                        = getComponentInstanceStatus(tenant, namespace, name, 
i, null);
+                SinkStatus.SinkInstanceStatus sinkInstanceStatus
+                        = new SinkStatus.SinkInstanceStatus();
+                sinkInstanceStatus.setInstanceId(i);
+                sinkInstanceStatus.setStatus(sinkInstanceStatusData);
+                sinkStatus.addInstance(sinkInstanceStatus);
+            }
+
+            sinkStatus.setNumInstances(sinkStatus.instances.size());
+            sinkStatus.getInstances().forEach(sinkInstanceStatus -> {
+                if (sinkInstanceStatus.getStatus().isRunning()) {
+                    sinkStatus.numRunning++;
+                }
+            });
+            return sinkStatus;
+        }
+
+        /**
+         * Produces a status with {@code parallelism} instances and zero running, where
+         * every instance entry is marked not running and carries the
+         * "Sink has not been scheduled" error.
+         */
+        @Override
+        public SinkStatus emptyStatus(int parallelism) {
+            SinkStatus placeholder = new SinkStatus();
+            placeholder.setNumInstances(parallelism);
+            placeholder.setNumRunning(0);
+
+            for (int instanceId = 0; instanceId < parallelism; instanceId++) {
+                SinkStatus.SinkInstanceStatus.SinkInstanceStatusData notScheduled
+                        = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
+                notScheduled.setRunning(false);
+                notScheduled.setError("Sink has not been scheduled");
+
+                SinkStatus.SinkInstanceStatus entry = new SinkStatus.SinkInstanceStatus();
+                entry.setInstanceId(instanceId);
+                entry.setStatus(notScheduled);
+                placeholder.addInstance(entry);
+            }
+
+            return placeholder;
+        }
     }
 
-    public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
getSinkInstanceStatus(String tenant,
-                                                                               
               String namespace,
-                                                                               
               String sinkName,
-                                                                               
               String instanceId,
-                                                                               
               URI requestUri)
+    public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
+        super(workerServiceSupplier, ComponentType.SINK);
+    }
+
+    public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
getSinkInstanceStatus(
+            String tenant, String namespace, String sinkName, String 
instanceId, URI uri)
             throws IOException {
 
-        FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
-                = getFunctionInstanceStatus(tenant, namespace, sinkName, 
instanceId, requestUri);
+        // validate parameters
+        componentInstanceStatusRequestValidate(tenant, namespace, sinkName, 
Integer.parseInt(instanceId));
 
-        return fromFunctionInstanceStatus(functionInstanceStatusData);
-    }
 
-    private static SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
fromFunctionInstanceStatus(
-            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData) {
-        SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
sinkInstanceStatusData
-                = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
-
-        sinkInstanceStatusData.setError(functionInstanceStatusData.getError());
-        
sinkInstanceStatusData.setLastInvocationTime(functionInstanceStatusData.getLastInvocationTime());
-        
sinkInstanceStatusData.setLatestSystemExceptions(functionInstanceStatusData.getLatestSystemExceptions());
-        
sinkInstanceStatusData.setNumReceived(functionInstanceStatusData.getNumReceived());
-        
sinkInstanceStatusData.setNumRestarts(functionInstanceStatusData.getNumRestarts());
-        
sinkInstanceStatusData.setRunning(functionInstanceStatusData.isRunning());
-        
sinkInstanceStatusData.setWorkerId(functionInstanceStatusData.getWorkerId());
+        SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
sinkInstanceStatusData;
+        try {
+            sinkInstanceStatusData = new 
GetSinkStatus().getComponentInstanceStatus(tenant, namespace, sinkName,
+                    Integer.parseInt(instanceId), uri);
+        } catch (WebApplicationException we) {
+            throw we;
+        } catch (Exception e) {
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, 
namespace, sinkName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
         return sinkInstanceStatusData;
     }
+
+    /**
+     * Returns the aggregate status of a sink.
+     *
+     * <p>The request is validated first. {@link WebApplicationException}s raised by the
+     * lookup are rethrown unchanged; any other exception is logged and converted into
+     * a {@code RestException} with status {@code INTERNAL_SERVER_ERROR}.
+     */
+    public SinkStatus getSinkStatus(
+            final String tenant, final String namespace,
+            final String componentName, URI uri) {
+
+        // validate parameters
+        componentStatusRequestValidate(tenant, namespace, componentName);
+
+        try {
+            return new GetSinkStatus().getComponentStatus(tenant, namespace, componentName, uri);
+        } catch (WebApplicationException we) {
+            // Already carries an HTTP status; pass it through untouched.
+            throw we;
+        } catch (Exception e) {
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, componentName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index f69b03ea65..412019a184 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -18,62 +18,199 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
-import org.apache.pulsar.common.policies.data.FunctionStatus;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.RestException;
 
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.function.Supplier;
 
-public class SourceImpl extends FunctionsImplBase {
+@Slf4j
+public class SourceImpl extends ComponentImpl {
+    private class GetSourceStatus extends GetStatus<SourceStatus, 
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData> {
+
+        /**
+         * Returns instance status data marked as not running with the
+         * "Source has not been scheduled" error.
+         */
+        @Override
+        public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData notScheduledInstance() {
+            SourceStatus.SourceInstanceStatus.SourceInstanceStatusData unscheduled
+                    = new SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
+            unscheduled.setError("Source has not been scheduled");
+            unscheduled.setRunning(false);
+            return unscheduled;
+        }
+
+        /**
+         * Converts the status proto reported for an instance into source instance
+         * status data.
+         *
+         * @param status           status proto to copy fields from
+         * @param assignedWorkerId worker id recorded on the returned data
+         * @return status data populated from {@code status}
+         */
+        @Override
+        public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData fromFunctionStatusProto(
+                InstanceCommunication.FunctionStatus status,
+                String assignedWorkerId) {
+            SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData
+                    = new SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
+            sourceInstanceStatusData.setRunning(status.getRunning());
+            sourceInstanceStatusData.setError(status.getFailureException());
+            sourceInstanceStatusData.setNumRestarts(status.getNumRestarts());
+            sourceInstanceStatusData.setNumReceived(status.getNumReceived());
+
+            // Only system exceptions are copied: nothing on the returned data is set
+            // from the user-exception list, so that list is not traversed.
+            sourceInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
+            List<ExceptionInformation> systemExceptionInformationList = new LinkedList<>();
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry
+                    : status.getLatestSystemExceptionsList()) {
+                ExceptionInformation exceptionInformation = new ExceptionInformation();
+                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                systemExceptionInformationList.add(exceptionInformation);
+            }
+            sourceInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
+
+            sourceInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
+            sourceInstanceStatusData.setWorkerId(assignedWorkerId);
+
+            return sourceInstanceStatusData;
+        }
+
+        /**
+         * Returns instance status data marked as not running for the given worker.
+         *
+         * @param assignedWorkerId worker id recorded on the returned data
+         * @param error            error text to record; left unset when {@code null}
+         */
+        @Override
+        public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData notRunning(String assignedWorkerId, String error) {
+            SourceStatus.SourceInstanceStatus.SourceInstanceStatusData stopped
+                    = new SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
+            stopped.setWorkerId(assignedWorkerId);
+            stopped.setRunning(false);
+            if (error != null) {
+                stopped.setError(error);
+            }
+
+            return stopped;
+        }
+
+        /**
+         * Builds the aggregate source status from the given assignments.
+         *
+         * <p>An instance whose assignment names this worker's id is resolved through
+         * {@code getComponentInstanceStatus} with a {@code null} URI; any other instance
+         * is fetched through the function admin client, using the tenant, namespace and
+         * name recorded in the assignment's function details. The {@code uri} argument
+         * is not used by this method.
+         */
+        @Override
+        public SourceStatus getStatus(String tenant, String namespace, String name,
+                                      Collection<Function.Assignment> assignments, URI uri)
+                throws PulsarAdminException {
+            SourceStatus aggregate = new SourceStatus();
+            for (Function.Assignment assignment : assignments) {
+                boolean ownedLocally =
+                        worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
+                Function.Instance instance = assignment.getInstance();
+                int instanceId = instance.getInstanceId();
+
+                SourceStatus.SourceInstanceStatus.SourceInstanceStatusData data;
+                if (!ownedLocally) {
+                    Function.FunctionDetails details = instance.getFunctionMetaData().getFunctionDetails();
+                    data = worker().getFunctionAdmin().source().getSourceStatus(
+                            details.getTenant(),
+                            details.getNamespace(),
+                            details.getName(),
+                            instanceId);
+                } else {
+                    data = getComponentInstanceStatus(tenant, namespace, name, instanceId, null);
+                }
+
+                SourceStatus.SourceInstanceStatus entry = new SourceStatus.SourceInstanceStatus();
+                entry.setInstanceId(instanceId);
+                entry.setStatus(data);
+                aggregate.addInstance(entry);
+            }
+
+            // Counters are computed from what was actually added above.
+            aggregate.setNumInstances(aggregate.instances.size());
+            for (SourceStatus.SourceInstanceStatus entry : aggregate.getInstances()) {
+                if (entry.getStatus().isRunning()) {
+                    aggregate.numRunning++;
+                }
+            }
+            return aggregate;
+        }
+
+        /**
+         * Builds the aggregate source status by looking up every instance index in
+         * {@code [0, parallelism)} via {@code getComponentInstanceStatus}, passing a
+         * {@code null} URI for each lookup, and then derives the instance and
+         * running counters from the collected entries.
+         */
+        @Override
+        public SourceStatus getStatusExternal(String tenant, String namespace, String name, int parallelism) {
+            SourceStatus sourceStatus = new SourceStatus();
+            for (int i = 0; i < parallelism; ++i) {
+                SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData
+                        = getComponentInstanceStatus(tenant, namespace, name, i, null);
+                SourceStatus.SourceInstanceStatus sourceInstanceStatus
+                        = new SourceStatus.SourceInstanceStatus();
+                sourceInstanceStatus.setInstanceId(i);
+                sourceInstanceStatus.setStatus(sourceInstanceStatusData);
+                sourceStatus.addInstance(sourceInstanceStatus);
+            }
+
+            sourceStatus.setNumInstances(sourceStatus.instances.size());
+            sourceStatus.getInstances().forEach(sourceInstanceStatus -> {
+                if (sourceInstanceStatus.getStatus().isRunning()) {
+                    sourceStatus.numRunning++;
+                }
+            });
+            return sourceStatus;
+        }
+
+        /**
+         * Produces a status with {@code parallelism} instances and zero running, where
+         * every instance entry is marked not running and carries the
+         * "Source has not been scheduled" error.
+         */
+        @Override
+        public SourceStatus emptyStatus(int parallelism) {
+            SourceStatus sourceStatus = new SourceStatus();
+            sourceStatus.setNumInstances(parallelism);
+            sourceStatus.setNumRunning(0);
+            for (int i = 0; i < parallelism; i++) {
+                SourceStatus.SourceInstanceStatus sourceInstanceStatus = new SourceStatus.SourceInstanceStatus();
+                sourceInstanceStatus.setInstanceId(i);
+                SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData
+                        = new SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
+                sourceInstanceStatusData.setRunning(false);
+                // Keep the message on one line: a string literal cannot span lines.
+                sourceInstanceStatusData.setError("Source has not been scheduled");
+                sourceInstanceStatus.setStatus(sourceInstanceStatusData);
+
+                sourceStatus.addInstance(sourceInstanceStatus);
+            }
+
+            return sourceStatus;
+        }
+    }
+
     public SourceImpl(Supplier<WorkerService> workerServiceSupplier) {
         super(workerServiceSupplier, ComponentType.SOURCE);
     }
 
     public SourceStatus getSourceStatus(final String tenant, final String 
namespace,
                                         final String componentName, URI uri) 
throws IOException {
+        // validate parameters
+        componentStatusRequestValidate(tenant, namespace, componentName);
 
-        FunctionStatus functionStatus = getFunctionStatus(tenant, namespace, 
componentName, uri);
-
-        SourceStatus sourceStatus = new SourceStatus();
+        SourceStatus sourceStatus;
+        try {
+            sourceStatus = new GetSourceStatus().getComponentStatus(tenant, 
namespace, componentName, uri);
+        } catch (WebApplicationException we) {
+            throw we;
+        } catch (Exception e) {
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, 
namespace, componentName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
 
-        sourceStatus.setNumInstances(functionStatus.getNumInstances());
-        sourceStatus.setNumRunning(functionStatus.getNumRunning());
-        functionStatus.getInstances().forEach(functionInstanceStatus -> {
-            SourceStatus.SourceInstanceStatus sourceInstanceStatus = new 
SourceStatus.SourceInstanceStatus();
-            
sourceInstanceStatus.setInstanceId(functionInstanceStatus.getInstanceId());
-            
sourceInstanceStatus.setStatus(fromFunctionInstanceStatus(functionInstanceStatus.getStatus()));
-            sourceStatus.addInstance(sourceInstanceStatus);
-        });
         return sourceStatus;
     }
 
-    public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
getSourceInstanceStatus(String tenant,
-                                                                               
               String namespace,
-                                                                               
               String sourceName,
-                                                                               
               String instanceId,
-                                                                               
               URI requestUri)
-            throws IOException {
-
-        FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
-                = getFunctionInstanceStatus(tenant, namespace, sourceName, 
instanceId, requestUri);
-
-        return fromFunctionInstanceStatus(functionInstanceStatusData);
-    }
+    public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
getSourceInstanceStatus(
+            String tenant, String namespace, String sourceName, String 
instanceId, URI uri) {
+        // validate parameters
+        componentInstanceStatusRequestValidate(tenant, namespace, sourceName, 
Integer.parseInt(instanceId));
 
-    private static SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
fromFunctionInstanceStatus(
-            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData) {
-        SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
sourceInstanceStatusData
-                = new 
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
-
-        
sourceInstanceStatusData.setError(functionInstanceStatusData.getError());
-        
sourceInstanceStatusData.setLastInvocationTime(functionInstanceStatusData.getLastInvocationTime());
-        
sourceInstanceStatusData.setLatestSystemExceptions(functionInstanceStatusData.getLatestSystemExceptions());
-        
sourceInstanceStatusData.setNumReceived(functionInstanceStatusData.getNumReceived());
-        
sourceInstanceStatusData.setNumRestarts(functionInstanceStatusData.getNumRestarts());
-        
sourceInstanceStatusData.setRunning(functionInstanceStatusData.isRunning());
-        
sourceInstanceStatusData.setWorkerId(functionInstanceStatusData.getWorkerId());
+        SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
sourceInstanceStatusData;
+        try {
+            sourceInstanceStatusData = new 
GetSourceStatus().getComponentInstanceStatus(tenant, namespace, sourceName,
+                    Integer.parseInt(instanceId), uri);
+        } catch (WebApplicationException we) {
+            throw we;
+        } catch (Exception e) {
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, 
namespace, sourceName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
         return sourceInstanceStatusData;
     }
 }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index bf3766cc8a..98747edd32 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -70,7 +70,7 @@
 import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase;
+import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -169,7 +169,7 @@ public void setup() throws Exception {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
-        
doReturn(FunctionsImplBase.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(any());
+        
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(any());
     }
 
     //
@@ -1220,9 +1220,9 @@ public void testOnlyGetSources() throws Exception {
                 
FunctionDetails.newBuilder().setName("test-3").build()).build();
         functionMetaDataList.add(f3);
         when(mockedManager.listFunctions(eq(tenant), 
eq(namespace))).thenReturn(functionMetaDataList);
-        
doReturn(FunctionsImplBase.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-        
doReturn(FunctionsImplBase.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-        
doReturn(FunctionsImplBase.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+        
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+        
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+        
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
 
         Response response = listDefaultFunctions();
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index 234ca2f881..a0e6843769 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -43,8 +43,7 @@
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase;
+import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
 import org.apache.pulsar.functions.worker.rest.api.SinkImpl;
 import org.apache.pulsar.io.cassandra.CassandraStringSink;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -174,7 +173,7 @@ public void setup() throws Exception {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new SinkImpl(() -> mockedWorkerService));
-        
Mockito.doReturn(FunctionsImplBase.ComponentType.SINK).when(this.resource).calculateSubjectType(any());
+        
Mockito.doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(any());
     }
 
     //
@@ -1183,9 +1182,9 @@ public void testOnlyGetSinks() throws Exception {
                 
FunctionDetails.newBuilder().setName("test-3").build()).build();
         functionMetaDataList.add(f3);
         when(mockedManager.listFunctions(eq(tenant), 
eq(namespace))).thenReturn(functionMetaDataList);
-        
doReturn(FunctionsImplBase.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-        
doReturn(FunctionsImplBase.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-        
doReturn(FunctionsImplBase.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+        
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+        
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+        
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
 
         Response response = listDefaultSinks();
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index 8f72123804..fcba85c61a 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -41,8 +41,7 @@
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase;
+import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
 import org.apache.pulsar.functions.worker.rest.api.SourceImpl;
 import org.apache.pulsar.io.twitter.TwitterFireHose;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -159,7 +158,7 @@ public void setup() throws Exception {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new SourceImpl(() -> mockedWorkerService));
-        
Mockito.doReturn(FunctionsImplBase.ComponentType.SOURCE).when(this.resource).calculateSubjectType(any());
+        
Mockito.doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(any());
     }
 
     //
@@ -1186,9 +1185,9 @@ public void testOnlyGetSources() throws Exception {
                 
FunctionDetails.newBuilder().setName("test-3").build()).build();
         functionMetaDataList.add(f3);
         when(mockedManager.listFunctions(eq(tenant), 
eq(namespace))).thenReturn(functionMetaDataList);
-        
doReturn(FunctionsImplBase.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-        
doReturn(FunctionsImplBase.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-        
doReturn(FunctionsImplBase.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+        
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+        
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+        
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
 
         Response response = listDefaultSources();
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index d7ede4cdd9..5add9a1455 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -255,6 +255,7 @@ protected void getSinkStatus(String tenant, String 
namespace, String sinkName) t
                 ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
                 log.info("Get sink status : {}", result.getStdout());
 
+                assertEquals(result.getExitCode(), 0);
 
                 SinkStatus sinkStatus = SinkStatus.decode(result.getStdout());
                 try {
@@ -481,6 +482,8 @@ protected void getSourceStatus(String tenant, String 
namespace, String sourceNam
                 ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
                 log.info("Get source status : {}", result.getStdout());
 
+                assertEquals(result.getExitCode(), 0);
+
                 SourceStatus sourceStatus = 
SourceStatus.decode(result.getStdout());
                 try {
                     assertEquals(sourceStatus.getNumInstances(), 1);
@@ -532,6 +535,8 @@ protected void waitForProcessingSourceMessages(String 
tenant,
                 ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
                 log.info("Get source status : {}", result.getStdout());
 
+                assertEquals(result.getExitCode(), 0);
+
                 SourceStatus sourceStatus = 
SourceStatus.decode(result.getStdout());
                 try {
                     assertEquals(sourceStatus.getNumInstances(), 1);
@@ -574,6 +579,8 @@ protected void waitForProcessingSinkMessages(String tenant,
                 ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
                 log.info("Get sink status : {}", result.getStdout());
 
+                assertEquals(result.getExitCode(), 0);
+
                 SinkStatus sinkStatus = SinkStatus.decode(result.getStdout());
                 try {
                     assertEquals(sinkStatus.getNumInstances(), 1);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to