This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d4794bd   fixing and refactoring function status (#3102)
d4794bd is described below

commit d4794bd5e100a946780ff562bf45ef44a7dc5281
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Sun Dec 2 20:06:26 2018 -0800

     fixing and refactoring function status (#3102)
    
    * fixing and refactoring function status
    
    * further refactoring
    
    * cleaning up
---
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    |   5 +-
 .../functions/worker/FunctionRuntimeManager.java   | 261 +++----------------
 .../{FunctionsImplBase.java => ComponentImpl.java} | 279 +++++++++++++--------
 .../functions/worker/rest/api/FunctionsImpl.java   | 212 +++++++++++++++-
 .../pulsar/functions/worker/rest/api/SinkImpl.java | 220 +++++++++++++---
 .../functions/worker/rest/api/SourceImpl.java      | 211 +++++++++++++---
 .../rest/api/v2/FunctionApiV2ResourceTest.java     |  10 +-
 .../worker/rest/api/v2/SinkApiV2ResourceTest.java  |  11 +-
 .../rest/api/v2/SourceApiV2ResourceTest.java       |  11 +-
 .../integration/functions/PulsarFunctionsTest.java |   7 +
 10 files changed, 796 insertions(+), 431 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index fa35ca1..90766d3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -669,9 +669,8 @@ public class PulsarFunctionE2ETest {
             }
         }, 5, 200);
 
-        FunctionRuntimeManager functionRuntimeManager = 
functionsWorkerService.getFunctionRuntimeManager();
-        FunctionStatus functionStatus = 
functionRuntimeManager.getFunctionStatus(tenant, namespacePortion,
-                functionName, null);
+        FunctionStatus functionStatus = 
admin.functions().getFunctionStatus(tenant, namespacePortion,
+                functionName);
 
         int numInstances = functionStatus.getNumInstances();
         assertEquals(numInstances, 1);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index f9e0c75..152c1db 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -18,26 +18,10 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.net.URI;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.UriBuilder;
-
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
 import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -46,24 +30,35 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.ErrorData;
-import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.FunctionStats;
-import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
-import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.runtime.*;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
+import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.Reflections;
 
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriBuilder;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
 /**
  * This class managers all aspects of functions assignments and running of 
function assignments for this worker
  */
@@ -591,207 +586,6 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     }
 
     /**
-     * Get status of a function instance.  If this worker is not running the 
function instance,
-     * @param tenant the tenant the function belongs to
-     * @param namespace the namespace the function belongs to
-     * @param functionName the function name
-     * @param instanceId the function instance id
-     * @return the function status
-     */
-    public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
getFunctionInstanceStatus(
-            String tenant, String namespace,
-            String functionName, int instanceId, URI uri) {
-        Assignment assignment;
-        if (runtimeFactory.externallyManaged()) {
-            assignment = this.findAssignment(tenant, namespace, functionName, 
-1);
-        } else {
-            assignment = this.findAssignment(tenant, namespace, functionName, 
instanceId);
-        }
-
-        if (assignment == null) {
-            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
-                    = new 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
-            functionInstanceStatusData.setRunning(false);
-            functionInstanceStatusData.setError("Function has not been 
scheduled");
-            return functionInstanceStatusData;
-        }
-
-        final String assignedWorkerId = assignment.getWorkerId();
-        final String workerId = this.workerConfig.getWorkerId();
-
-        // If I am running worker
-        if (assignedWorkerId.equals(workerId)) {
-            FunctionRuntimeInfo functionRuntimeInfo = 
this.getFunctionRuntimeInfo(
-                    
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
-            RuntimeSpawner runtimeSpawner = 
functionRuntimeInfo.getRuntimeSpawner();
-            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
-                    = new 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
-            if (runtimeSpawner != null) {
-                try {
-                    InstanceCommunication.FunctionStatus status = 
functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get();
-                    functionInstanceStatusData.setRunning(status.getRunning());
-                    
functionInstanceStatusData.setError(status.getFailureException());
-                    
functionInstanceStatusData.setNumRestarts(status.getNumRestarts());
-                    
functionInstanceStatusData.setNumReceived(status.getNumReceived());
-                    
functionInstanceStatusData.setNumSuccessfullyProcessed(status.getNumSuccessfullyProcessed());
-                    
functionInstanceStatusData.setNumUserExceptions(status.getNumUserExceptions());
-
-                    List<ExceptionInformation> userExceptionInformationList = 
new LinkedList<>();
-                    for 
(InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : 
status.getLatestUserExceptionsList()) {
-                        ExceptionInformation exceptionInformation
-                                = new ExceptionInformation();
-                        
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
-                        
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
-                        userExceptionInformationList.add(exceptionInformation);
-                    }
-                    
functionInstanceStatusData.setLatestUserExceptions(userExceptionInformationList);
-
-                    
functionInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
-                    List<ExceptionInformation> systemExceptionInformationList 
= new LinkedList<>();
-                    for 
(InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : 
status.getLatestSystemExceptionsList()) {
-                        ExceptionInformation exceptionInformation
-                                = new ExceptionInformation();
-                        
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
-                        
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
-                        
systemExceptionInformationList.add(exceptionInformation);
-                    }
-                    
functionInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
-
-                    
functionInstanceStatusData.setAverageLatency(status.getAverageLatency());
-                    
functionInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
-                    functionInstanceStatusData.setWorkerId(assignedWorkerId);
-
-
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new RuntimeException(e);
-                }
-            } else {
-                functionInstanceStatusData.setRunning(false);
-                if (functionRuntimeInfo.getStartupException() != null) {
-                    
functionInstanceStatusData.setError(functionRuntimeInfo.getStartupException().getMessage());
-                }
-                functionInstanceStatusData.setWorkerId(assignedWorkerId);
-            }
-            return functionInstanceStatusData;
-        } else {
-            // query other worker
-
-            List<WorkerInfo> workerInfoList = 
this.membershipManager.getCurrentMembership();
-            WorkerInfo workerInfo = null;
-            for (WorkerInfo entry: workerInfoList) {
-                if (assignment.getWorkerId().equals(entry.getWorkerId())) {
-                    workerInfo = entry;
-                }
-            }
-            if (workerInfo == null) {
-                
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
-                        = new 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
-                functionInstanceStatusData.setRunning(false);
-                functionInstanceStatusData.setError("Function has not been 
scheduled");
-                return functionInstanceStatusData;
-            }
-
-            if (uri == null) {
-                throw new 
WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
-            } else {
-                URI redirect = 
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
-                throw new 
WebApplicationException(Response.temporaryRedirect(redirect).build());
-            }
-        }
-    }
-
-    /**
-     * Get statuses of all function instances.
-     * @param tenant the tenant the function belongs to
-     * @param namespace the namespace the function belongs to
-     * @param functionName the function name
-     * @return a list of function statuses
-     * @throws PulsarAdminException 
-     */
-    public FunctionStatus getFunctionStatus(String tenant, String namespace,
-                                            String functionName, URI uri)
-            throws PulsarAdminException {
-
-        Collection<Assignment> assignments = 
this.findFunctionAssignments(tenant, namespace, functionName);
-
-        FunctionStatus functionStatus = new FunctionStatus();
-        if (assignments.isEmpty()) {
-            Function.FunctionMetaData functionMetaData = 
workerService.getFunctionMetaDataManager().getFunctionMetaData(tenant, 
namespace, functionName);
-            
functionStatus.setNumInstances(functionMetaData.getFunctionDetails().getParallelism());
-            functionStatus.setNumRunning(0);
-
-            return functionStatus;
-        }
-
-        // TODO refactor the code for externally managed.
-        if (runtimeFactory.externallyManaged()) {
-            Assignment assignment = assignments.iterator().next();
-            boolean isOwner = 
this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
-            if (isOwner) {
-                int parallelism = 
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getParallelism();
-                for (int i = 0; i < parallelism; ++i) {
-                    
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
-                            = getFunctionInstanceStatus(tenant, namespace, 
functionName, i, null);
-                    FunctionStatus.FunctionInstanceStatus 
functionInstanceStatus
-                            = new FunctionStatus.FunctionInstanceStatus();
-                    functionInstanceStatus.setInstanceId(i);
-                    
functionInstanceStatus.setStatus(functionInstanceStatusData);
-                    functionStatus.addInstance(functionInstanceStatus);
-                }
-            } else {
-                // find the hostname/port of the worker who is the owner
-
-                List<WorkerInfo> workerInfoList = 
this.membershipManager.getCurrentMembership();
-                WorkerInfo workerInfo = null;
-                for (WorkerInfo entry: workerInfoList) {
-                    if (assignment.getWorkerId().equals(entry.getWorkerId())) {
-                        workerInfo = entry;
-                    }
-                }
-                if (workerInfo == null) {
-                    Function.FunctionMetaData functionMetaData = 
workerService.getFunctionMetaDataManager().getFunctionMetaData(tenant, 
namespace, functionName);
-                    
functionStatus.setNumInstances(functionMetaData.getFunctionDetails().getParallelism());
-                    functionStatus.setNumRunning(0);
-                    return functionStatus;
-                }
-
-                if (uri == null) {
-                    throw new 
WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
-                } else {
-                    URI redirect = 
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
-                    throw new 
WebApplicationException(Response.temporaryRedirect(redirect).build());
-                }
-            }
-        } else {
-            for (Assignment assignment : assignments) {
-                boolean isOwner = 
this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
-                
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData;
-                if (isOwner) {
-                    functionInstanceStatusData = 
getFunctionInstanceStatus(tenant, namespace, functionName, 
assignment.getInstance().getInstanceId(), null);
-                } else {
-                    functionInstanceStatusData = 
this.functionAdmin.functions().getFunctionStatus(
-                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
-                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
-                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
-                            assignment.getInstance().getInstanceId());
-                }
-
-                FunctionStatus.FunctionInstanceStatus instanceStatus = new 
FunctionStatus.FunctionInstanceStatus();
-                
instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
-                instanceStatus.setStatus(functionInstanceStatusData);
-                functionStatus.addInstance(instanceStatus);
-            }
-        }
-        functionStatus.setNumInstances(functionStatus.instances.size());
-        functionStatus.getInstances().forEach(functionInstanceStatus -> {
-            if (functionInstanceStatus.getStatus().isRunning()) {
-                functionStatus.numRunning++;
-            }
-        });
-        return functionStatus;
-    }
-
-    /**
      * Process an assignment update from the assignment topic
      * @param newAssignment the assignment
      */
@@ -990,8 +784,11 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         }
     }
 
-    private FunctionRuntimeInfo getFunctionRuntimeInfo(String 
fullyQualifiedInstanceId) {
-        return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
+    public synchronized FunctionRuntimeInfo getFunctionRuntimeInfo(String 
fullyQualifiedInstanceId) {
+        return getFunctionRuntimeInfoInternal(fullyQualifiedInstanceId);
     }
 
+    private FunctionRuntimeInfo getFunctionRuntimeInfoInternal(String 
fullyQualifiedInstanceId) {
+        return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplBase.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
similarity index 90%
rename from 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplBase.java
rename to 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index bab81e4..8f13dc8 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplBase.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -54,6 +54,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
+import javax.ws.rs.core.UriBuilder;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -71,9 +72,9 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.join;
 import static org.apache.pulsar.functions.utils.Utils.*;
-import static 
org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase.ComponentType.FUNCTION;
-import static 
org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase.ComponentType.SINK;
-import static 
org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase.ComponentType.SOURCE;
+import static 
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.FUNCTION;
+import static 
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.SINK;
+import static 
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.SOURCE;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -84,26 +85,30 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.FunctionState;
+import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
-import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.functions.utils.StateUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.Utils;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -116,7 +121,7 @@ import 
org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import net.jodah.typetools.TypeResolver;
 
 @Slf4j
-public class FunctionsImplBase {
+public abstract class ComponentImpl {
 
     public enum ComponentType {
         FUNCTION("Function"),
@@ -136,15 +141,137 @@ public class FunctionsImplBase {
     }
 
     private final AtomicReference<StorageClient> storageClient = new 
AtomicReference<>();
-    private final Supplier<WorkerService> workerServiceSupplier;
-    private final ComponentType componentType;
+    protected final Supplier<WorkerService> workerServiceSupplier;
+    protected final ComponentType componentType;
 
-    public FunctionsImplBase(Supplier<WorkerService> workerServiceSupplier, 
ComponentType componentType) {
+    public ComponentImpl(Supplier<WorkerService> workerServiceSupplier, 
ComponentType componentType) {
         this.workerServiceSupplier = workerServiceSupplier;
         this.componentType = componentType;
     }
 
-    private WorkerService worker() {
+    protected abstract class GetStatus<S, T> {
+
+        public abstract T notScheduledInstance();
+
+        public abstract T 
fromFunctionStatusProto(InstanceCommunication.FunctionStatus status, String 
assignedWorkerId);
+
+        public abstract T notRunning(String assignedWorkerId, String error);
+
+        public T getComponentInstanceStatus(String tenant, String namespace,
+                                            String name, int instanceId, URI 
uri) {
+
+            Function.Assignment assignment;
+            if 
(worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
+                assignment = 
worker().getFunctionRuntimeManager().findFunctionAssignment(tenant, namespace, 
name, -1);
+            } else {
+                assignment = 
worker().getFunctionRuntimeManager().findFunctionAssignment(tenant, namespace, 
name, instanceId);
+            }
+
+            if (assignment == null) {
+                return notScheduledInstance();
+            }
+
+            final String assignedWorkerId = assignment.getWorkerId();
+            final String workerId = worker().getWorkerConfig().getWorkerId();
+
+            // If I am running worker
+            if (assignedWorkerId.equals(workerId)) {
+                FunctionRuntimeInfo functionRuntimeInfo = 
worker().getFunctionRuntimeManager().getFunctionRuntimeInfo(
+                        
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
+                if (functionRuntimeInfo == null) {
+                    log.error("{} in get {} Status does not exist @ 
/{}/{}/{}", componentType, componentType, tenant, namespace, name);
+                    throw new RestException(Status.NOT_FOUND, 
String.format("%s %s doesn't exist", componentType, name));
+                }
+                RuntimeSpawner runtimeSpawner = 
functionRuntimeInfo.getRuntimeSpawner();
+
+                if (runtimeSpawner != null) {
+                    try {
+                        return fromFunctionStatusProto(
+                                
functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get(),
+                                assignedWorkerId);
+                    } catch (InterruptedException | ExecutionException e) {
+                        throw new RuntimeException(e);
+                    }
+                } else {
+                    return notRunning(assignedWorkerId, 
functionRuntimeInfo.getStartupException().getMessage());
+                }
+            } else {
+                // query other worker
+
+                List<WorkerInfo> workerInfoList = 
worker().getMembershipManager().getCurrentMembership();
+                WorkerInfo workerInfo = null;
+                for (WorkerInfo entry : workerInfoList) {
+                    if (assignment.getWorkerId().equals(entry.getWorkerId())) {
+                        workerInfo = entry;
+                    }
+                }
+                if (workerInfo == null) {
+                    return notScheduledInstance();
+                }
+
+                if (uri == null) {
+                    throw new 
WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+                } else {
+                    URI redirect = 
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+                    throw new 
WebApplicationException(Response.temporaryRedirect(redirect).build());
+                }
+            }
+        }
+
+        public abstract S getStatus(String tenant, String namespace,
+                                    String name, 
Collection<Function.Assignment> assignments, URI uri) throws 
PulsarAdminException;
+
+        public abstract S getStatusExternal(String tenant, String namespace,
+                                            String name, int parallelism);
+
+        public abstract S emptyStatus(int parallelism);
+
+        public S getComponentStatus(String tenant, String namespace,
+                                    String name, URI uri) {
+
+            Function.FunctionMetaData functionMetaData = 
worker().getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace, 
name);
+
+            Collection<Function.Assignment> assignments = 
worker().getFunctionRuntimeManager().findFunctionAssignments(tenant, namespace, 
name);
+
+            // TODO refactor the code for externally managed.
+            if 
(worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
+                Function.Assignment assignment = assignments.iterator().next();
+                boolean isOwner = 
worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
+                if (isOwner) {
+                    return getStatusExternal(tenant, namespace, name, 
functionMetaData.getFunctionDetails().getParallelism());
+                } else {
+
+                    // find the hostname/port of the worker who is the owner
+
+                    List<WorkerInfo> workerInfoList = 
worker().getMembershipManager().getCurrentMembership();
+                    WorkerInfo workerInfo = null;
+                    for (WorkerInfo entry: workerInfoList) {
+                        if 
(assignment.getWorkerId().equals(entry.getWorkerId())) {
+                            workerInfo = entry;
+                        }
+                    }
+                    if (workerInfo == null) {
+                        return 
emptyStatus(functionMetaData.getFunctionDetails().getParallelism());
+                    }
+
+                    if (uri == null) {
+                        throw new 
WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+                    } else {
+                        URI redirect = 
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+                        throw new 
WebApplicationException(Response.temporaryRedirect(redirect).build());
+                    }
+                }
+            } else {
+                try {
+                    return getStatus(tenant, namespace, name, assignments, 
uri);
+                } catch (PulsarAdminException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    protected WorkerService worker() {
         try {
             return checkNotNull(workerServiceSupplier.get());
         } catch (Throwable t) {
@@ -153,7 +280,7 @@ public class FunctionsImplBase {
         }
     }
 
-    private boolean isWorkerServiceAvailable() {
+    boolean isWorkerServiceAvailable() {
         WorkerService workerService = workerServiceSupplier.get();
         if (workerService == null) {
             return false;
@@ -719,96 +846,6 @@ public class FunctionsImplBase {
         }
     }
 
-    public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
getFunctionInstanceStatus(final String tenant, final String namespace, final 
String componentName,
-                                                                               
                       final String instanceId, URI uri) throws IOException {
-
-        if (!isWorkerServiceAvailable()) {
-            throw new RestException(Status.SERVICE_UNAVAILABLE, "Function 
worker service is not done initializing. Please try again in a little while.");
-        }
-
-        // validate parameters
-        try {
-            validateGetFunctionInstanceRequestParams(tenant, namespace, 
componentName, componentType, instanceId);
-        } catch (IllegalArgumentException e) {
-            log.error("Invalid get {} Status request @ /{}/{}/{}", 
componentType, tenant, namespace, componentName, e);
-            throw new RestException(Status.BAD_REQUEST, e.getMessage());
-        }
-
-        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, 
componentName)) {
-            log.error("{} in get {} Status does not exist @ /{}/{}/{}", 
componentType, componentType, tenant, namespace, componentName);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
-
-        }
-        FunctionMetaData functionMetaData = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
-        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
-            log.error("{}/{}/{} is not a {}", tenant, namespace, 
componentName, componentType);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
-        }
-        int instanceIdInt = Integer.parseInt(instanceId);
-        if (instanceIdInt < 0 || instanceIdInt >= 
functionMetaData.getFunctionDetails().getParallelism()) {
-            log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", 
componentType, tenant, namespace, componentName);
-            throw new RestException(Status.BAD_REQUEST, String.format("%s %s 
doesn't have instance with id %s", componentType, componentName, instanceId));
-        }
-
-        FunctionRuntimeManager functionRuntimeManager = 
worker().getFunctionRuntimeManager();
-
-        FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData;
-        try {
-            functionInstanceStatusData = 
functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, 
componentName,
-                    Integer.parseInt(instanceId), uri);
-        } catch (WebApplicationException we) {
-            throw we;
-        } catch (Exception e) {
-            log.error("{}/{}/{} Got Exception Getting Status", tenant, 
namespace, componentName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
-        }
-
-        return functionInstanceStatusData;
-    }
-
-    public FunctionStatus getFunctionStatus(final String tenant, final String 
namespace, final String componentName,
-                                            URI uri)
-            throws IOException {
-
-        if (!isWorkerServiceAvailable()) {
-            throw new RestException(Status.SERVICE_UNAVAILABLE, "Function 
worker service is not done initializing. Please try again in a little while.");
-        }
-
-        // validate parameters
-        try {
-            validateGetFunctionRequestParams(tenant, namespace, componentName, 
componentType);
-        } catch (IllegalArgumentException e) {
-            log.error("Invalid get {} Status request @ /{}/{}/{}", 
componentType, tenant, namespace, componentName, e);
-            throw new RestException(Status.BAD_REQUEST, e.getMessage());
-        }
-
-        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, 
componentName)) {
-            log.error("{} in get {} Status does not exist @ /{}/{}/{}", 
componentType, componentType, tenant, namespace, componentName);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
-        }
-
-        FunctionMetaData functionMetaData = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
-        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
-            log.error("{}/{}/{} is not a {}", tenant, namespace, 
componentName, componentType);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
-        }
-
-        FunctionRuntimeManager functionRuntimeManager = 
worker().getFunctionRuntimeManager();
-        FunctionStatus functionStatus;
-        try {
-            functionStatus = functionRuntimeManager.getFunctionStatus(tenant, 
namespace, componentName, uri);
-        } catch (WebApplicationException we) {
-            throw we;
-        } catch (Exception e) {
-            log.error("{}/{}/{} Got Exception Getting Status", tenant, 
namespace, componentName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
-        }
-
-        return functionStatus;
-    }
-
     public FunctionStats getFunctionStats(final String tenant, final String 
namespace, final String componentName,
                                           URI uri) throws IOException {
         if (!isWorkerServiceAvailable()) {
@@ -1176,7 +1213,7 @@ public class FunctionsImplBase {
         }
     }
 
-    private void validateGetFunctionInstanceRequestParams(String tenant, 
String namespace, String componentName,
+    protected void validateGetFunctionInstanceRequestParams(String tenant, 
String namespace, String componentName,
                                                           ComponentType 
componentType, String instanceId) throws IllegalArgumentException {
         validateGetFunctionRequestParams(tenant, namespace, componentName, 
componentType);
         if (instanceId == null) {
@@ -1184,7 +1221,7 @@ public class FunctionsImplBase {
         }
     }
 
-    private void validateGetFunctionRequestParams(String tenant, String 
namespace, String subject, ComponentType componentType)
+    protected void validateGetFunctionRequestParams(String tenant, String 
namespace, String subject, ComponentType componentType)
             throws IllegalArgumentException {
 
         if (tenant == null) {
@@ -1591,4 +1628,42 @@ public class FunctionsImplBase {
         return SINK;
     }
 
+    protected void componentStatusRequestValidate (final String tenant, final 
String namespace, final String componentName) {
+        if (!isWorkerServiceAvailable()) {
+            throw new RestException(Status.SERVICE_UNAVAILABLE, "Function 
worker service is not done initializing. Please try again in a little while.");
+        }
+
+        // validate parameters
+        try {
+            validateGetFunctionRequestParams(tenant, namespace, componentName, 
componentType);
+        } catch (IllegalArgumentException e) {
+            log.error("Invalid get {} Status request @ /{}/{}/{}", 
componentType, tenant, namespace, componentName, e);
+            throw new RestException(Status.BAD_REQUEST, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, 
componentName)) {
+            log.error("{} in get {} Status does not exist @ /{}/{}/{}", 
componentType, componentType, tenant, namespace, componentName);
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
+        }
+
+        FunctionMetaData functionMetaData = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, 
componentName, componentType);
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
+        }
+    }
+
+    protected void componentInstanceStatusRequestValidate (final String 
tenant, final String namespace,
+                                                           final String 
componentName, int instanceId) {
+        componentStatusRequestValidate(tenant, namespace, componentName);
+
+        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
+        FunctionMetaData functionMetaData = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        int parallelism = 
functionMetaData.getFunctionDetails().getParallelism();
+        if (instanceId < 0 || instanceId >= parallelism) {
+            log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", 
componentType, tenant, namespace, componentName);
+            throw new RestException(Response.Status.BAD_REQUEST, 
String.format("%s %s doesn't have instance with id %s", componentType, 
componentName, instanceId));
+        }
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index eb0e696..355cf11 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -19,14 +19,224 @@
 package org.apache.pulsar.functions.worker.rest.api;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ExceptionInformation;
+import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.RestException;
 
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.function.Supplier;
 
 @Slf4j
-public class FunctionsImpl extends FunctionsImplBase {
+public class FunctionsImpl extends ComponentImpl {
+
+    private class GetFunctionStatus extends GetStatus<FunctionStatus, 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> {
+
+        @Override
+        public 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
notScheduledInstance() {
+            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
+                    = new 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
+            functionInstanceStatusData.setRunning(false);
+            functionInstanceStatusData.setError("Function has not been 
scheduled");
+            return functionInstanceStatusData;
+        }
+
+        @Override
+        public 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
fromFunctionStatusProto(
+                InstanceCommunication.FunctionStatus status,
+                String assignedWorkerId) {
+            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
+                    = new 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
+            functionInstanceStatusData.setRunning(status.getRunning());
+            functionInstanceStatusData.setError(status.getFailureException());
+            functionInstanceStatusData.setNumRestarts(status.getNumRestarts());
+            functionInstanceStatusData.setNumReceived(status.getNumReceived());
+            
functionInstanceStatusData.setNumSuccessfullyProcessed(status.getNumSuccessfullyProcessed());
+            
functionInstanceStatusData.setNumUserExceptions(status.getNumUserExceptions());
+
+            List<ExceptionInformation> userExceptionInformationList = new 
LinkedList<>();
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation 
exceptionEntry : status.getLatestUserExceptionsList()) {
+                ExceptionInformation exceptionInformation
+                        = new ExceptionInformation();
+                
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+                
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                userExceptionInformationList.add(exceptionInformation);
+            }
+            
functionInstanceStatusData.setLatestUserExceptions(userExceptionInformationList);
+
+            
functionInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
+            List<ExceptionInformation> systemExceptionInformationList = new 
LinkedList<>();
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation 
exceptionEntry : status.getLatestSystemExceptionsList()) {
+                ExceptionInformation exceptionInformation
+                        = new ExceptionInformation();
+                
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+                
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                systemExceptionInformationList.add(exceptionInformation);
+            }
+            
functionInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
+
+            
functionInstanceStatusData.setAverageLatency(status.getAverageLatency());
+            
functionInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
+            functionInstanceStatusData.setWorkerId(assignedWorkerId);
+
+            return functionInstanceStatusData;
+        }
+
+        @Override
+        public 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
notRunning(String assignedWorkerId, String error) {
+            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
+                    = new 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
+            functionInstanceStatusData.setRunning(false);
+            if (error != null) {
+                functionInstanceStatusData.setError(error);
+            }
+            functionInstanceStatusData.setWorkerId(assignedWorkerId);
+
+            return functionInstanceStatusData;
+        }
+
+        @Override
+        public FunctionStatus getStatus(String tenant, String namespace, 
String name, Collection<Function.Assignment>
+                assignments, URI uri) throws PulsarAdminException {
+            FunctionStatus functionStatus = new FunctionStatus();
+            for (Function.Assignment assignment : assignments) {
+                boolean isOwner = 
worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
+                
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData;
+                if (isOwner) {
+                    functionInstanceStatusData = 
getComponentInstanceStatus(tenant, namespace, name, assignment
+                            .getInstance().getInstanceId(), null);
+                } else {
+                    functionInstanceStatusData = 
worker().getFunctionAdmin().functions().getFunctionStatus(
+                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
+                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
+                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
+                            assignment.getInstance().getInstanceId());
+                }
+
+                FunctionStatus.FunctionInstanceStatus instanceStatus = new 
FunctionStatus.FunctionInstanceStatus();
+                
instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
+                instanceStatus.setStatus(functionInstanceStatusData);
+                functionStatus.addInstance(instanceStatus);
+            }
+
+            functionStatus.setNumInstances(functionStatus.instances.size());
+            functionStatus.getInstances().forEach(functionInstanceStatus -> {
+                if (functionInstanceStatus.getStatus().isRunning()) {
+                    functionStatus.numRunning++;
+                }
+            });
+            return functionStatus;
+        }
+
+        @Override
+        public FunctionStatus getStatusExternal(String tenant, String 
namespace, String name, int parallelism) {
+            FunctionStatus functionStatus = new FunctionStatus();
+            for (int i = 0; i < parallelism; ++i) {
+                
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
+                        = getComponentInstanceStatus(tenant, namespace, name, 
i, null);
+                FunctionStatus.FunctionInstanceStatus functionInstanceStatus
+                        = new FunctionStatus.FunctionInstanceStatus();
+                functionInstanceStatus.setInstanceId(i);
+                functionInstanceStatus.setStatus(functionInstanceStatusData);
+                functionStatus.addInstance(functionInstanceStatus);
+            }
+
+            functionStatus.setNumInstances(functionStatus.instances.size());
+            functionStatus.getInstances().forEach(functionInstanceStatus -> {
+                if (functionInstanceStatus.getStatus().isRunning()) {
+                    functionStatus.numRunning++;
+                }
+            });
+            return functionStatus;
+        }
+
+        @Override
+        public FunctionStatus emptyStatus(int parallelism) {
+            FunctionStatus functionStatus = new FunctionStatus();
+            functionStatus.setNumInstances(parallelism);
+            functionStatus.setNumRunning(0);
+            for (int i = 0; i < parallelism; i++) {
+                FunctionStatus.FunctionInstanceStatus functionInstanceStatus = 
new FunctionStatus.FunctionInstanceStatus();
+                functionInstanceStatus.setInstanceId(i);
+                
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
+                        = new 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
+                functionInstanceStatusData.setRunning(false);
+                functionInstanceStatusData.setError("Function has not been 
scheduled");
+                functionInstanceStatus.setStatus(functionInstanceStatusData);
+
+                functionStatus.addInstance(functionInstanceStatus);
+            }
+
+            return functionStatus;
+        }
+    }
 
     public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
         super(workerServiceSupplier, ComponentType.FUNCTION);
     }
+
+    /**
+     * Get status of a function instance.  If this worker is not running the 
function instance,
+     * @param tenant the tenant the function belongs to
+     * @param namespace the namespace the function belongs to
+     * @param componentName the function name
+     * @param instanceId the function instance id
+     * @return the function status
+     */
+    public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
getFunctionInstanceStatus(final String tenant, final String namespace, final 
String componentName,
+                                                                               
                       final String instanceId, URI uri) throws IOException {
+
+        // validate parameters
+        componentInstanceStatusRequestValidate(tenant, namespace, 
componentName, Integer.parseInt(instanceId));
+
+        FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData;
+        try {
+            functionInstanceStatusData = new 
GetFunctionStatus().getComponentInstanceStatus(tenant, namespace, componentName,
+                    Integer.parseInt(instanceId), uri);
+        } catch (WebApplicationException we) {
+            throw we;
+        } catch (Exception e) {
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, 
namespace, componentName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
+
+        return functionInstanceStatusData;
+    }
+
+    /**
+     * Get statuses of all function instances.
+     * @param tenant the tenant the function belongs to
+     * @param namespace the namespace the function belongs to
+     * @param componentName the function name
+     * @return a list of function statuses
+     * @throws PulsarAdminException
+     */
+    public FunctionStatus getFunctionStatus(final String tenant, final String 
namespace, final String componentName,
+                                            URI uri) {
+
+        // validate parameters
+        componentStatusRequestValidate(tenant, namespace, componentName);
+
+        FunctionStatus functionStatus;
+        try {
+            functionStatus = new 
GetFunctionStatus().getComponentStatus(tenant, namespace, componentName, uri);
+        } catch (WebApplicationException we) {
+            throw we;
+        } catch (Exception e) {
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, 
namespace, componentName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
+
+        return functionStatus;
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index b7202f5..4ecba12 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -18,63 +18,205 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
-import org.apache.pulsar.common.policies.data.FunctionStatus;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SinkStatus;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.RestException;
 
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.function.Supplier;
 
-public class SinkImpl extends FunctionsImplBase {
-    public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
-        super(workerServiceSupplier, ComponentType.SINK);
-    }
+@Slf4j
+public class SinkImpl extends ComponentImpl {
 
+    private class GetSinkStatus extends GetStatus<SinkStatus, 
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData> {
 
-    public SinkStatus getSinkStatus(final String tenant, final String 
namespace,
-                                        final String componentName, URI uri) 
throws IOException {
+        @Override
+        public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
notScheduledInstance() {
+            SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
sinkInstanceStatusData
+                    = new 
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
+            sinkInstanceStatusData.setRunning(false);
+            sinkInstanceStatusData.setError("Sink has not been scheduled");
+            return sinkInstanceStatusData;
+        }
 
-        FunctionStatus functionStatus = getFunctionStatus(tenant, namespace, 
componentName, uri);
+        @Override
+        public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
fromFunctionStatusProto(
+                InstanceCommunication.FunctionStatus status,
+                String assignedWorkerId) {
+            SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
sinkInstanceStatusData
+                    = new 
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
+            sinkInstanceStatusData.setRunning(status.getRunning());
+            sinkInstanceStatusData.setError(status.getFailureException());
+            sinkInstanceStatusData.setNumRestarts(status.getNumRestarts());
+            sinkInstanceStatusData.setNumReceived(status.getNumReceived());
 
-        SinkStatus sinkStatus = new SinkStatus();
+            List<ExceptionInformation> userExceptionInformationList = new 
LinkedList<>();
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation 
exceptionEntry : status.getLatestUserExceptionsList()) {
+                ExceptionInformation exceptionInformation
+                        = new ExceptionInformation();
+                
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+                
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                userExceptionInformationList.add(exceptionInformation);
+            }
 
-        sinkStatus.setNumInstances(functionStatus.getNumInstances());
-        sinkStatus.setNumRunning(functionStatus.getNumRunning());
-        functionStatus.getInstances().forEach(functionInstanceStatus -> {
-            SinkStatus.SinkInstanceStatus sinkInstanceStatus = new 
SinkStatus.SinkInstanceStatus();
-            
sinkInstanceStatus.setInstanceId(functionInstanceStatus.getInstanceId());
-            
sinkInstanceStatus.setStatus(fromFunctionInstanceStatus(functionInstanceStatus.getStatus()));
-            sinkStatus.addInstance(sinkInstanceStatus);
-        });
-        return sinkStatus;
+            
sinkInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
+            List<ExceptionInformation> systemExceptionInformationList = new 
LinkedList<>();
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation 
exceptionEntry : status.getLatestSystemExceptionsList()) {
+                ExceptionInformation exceptionInformation
+                        = new ExceptionInformation();
+                
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+                
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                systemExceptionInformationList.add(exceptionInformation);
+            }
+            
sinkInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
+
+            
sinkInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
+            sinkInstanceStatusData.setWorkerId(assignedWorkerId);
+
+            return sinkInstanceStatusData;
+        }
+
+        @Override
+        public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
notRunning(String assignedWorkerId, String error) {
+            SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
sinkInstanceStatusData
+                    = new 
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
+            sinkInstanceStatusData.setRunning(false);
+            if (error != null) {
+                sinkInstanceStatusData.setError(error);
+            }
+            sinkInstanceStatusData.setWorkerId(assignedWorkerId);
+
+            return sinkInstanceStatusData;
+        }
+
+        @Override
+        public SinkStatus getStatus(String tenant, String namespace, String 
name, Collection<Function.Assignment> assignments, URI uri) throws 
PulsarAdminException {
+            SinkStatus sinkStatus = new SinkStatus();
+            for (Function.Assignment assignment : assignments) {
+                boolean isOwner = 
worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
+                SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
sinkInstanceStatusData;
+                if (isOwner) {
+                    sinkInstanceStatusData = 
getComponentInstanceStatus(tenant, namespace, name, 
assignment.getInstance().getInstanceId(), null);
+                } else {
+                    sinkInstanceStatusData = 
worker().getFunctionAdmin().sink().getSinkStatus(
+                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
+                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
+                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
+                            assignment.getInstance().getInstanceId());
+                }
+
+                SinkStatus.SinkInstanceStatus instanceStatus = new 
SinkStatus.SinkInstanceStatus();
+                
instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
+                instanceStatus.setStatus(sinkInstanceStatusData);
+                sinkStatus.addInstance(instanceStatus);
+            }
+
+            sinkStatus.setNumInstances(sinkStatus.instances.size());
+            sinkStatus.getInstances().forEach(sinkInstanceStatus -> {
+                if (sinkInstanceStatus.getStatus().isRunning()) {
+                    sinkStatus.numRunning++;
+                }
+            });
+            return sinkStatus;
+        }
+
+        @Override
+        public SinkStatus getStatusExternal (String tenant, String namespace, 
String name, int parallelism) {
+            SinkStatus sinkStatus = new SinkStatus();
+            for (int i = 0; i < parallelism; ++i) {
+                SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
sinkInstanceStatusData
+                        = getComponentInstanceStatus(tenant, namespace, name, 
i, null);
+                SinkStatus.SinkInstanceStatus sinkInstanceStatus
+                        = new SinkStatus.SinkInstanceStatus();
+                sinkInstanceStatus.setInstanceId(i);
+                sinkInstanceStatus.setStatus(sinkInstanceStatusData);
+                sinkStatus.addInstance(sinkInstanceStatus);
+            }
+
+            sinkStatus.setNumInstances(sinkStatus.instances.size());
+            sinkStatus.getInstances().forEach(sinkInstanceStatus -> {
+                if (sinkInstanceStatus.getStatus().isRunning()) {
+                    sinkStatus.numRunning++;
+                }
+            });
+            return sinkStatus;
+        }
+
+        @Override
+        public SinkStatus emptyStatus(int parallelism) {
+            SinkStatus sinkStatus = new SinkStatus();
+            sinkStatus.setNumInstances(parallelism);
+            sinkStatus.setNumRunning(0);
+            for (int i = 0; i < parallelism; i++) {
+                SinkStatus.SinkInstanceStatus sinkInstanceStatus = new 
SinkStatus.SinkInstanceStatus();
+                sinkInstanceStatus.setInstanceId(i);
+                SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
sinkInstanceStatusData
+                        = new 
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
+                sinkInstanceStatusData.setRunning(false);
+                sinkInstanceStatusData.setError("Sink has not been scheduled");
+                sinkInstanceStatus.setStatus(sinkInstanceStatusData);
+
+                sinkStatus.addInstance(sinkInstanceStatus);
+            }
+
+            return sinkStatus;
+        }
     }
 
-    public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
getSinkInstanceStatus(String tenant,
-                                                                               
               String namespace,
-                                                                               
               String sinkName,
-                                                                               
               String instanceId,
-                                                                               
               URI requestUri)
+    public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
+        super(workerServiceSupplier, ComponentType.SINK);
+    }
+
+    public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
getSinkInstanceStatus(
+            String tenant, String namespace, String sinkName, String 
instanceId, URI uri)
             throws IOException {
 
-        FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
-                = getFunctionInstanceStatus(tenant, namespace, sinkName, 
instanceId, requestUri);
+        // validate parameters
+        componentInstanceStatusRequestValidate(tenant, namespace, sinkName, 
Integer.parseInt(instanceId));
 
-        return fromFunctionInstanceStatus(functionInstanceStatusData);
-    }
 
-    private static SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
fromFunctionInstanceStatus(
-            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData) {
-        SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
sinkInstanceStatusData
-                = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
-
-        sinkInstanceStatusData.setError(functionInstanceStatusData.getError());
-        
sinkInstanceStatusData.setLastInvocationTime(functionInstanceStatusData.getLastInvocationTime());
-        
sinkInstanceStatusData.setLatestSystemExceptions(functionInstanceStatusData.getLatestSystemExceptions());
-        
sinkInstanceStatusData.setNumReceived(functionInstanceStatusData.getNumReceived());
-        
sinkInstanceStatusData.setNumRestarts(functionInstanceStatusData.getNumRestarts());
-        
sinkInstanceStatusData.setRunning(functionInstanceStatusData.isRunning());
-        
sinkInstanceStatusData.setWorkerId(functionInstanceStatusData.getWorkerId());
+        SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
sinkInstanceStatusData;
+        try {
+            sinkInstanceStatusData = new 
GetSinkStatus().getComponentInstanceStatus(tenant, namespace, sinkName,
+                    Integer.parseInt(instanceId), uri);
+        } catch (WebApplicationException we) {
+            throw we;
+        } catch (Exception e) {
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, 
namespace, sinkName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
         return sinkInstanceStatusData;
     }
+
+    public SinkStatus getSinkStatus(
+            final String tenant, final String namespace,
+            final String componentName, URI uri) {
+
+        // validate parameters
+        componentStatusRequestValidate(tenant, namespace, componentName);
+
+        SinkStatus sinkStatus;
+        try {
+            sinkStatus = new GetSinkStatus().getComponentStatus(tenant, 
namespace, componentName, uri);
+        } catch (WebApplicationException we) {
+            throw we;
+        } catch (Exception e) {
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, 
namespace, componentName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
+
+        return sinkStatus;
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index f69b03e..412019a 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -18,62 +18,199 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
-import org.apache.pulsar.common.policies.data.FunctionStatus;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.RestException;
 
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.function.Supplier;
 
-public class SourceImpl extends FunctionsImplBase {
+@Slf4j
+public class SourceImpl extends ComponentImpl {
+    private class GetSourceStatus extends GetStatus<SourceStatus, 
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData> {
+
+        @Override
+        public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
notScheduledInstance() {
+            SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
sourceInstanceStatusData
+                    = new 
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
+            sourceInstanceStatusData.setRunning(false);
+            sourceInstanceStatusData.setError("Source has not been scheduled");
+            return sourceInstanceStatusData;
+        }
+
+        @Override
+        public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
fromFunctionStatusProto(
+                InstanceCommunication.FunctionStatus status,
+                String assignedWorkerId) {
+            SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
sourceInstanceStatusData
+                    = new 
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
+            sourceInstanceStatusData.setRunning(status.getRunning());
+            sourceInstanceStatusData.setError(status.getFailureException());
+            sourceInstanceStatusData.setNumRestarts(status.getNumRestarts());
+            sourceInstanceStatusData.setNumReceived(status.getNumReceived());
+
+            List<ExceptionInformation> userExceptionInformationList = new 
LinkedList<>();
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation 
exceptionEntry : status.getLatestUserExceptionsList()) {
+                ExceptionInformation exceptionInformation
+                        = new ExceptionInformation();
+                
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+                
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                userExceptionInformationList.add(exceptionInformation);
+            }
+
+            
sourceInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
+            List<ExceptionInformation> systemExceptionInformationList = new 
LinkedList<>();
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation 
exceptionEntry : status.getLatestSystemExceptionsList()) {
+                ExceptionInformation exceptionInformation
+                        = new ExceptionInformation();
+                
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+                
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                systemExceptionInformationList.add(exceptionInformation);
+            }
+            
sourceInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
+
+            
sourceInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
+            sourceInstanceStatusData.setWorkerId(assignedWorkerId);
+
+            return sourceInstanceStatusData;
+        }
+
+        @Override
+        public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
notRunning(String assignedWorkerId, String error) {
+            SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
sourceInstanceStatusData
+                    = new 
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
+            sourceInstanceStatusData.setRunning(false);
+            if (error != null) {
+                sourceInstanceStatusData.setError(error);
+            }
+            sourceInstanceStatusData.setWorkerId(assignedWorkerId);
+
+            return sourceInstanceStatusData;
+        }
+
+        @Override
+        public SourceStatus getStatus(String tenant, String namespace, String 
name, Collection<Function.Assignment> assignments, URI uri) throws 
PulsarAdminException {
+            SourceStatus sourceStatus = new SourceStatus();
+            for (Function.Assignment assignment : assignments) {
+                boolean isOwner = 
worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
+                SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
sourceInstanceStatusData;
+                if (isOwner) {
+                    sourceInstanceStatusData = 
getComponentInstanceStatus(tenant, namespace, name, 
assignment.getInstance().getInstanceId(), null);
+                } else {
+                    sourceInstanceStatusData = 
worker().getFunctionAdmin().source().getSourceStatus(
+                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
+                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
+                            
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
+                            assignment.getInstance().getInstanceId());
+                }
+
+                SourceStatus.SourceInstanceStatus instanceStatus = new 
SourceStatus.SourceInstanceStatus();
+                
instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
+                instanceStatus.setStatus(sourceInstanceStatusData);
+                sourceStatus.addInstance(instanceStatus);
+            }
+
+            sourceStatus.setNumInstances(sourceStatus.instances.size());
+            sourceStatus.getInstances().forEach(sourceInstanceStatus -> {
+                if (sourceInstanceStatus.getStatus().isRunning()) {
+                    sourceStatus.numRunning++;
+                }
+            });
+            return sourceStatus;
+        }
+
+        @Override
+        public SourceStatus getStatusExternal(String tenant, String namespace, 
String name, int parallelism) {
+            SourceStatus sinkStatus = new SourceStatus();
+            for (int i = 0; i < parallelism; ++i) {
+                SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
sourceInstanceStatusData
+                        = getComponentInstanceStatus(tenant, namespace, name, 
i, null);
+                SourceStatus.SourceInstanceStatus sourceInstanceStatus
+                        = new SourceStatus.SourceInstanceStatus();
+                sourceInstanceStatus.setInstanceId(i);
+                sourceInstanceStatus.setStatus(sourceInstanceStatusData);
+                sinkStatus.addInstance(sourceInstanceStatus);
+            }
+
+            sinkStatus.setNumInstances(sinkStatus.instances.size());
+            sinkStatus.getInstances().forEach(sourceInstanceStatus -> {
+                if (sourceInstanceStatus.getStatus().isRunning()) {
+                    sinkStatus.numRunning++;
+                }
+            });
+            return sinkStatus;
+        }
+
+        @Override
+        public SourceStatus emptyStatus(int parallelism) {
+            SourceStatus sourceStatus = new SourceStatus();
+            sourceStatus.setNumInstances(parallelism);
+            sourceStatus.setNumRunning(0);
+            for (int i = 0; i < parallelism; i++) {
+                SourceStatus.SourceInstanceStatus sourceInstanceStatus = new 
SourceStatus.SourceInstanceStatus();
+                sourceInstanceStatus.setInstanceId(i);
+                SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
sourceInstanceStatusData
+                        = new 
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
+                sourceInstanceStatusData.setRunning(false);
+                sourceInstanceStatusData.setError("Source has not been 
scheduled");
+                sourceInstanceStatus.setStatus(sourceInstanceStatusData);
+
+                sourceStatus.addInstance(sourceInstanceStatus);
+            }
+
+            return sourceStatus;
+        }
+    }
+
     public SourceImpl(Supplier<WorkerService> workerServiceSupplier) {
         super(workerServiceSupplier, ComponentType.SOURCE);
     }
 
     public SourceStatus getSourceStatus(final String tenant, final String 
namespace,
                                         final String componentName, URI uri) 
throws IOException {
+        // validate parameters
+        componentStatusRequestValidate(tenant, namespace, componentName);
 
-        FunctionStatus functionStatus = getFunctionStatus(tenant, namespace, 
componentName, uri);
-
-        SourceStatus sourceStatus = new SourceStatus();
+        SourceStatus sourceStatus;
+        try {
+            sourceStatus = new GetSourceStatus().getComponentStatus(tenant, 
namespace, componentName, uri);
+        } catch (WebApplicationException we) {
+            throw we;
+        } catch (Exception e) {
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, 
namespace, componentName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
 
-        sourceStatus.setNumInstances(functionStatus.getNumInstances());
-        sourceStatus.setNumRunning(functionStatus.getNumRunning());
-        functionStatus.getInstances().forEach(functionInstanceStatus -> {
-            SourceStatus.SourceInstanceStatus sourceInstanceStatus = new 
SourceStatus.SourceInstanceStatus();
-            
sourceInstanceStatus.setInstanceId(functionInstanceStatus.getInstanceId());
-            
sourceInstanceStatus.setStatus(fromFunctionInstanceStatus(functionInstanceStatus.getStatus()));
-            sourceStatus.addInstance(sourceInstanceStatus);
-        });
         return sourceStatus;
     }
 
-    public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
getSourceInstanceStatus(String tenant,
-                                                                               
               String namespace,
-                                                                               
               String sourceName,
-                                                                               
               String instanceId,
-                                                                               
               URI requestUri)
-            throws IOException {
-
-        FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData
-                = getFunctionInstanceStatus(tenant, namespace, sourceName, 
instanceId, requestUri);
-
-        return fromFunctionInstanceStatus(functionInstanceStatusData);
-    }
+    public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
getSourceInstanceStatus(
+            String tenant, String namespace, String sourceName, String 
instanceId, URI uri) {
+        // validate parameters
+        componentInstanceStatusRequestValidate(tenant, namespace, sourceName, 
Integer.parseInt(instanceId));
 
-    private static SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
fromFunctionInstanceStatus(
-            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData 
functionInstanceStatusData) {
-        SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
sourceInstanceStatusData
-                = new 
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
-
-        
sourceInstanceStatusData.setError(functionInstanceStatusData.getError());
-        
sourceInstanceStatusData.setLastInvocationTime(functionInstanceStatusData.getLastInvocationTime());
-        
sourceInstanceStatusData.setLatestSystemExceptions(functionInstanceStatusData.getLatestSystemExceptions());
-        
sourceInstanceStatusData.setNumReceived(functionInstanceStatusData.getNumReceived());
-        
sourceInstanceStatusData.setNumRestarts(functionInstanceStatusData.getNumRestarts());
-        
sourceInstanceStatusData.setRunning(functionInstanceStatusData.isRunning());
-        
sourceInstanceStatusData.setWorkerId(functionInstanceStatusData.getWorkerId());
+        SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
sourceInstanceStatusData;
+        try {
+            sourceInstanceStatusData = new 
GetSourceStatus().getComponentInstanceStatus(tenant, namespace, sourceName,
+                    Integer.parseInt(instanceId), uri);
+        } catch (WebApplicationException we) {
+            throw we;
+        } catch (Exception e) {
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, 
namespace, sourceName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
         return sourceInstanceStatusData;
     }
 }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index bf3766c..98747ed 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -70,7 +70,7 @@ import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase;
+import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -169,7 +169,7 @@ public class FunctionApiV2ResourceTest {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
-        
doReturn(FunctionsImplBase.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(any());
+        
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(any());
     }
 
     //
@@ -1220,9 +1220,9 @@ public class FunctionApiV2ResourceTest {
                 
FunctionDetails.newBuilder().setName("test-3").build()).build();
         functionMetaDataList.add(f3);
         when(mockedManager.listFunctions(eq(tenant), 
eq(namespace))).thenReturn(functionMetaDataList);
-        
doReturn(FunctionsImplBase.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-        
doReturn(FunctionsImplBase.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-        
doReturn(FunctionsImplBase.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+        
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+        
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+        
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
 
         Response response = listDefaultFunctions();
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index 234ca2f..a0e6843 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -43,8 +43,7 @@ import org.apache.pulsar.functions.utils.SinkConfigUtils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase;
+import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
 import org.apache.pulsar.functions.worker.rest.api.SinkImpl;
 import org.apache.pulsar.io.cassandra.CassandraStringSink;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -174,7 +173,7 @@ public class SinkApiV2ResourceTest {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new SinkImpl(() -> mockedWorkerService));
-        
Mockito.doReturn(FunctionsImplBase.ComponentType.SINK).when(this.resource).calculateSubjectType(any());
+        
Mockito.doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(any());
     }
 
     //
@@ -1183,9 +1182,9 @@ public class SinkApiV2ResourceTest {
                 
FunctionDetails.newBuilder().setName("test-3").build()).build();
         functionMetaDataList.add(f3);
         when(mockedManager.listFunctions(eq(tenant), 
eq(namespace))).thenReturn(functionMetaDataList);
-        
doReturn(FunctionsImplBase.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-        
doReturn(FunctionsImplBase.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-        
doReturn(FunctionsImplBase.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+        
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+        
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+        
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
 
         Response response = listDefaultSinks();
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index 8f72123..fcba85c 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -41,8 +41,7 @@ import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase;
+import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
 import org.apache.pulsar.functions.worker.rest.api.SourceImpl;
 import org.apache.pulsar.io.twitter.TwitterFireHose;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -159,7 +158,7 @@ public class SourceApiV2ResourceTest {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new SourceImpl(() -> mockedWorkerService));
-        
Mockito.doReturn(FunctionsImplBase.ComponentType.SOURCE).when(this.resource).calculateSubjectType(any());
+        
Mockito.doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(any());
     }
 
     //
@@ -1186,9 +1185,9 @@ public class SourceApiV2ResourceTest {
                 
FunctionDetails.newBuilder().setName("test-3").build()).build();
         functionMetaDataList.add(f3);
         when(mockedManager.listFunctions(eq(tenant), 
eq(namespace))).thenReturn(functionMetaDataList);
-        
doReturn(FunctionsImplBase.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-        
doReturn(FunctionsImplBase.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-        
doReturn(FunctionsImplBase.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+        
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+        
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+        
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
 
         Response response = listDefaultSources();
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index d7ede4c..5add9a1 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -255,6 +255,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                 ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
                 log.info("Get sink status : {}", result.getStdout());
 
+                assertEquals(result.getExitCode(), 0);
 
                 SinkStatus sinkStatus = SinkStatus.decode(result.getStdout());
                 try {
@@ -481,6 +482,8 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                 ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
                 log.info("Get source status : {}", result.getStdout());
 
+                assertEquals(result.getExitCode(), 0);
+
                 SourceStatus sourceStatus = 
SourceStatus.decode(result.getStdout());
                 try {
                     assertEquals(sourceStatus.getNumInstances(), 1);
@@ -532,6 +535,8 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                 ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
                 log.info("Get source status : {}", result.getStdout());
 
+                assertEquals(result.getExitCode(), 0);
+
                 SourceStatus sourceStatus = 
SourceStatus.decode(result.getStdout());
                 try {
                     assertEquals(sourceStatus.getNumInstances(), 1);
@@ -574,6 +579,8 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                 ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
                 log.info("Get sink status : {}", result.getStdout());
 
+                assertEquals(result.getExitCode(), 0);
+
                 SinkStatus sinkStatus = SinkStatus.decode(result.getStdout());
                 try {
                     assertEquals(sinkStatus.getNumInstances(), 1);

Reply via email to