This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d4794bd fixing and refactoring function status (#3102)
d4794bd is described below
commit d4794bd5e100a946780ff562bf45ef44a7dc5281
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Sun Dec 2 20:06:26 2018 -0800
fixing and refactoring function status (#3102)
* fixing and refactoring function status
* further refactoring
* cleaning up
---
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 5 +-
.../functions/worker/FunctionRuntimeManager.java | 261 +++----------------
.../{FunctionsImplBase.java => ComponentImpl.java} | 279 +++++++++++++--------
.../functions/worker/rest/api/FunctionsImpl.java | 212 +++++++++++++++-
.../pulsar/functions/worker/rest/api/SinkImpl.java | 220 +++++++++++++---
.../functions/worker/rest/api/SourceImpl.java | 211 +++++++++++++---
.../rest/api/v2/FunctionApiV2ResourceTest.java | 10 +-
.../worker/rest/api/v2/SinkApiV2ResourceTest.java | 11 +-
.../rest/api/v2/SourceApiV2ResourceTest.java | 11 +-
.../integration/functions/PulsarFunctionsTest.java | 7 +
10 files changed, 796 insertions(+), 431 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index fa35ca1..90766d3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -669,9 +669,8 @@ public class PulsarFunctionE2ETest {
}
}, 5, 200);
- FunctionRuntimeManager functionRuntimeManager =
functionsWorkerService.getFunctionRuntimeManager();
- FunctionStatus functionStatus =
functionRuntimeManager.getFunctionStatus(tenant, namespacePortion,
- functionName, null);
+ FunctionStatus functionStatus =
admin.functions().getFunctionStatus(tenant, namespacePortion,
+ functionName);
int numInstances = functionStatus.getNumInstances();
assertEquals(numInstances, 1);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index f9e0c75..152c1db 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -18,26 +18,10 @@
*/
package org.apache.pulsar.functions.worker;
-import java.net.URI;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.UriBuilder;
-
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -46,24 +30,35 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.ErrorData;
-import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.FunctionStats;
-import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
-import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.Assignment;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.runtime.*;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
+import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import
org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.Reflections;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriBuilder;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
/**
* This class managers all aspects of functions assignments and running of
function assignments for this worker
*/
@@ -591,207 +586,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
}
/**
- * Get status of a function instance. If this worker is not running the
function instance,
- * @param tenant the tenant the function belongs to
- * @param namespace the namespace the function belongs to
- * @param functionName the function name
- * @param instanceId the function instance id
- * @return the function status
- */
- public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
getFunctionInstanceStatus(
- String tenant, String namespace,
- String functionName, int instanceId, URI uri) {
- Assignment assignment;
- if (runtimeFactory.externallyManaged()) {
- assignment = this.findAssignment(tenant, namespace, functionName,
-1);
- } else {
- assignment = this.findAssignment(tenant, namespace, functionName,
instanceId);
- }
-
- if (assignment == null) {
- FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
- = new
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
- functionInstanceStatusData.setRunning(false);
- functionInstanceStatusData.setError("Function has not been
scheduled");
- return functionInstanceStatusData;
- }
-
- final String assignedWorkerId = assignment.getWorkerId();
- final String workerId = this.workerConfig.getWorkerId();
-
- // If I am running worker
- if (assignedWorkerId.equals(workerId)) {
- FunctionRuntimeInfo functionRuntimeInfo =
this.getFunctionRuntimeInfo(
-
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
- RuntimeSpawner runtimeSpawner =
functionRuntimeInfo.getRuntimeSpawner();
- FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
- = new
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
- if (runtimeSpawner != null) {
- try {
- InstanceCommunication.FunctionStatus status =
functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get();
- functionInstanceStatusData.setRunning(status.getRunning());
-
functionInstanceStatusData.setError(status.getFailureException());
-
functionInstanceStatusData.setNumRestarts(status.getNumRestarts());
-
functionInstanceStatusData.setNumReceived(status.getNumReceived());
-
functionInstanceStatusData.setNumSuccessfullyProcessed(status.getNumSuccessfullyProcessed());
-
functionInstanceStatusData.setNumUserExceptions(status.getNumUserExceptions());
-
- List<ExceptionInformation> userExceptionInformationList =
new LinkedList<>();
- for
(InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry :
status.getLatestUserExceptionsList()) {
- ExceptionInformation exceptionInformation
- = new ExceptionInformation();
-
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
-
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
- userExceptionInformationList.add(exceptionInformation);
- }
-
functionInstanceStatusData.setLatestUserExceptions(userExceptionInformationList);
-
-
functionInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
- List<ExceptionInformation> systemExceptionInformationList
= new LinkedList<>();
- for
(InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry :
status.getLatestSystemExceptionsList()) {
- ExceptionInformation exceptionInformation
- = new ExceptionInformation();
-
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
-
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
-
systemExceptionInformationList.add(exceptionInformation);
- }
-
functionInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
-
-
functionInstanceStatusData.setAverageLatency(status.getAverageLatency());
-
functionInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
- functionInstanceStatusData.setWorkerId(assignedWorkerId);
-
-
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
- }
- } else {
- functionInstanceStatusData.setRunning(false);
- if (functionRuntimeInfo.getStartupException() != null) {
-
functionInstanceStatusData.setError(functionRuntimeInfo.getStartupException().getMessage());
- }
- functionInstanceStatusData.setWorkerId(assignedWorkerId);
- }
- return functionInstanceStatusData;
- } else {
- // query other worker
-
- List<WorkerInfo> workerInfoList =
this.membershipManager.getCurrentMembership();
- WorkerInfo workerInfo = null;
- for (WorkerInfo entry: workerInfoList) {
- if (assignment.getWorkerId().equals(entry.getWorkerId())) {
- workerInfo = entry;
- }
- }
- if (workerInfo == null) {
-
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
- = new
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
- functionInstanceStatusData.setRunning(false);
- functionInstanceStatusData.setError("Function has not been
scheduled");
- return functionInstanceStatusData;
- }
-
- if (uri == null) {
- throw new
WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
- } else {
- URI redirect =
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
- throw new
WebApplicationException(Response.temporaryRedirect(redirect).build());
- }
- }
- }
-
- /**
- * Get statuses of all function instances.
- * @param tenant the tenant the function belongs to
- * @param namespace the namespace the function belongs to
- * @param functionName the function name
- * @return a list of function statuses
- * @throws PulsarAdminException
- */
- public FunctionStatus getFunctionStatus(String tenant, String namespace,
- String functionName, URI uri)
- throws PulsarAdminException {
-
- Collection<Assignment> assignments =
this.findFunctionAssignments(tenant, namespace, functionName);
-
- FunctionStatus functionStatus = new FunctionStatus();
- if (assignments.isEmpty()) {
- Function.FunctionMetaData functionMetaData =
workerService.getFunctionMetaDataManager().getFunctionMetaData(tenant,
namespace, functionName);
-
functionStatus.setNumInstances(functionMetaData.getFunctionDetails().getParallelism());
- functionStatus.setNumRunning(0);
-
- return functionStatus;
- }
-
- // TODO refactor the code for externally managed.
- if (runtimeFactory.externallyManaged()) {
- Assignment assignment = assignments.iterator().next();
- boolean isOwner =
this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
- if (isOwner) {
- int parallelism =
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getParallelism();
- for (int i = 0; i < parallelism; ++i) {
-
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
- = getFunctionInstanceStatus(tenant, namespace,
functionName, i, null);
- FunctionStatus.FunctionInstanceStatus
functionInstanceStatus
- = new FunctionStatus.FunctionInstanceStatus();
- functionInstanceStatus.setInstanceId(i);
-
functionInstanceStatus.setStatus(functionInstanceStatusData);
- functionStatus.addInstance(functionInstanceStatus);
- }
- } else {
- // find the hostname/port of the worker who is the owner
-
- List<WorkerInfo> workerInfoList =
this.membershipManager.getCurrentMembership();
- WorkerInfo workerInfo = null;
- for (WorkerInfo entry: workerInfoList) {
- if (assignment.getWorkerId().equals(entry.getWorkerId())) {
- workerInfo = entry;
- }
- }
- if (workerInfo == null) {
- Function.FunctionMetaData functionMetaData =
workerService.getFunctionMetaDataManager().getFunctionMetaData(tenant,
namespace, functionName);
-
functionStatus.setNumInstances(functionMetaData.getFunctionDetails().getParallelism());
- functionStatus.setNumRunning(0);
- return functionStatus;
- }
-
- if (uri == null) {
- throw new
WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
- } else {
- URI redirect =
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
- throw new
WebApplicationException(Response.temporaryRedirect(redirect).build());
- }
- }
- } else {
- for (Assignment assignment : assignments) {
- boolean isOwner =
this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
-
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData;
- if (isOwner) {
- functionInstanceStatusData =
getFunctionInstanceStatus(tenant, namespace, functionName,
assignment.getInstance().getInstanceId(), null);
- } else {
- functionInstanceStatusData =
this.functionAdmin.functions().getFunctionStatus(
-
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
-
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
-
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
- assignment.getInstance().getInstanceId());
- }
-
- FunctionStatus.FunctionInstanceStatus instanceStatus = new
FunctionStatus.FunctionInstanceStatus();
-
instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
- instanceStatus.setStatus(functionInstanceStatusData);
- functionStatus.addInstance(instanceStatus);
- }
- }
- functionStatus.setNumInstances(functionStatus.instances.size());
- functionStatus.getInstances().forEach(functionInstanceStatus -> {
- if (functionInstanceStatus.getStatus().isRunning()) {
- functionStatus.numRunning++;
- }
- });
- return functionStatus;
- }
-
- /**
* Process an assignment update from the assignment topic
* @param newAssignment the assignment
*/
@@ -990,8 +784,11 @@ public class FunctionRuntimeManager implements AutoCloseable{
}
}
- private FunctionRuntimeInfo getFunctionRuntimeInfo(String
fullyQualifiedInstanceId) {
- return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
+ public synchronized FunctionRuntimeInfo getFunctionRuntimeInfo(String
fullyQualifiedInstanceId) {
+ return getFunctionRuntimeInfoInternal(fullyQualifiedInstanceId);
}
+ private FunctionRuntimeInfo getFunctionRuntimeInfoInternal(String
fullyQualifiedInstanceId) {
+ return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
+ }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplBase.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
similarity index 90%
rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplBase.java
rename to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index bab81e4..8f13dc8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplBase.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -54,6 +54,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
+import javax.ws.rs.core.UriBuilder;
import lombok.extern.slf4j.Slf4j;
@@ -71,9 +72,9 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.join;
import static org.apache.pulsar.functions.utils.Utils.*;
-import static
org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase.ComponentType.FUNCTION;
-import static
org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase.ComponentType.SINK;
-import static
org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase.ComponentType.SOURCE;
+import static
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.FUNCTION;
+import static
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.SINK;
+import static
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.SOURCE;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -84,26 +85,30 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionState;
+import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.FunctionStats;
-import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.StateUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.Utils;
import org.apache.pulsar.functions.worker.WorkerService;
@@ -116,7 +121,7 @@ import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import net.jodah.typetools.TypeResolver;
@Slf4j
-public class FunctionsImplBase {
+public abstract class ComponentImpl {
public enum ComponentType {
FUNCTION("Function"),
@@ -136,15 +141,137 @@ public class FunctionsImplBase {
}
private final AtomicReference<StorageClient> storageClient = new
AtomicReference<>();
- private final Supplier<WorkerService> workerServiceSupplier;
- private final ComponentType componentType;
+ protected final Supplier<WorkerService> workerServiceSupplier;
+ protected final ComponentType componentType;
- public FunctionsImplBase(Supplier<WorkerService> workerServiceSupplier,
ComponentType componentType) {
+ public ComponentImpl(Supplier<WorkerService> workerServiceSupplier,
ComponentType componentType) {
this.workerServiceSupplier = workerServiceSupplier;
this.componentType = componentType;
}
- private WorkerService worker() {
+ protected abstract class GetStatus<S, T> {
+
+ public abstract T notScheduledInstance();
+
+ public abstract T
fromFunctionStatusProto(InstanceCommunication.FunctionStatus status, String
assignedWorkerId);
+
+ public abstract T notRunning(String assignedWorkerId, String error);
+
+ public T getComponentInstanceStatus(String tenant, String namespace,
+ String name, int instanceId, URI
uri) {
+
+ Function.Assignment assignment;
+ if
(worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
+ assignment =
worker().getFunctionRuntimeManager().findFunctionAssignment(tenant, namespace,
name, -1);
+ } else {
+ assignment =
worker().getFunctionRuntimeManager().findFunctionAssignment(tenant, namespace,
name, instanceId);
+ }
+
+ if (assignment == null) {
+ return notScheduledInstance();
+ }
+
+ final String assignedWorkerId = assignment.getWorkerId();
+ final String workerId = worker().getWorkerConfig().getWorkerId();
+
+ // If I am running worker
+ if (assignedWorkerId.equals(workerId)) {
+ FunctionRuntimeInfo functionRuntimeInfo =
worker().getFunctionRuntimeManager().getFunctionRuntimeInfo(
+
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
+ if (functionRuntimeInfo == null) {
+ log.error("{} in get {} Status does not exist @
/{}/{}/{}", componentType, componentType, tenant, namespace, name);
+ throw new RestException(Status.NOT_FOUND,
String.format("%s %s doesn't exist", componentType, name));
+ }
+ RuntimeSpawner runtimeSpawner =
functionRuntimeInfo.getRuntimeSpawner();
+
+ if (runtimeSpawner != null) {
+ try {
+ return fromFunctionStatusProto(
+
functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get(),
+ assignedWorkerId);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ return notRunning(assignedWorkerId,
functionRuntimeInfo.getStartupException().getMessage());
+ }
+ } else {
+ // query other worker
+
+ List<WorkerInfo> workerInfoList =
worker().getMembershipManager().getCurrentMembership();
+ WorkerInfo workerInfo = null;
+ for (WorkerInfo entry : workerInfoList) {
+ if (assignment.getWorkerId().equals(entry.getWorkerId())) {
+ workerInfo = entry;
+ }
+ }
+ if (workerInfo == null) {
+ return notScheduledInstance();
+ }
+
+ if (uri == null) {
+ throw new
WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+ } else {
+ URI redirect =
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+ throw new
WebApplicationException(Response.temporaryRedirect(redirect).build());
+ }
+ }
+ }
+
+ public abstract S getStatus(String tenant, String namespace,
+ String name,
Collection<Function.Assignment> assignments, URI uri) throws
PulsarAdminException;
+
+ public abstract S getStatusExternal(String tenant, String namespace,
+ String name, int parallelism);
+
+ public abstract S emptyStatus(int parallelism);
+
+ public S getComponentStatus(String tenant, String namespace,
+ String name, URI uri) {
+
+ Function.FunctionMetaData functionMetaData =
worker().getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace,
name);
+
+ Collection<Function.Assignment> assignments =
worker().getFunctionRuntimeManager().findFunctionAssignments(tenant, namespace,
name);
+
+ // TODO refactor the code for externally managed.
+ if
(worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
+ Function.Assignment assignment = assignments.iterator().next();
+ boolean isOwner =
worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
+ if (isOwner) {
+ return getStatusExternal(tenant, namespace, name,
functionMetaData.getFunctionDetails().getParallelism());
+ } else {
+
+ // find the hostname/port of the worker who is the owner
+
+ List<WorkerInfo> workerInfoList =
worker().getMembershipManager().getCurrentMembership();
+ WorkerInfo workerInfo = null;
+ for (WorkerInfo entry: workerInfoList) {
+ if
(assignment.getWorkerId().equals(entry.getWorkerId())) {
+ workerInfo = entry;
+ }
+ }
+ if (workerInfo == null) {
+ return
emptyStatus(functionMetaData.getFunctionDetails().getParallelism());
+ }
+
+ if (uri == null) {
+ throw new
WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+ } else {
+ URI redirect =
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+ throw new
WebApplicationException(Response.temporaryRedirect(redirect).build());
+ }
+ }
+ } else {
+ try {
+ return getStatus(tenant, namespace, name, assignments,
uri);
+ } catch (PulsarAdminException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ protected WorkerService worker() {
try {
return checkNotNull(workerServiceSupplier.get());
} catch (Throwable t) {
@@ -153,7 +280,7 @@ public class FunctionsImplBase {
}
}
- private boolean isWorkerServiceAvailable() {
+ boolean isWorkerServiceAvailable() {
WorkerService workerService = workerServiceSupplier.get();
if (workerService == null) {
return false;
@@ -719,96 +846,6 @@ public class FunctionsImplBase {
}
}
- public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
getFunctionInstanceStatus(final String tenant, final String namespace, final
String componentName,
-
final String instanceId, URI uri) throws IOException {
-
- if (!isWorkerServiceAvailable()) {
- throw new RestException(Status.SERVICE_UNAVAILABLE, "Function
worker service is not done initializing. Please try again in a little while.");
- }
-
- // validate parameters
- try {
- validateGetFunctionInstanceRequestParams(tenant, namespace,
componentName, componentType, instanceId);
- } catch (IllegalArgumentException e) {
- log.error("Invalid get {} Status request @ /{}/{}/{}",
componentType, tenant, namespace, componentName, e);
- throw new RestException(Status.BAD_REQUEST, e.getMessage());
- }
-
- FunctionMetaDataManager functionMetaDataManager =
worker().getFunctionMetaDataManager();
- if (!functionMetaDataManager.containsFunction(tenant, namespace,
componentName)) {
- log.error("{} in get {} Status does not exist @ /{}/{}/{}",
componentType, componentType, tenant, namespace, componentName);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s
doesn't exist", componentType, componentName));
-
- }
- FunctionMetaData functionMetaData =
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
- if (!calculateSubjectType(functionMetaData).equals(componentType)) {
- log.error("{}/{}/{} is not a {}", tenant, namespace,
componentName, componentType);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s
doesn't exist", componentType, componentName));
- }
- int instanceIdInt = Integer.parseInt(instanceId);
- if (instanceIdInt < 0 || instanceIdInt >=
functionMetaData.getFunctionDetails().getParallelism()) {
- log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}",
componentType, tenant, namespace, componentName);
- throw new RestException(Status.BAD_REQUEST, String.format("%s %s
doesn't have instance with id %s", componentType, componentName, instanceId));
- }
-
- FunctionRuntimeManager functionRuntimeManager =
worker().getFunctionRuntimeManager();
-
- FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData;
- try {
- functionInstanceStatusData =
functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace,
componentName,
- Integer.parseInt(instanceId), uri);
- } catch (WebApplicationException we) {
- throw we;
- } catch (Exception e) {
- log.error("{}/{}/{} Got Exception Getting Status", tenant,
namespace, componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
e.getMessage());
- }
-
- return functionInstanceStatusData;
- }
-
- public FunctionStatus getFunctionStatus(final String tenant, final String
namespace, final String componentName,
- URI uri)
- throws IOException {
-
- if (!isWorkerServiceAvailable()) {
- throw new RestException(Status.SERVICE_UNAVAILABLE, "Function
worker service is not done initializing. Please try again in a little while.");
- }
-
- // validate parameters
- try {
- validateGetFunctionRequestParams(tenant, namespace, componentName,
componentType);
- } catch (IllegalArgumentException e) {
- log.error("Invalid get {} Status request @ /{}/{}/{}",
componentType, tenant, namespace, componentName, e);
- throw new RestException(Status.BAD_REQUEST, e.getMessage());
- }
-
- FunctionMetaDataManager functionMetaDataManager =
worker().getFunctionMetaDataManager();
- if (!functionMetaDataManager.containsFunction(tenant, namespace,
componentName)) {
- log.error("{} in get {} Status does not exist @ /{}/{}/{}",
componentType, componentType, tenant, namespace, componentName);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s
doesn't exist", componentType, componentName));
- }
-
- FunctionMetaData functionMetaData =
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
- if (!calculateSubjectType(functionMetaData).equals(componentType)) {
- log.error("{}/{}/{} is not a {}", tenant, namespace,
componentName, componentType);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s
doesn't exist", componentType, componentName));
- }
-
- FunctionRuntimeManager functionRuntimeManager =
worker().getFunctionRuntimeManager();
- FunctionStatus functionStatus;
- try {
- functionStatus = functionRuntimeManager.getFunctionStatus(tenant,
namespace, componentName, uri);
- } catch (WebApplicationException we) {
- throw we;
- } catch (Exception e) {
- log.error("{}/{}/{} Got Exception Getting Status", tenant,
namespace, componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
e.getMessage());
- }
-
- return functionStatus;
- }
-
public FunctionStats getFunctionStats(final String tenant, final String
namespace, final String componentName,
URI uri) throws IOException {
if (!isWorkerServiceAvailable()) {
@@ -1176,7 +1213,7 @@ public class FunctionsImplBase {
}
}
- private void validateGetFunctionInstanceRequestParams(String tenant,
String namespace, String componentName,
+ protected void validateGetFunctionInstanceRequestParams(String tenant,
String namespace, String componentName,
ComponentType
componentType, String instanceId) throws IllegalArgumentException {
validateGetFunctionRequestParams(tenant, namespace, componentName,
componentType);
if (instanceId == null) {
@@ -1184,7 +1221,7 @@ public class FunctionsImplBase {
}
}
- private void validateGetFunctionRequestParams(String tenant, String
namespace, String subject, ComponentType componentType)
+ protected void validateGetFunctionRequestParams(String tenant, String
namespace, String subject, ComponentType componentType)
throws IllegalArgumentException {
if (tenant == null) {
@@ -1591,4 +1628,42 @@ public class FunctionsImplBase {
return SINK;
}
+ protected void componentStatusRequestValidate (final String tenant, final
String namespace, final String componentName) {
+ if (!isWorkerServiceAvailable()) {
+ throw new RestException(Status.SERVICE_UNAVAILABLE, "Function
worker service is not done initializing. Please try again in a little while.");
+ }
+
+ // validate parameters
+ try {
+ validateGetFunctionRequestParams(tenant, namespace, componentName,
componentType);
+ } catch (IllegalArgumentException e) {
+ log.error("Invalid get {} Status request @ /{}/{}/{}",
componentType, tenant, namespace, componentName, e);
+ throw new RestException(Status.BAD_REQUEST, e.getMessage());
+ }
+
+ FunctionMetaDataManager functionMetaDataManager =
worker().getFunctionMetaDataManager();
+ if (!functionMetaDataManager.containsFunction(tenant, namespace,
componentName)) {
+ log.error("{} in get {} Status does not exist @ /{}/{}/{}",
componentType, componentType, tenant, namespace, componentName);
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s
doesn't exist", componentType, componentName));
+ }
+
+ FunctionMetaData functionMetaData =
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+ if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+ log.error("{}/{}/{} is not a {}", tenant, namespace,
componentName, componentType);
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s
doesn't exist", componentType, componentName));
+ }
+ }
+
+ protected void componentInstanceStatusRequestValidate (final String
tenant, final String namespace,
+ final String
componentName, int instanceId) {
+ componentStatusRequestValidate(tenant, namespace, componentName);
+
+ FunctionMetaDataManager functionMetaDataManager =
worker().getFunctionMetaDataManager();
+ FunctionMetaData functionMetaData =
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+ int parallelism =
functionMetaData.getFunctionDetails().getParallelism();
+ if (instanceId < 0 || instanceId >= parallelism) {
+ log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}",
componentType, tenant, namespace, componentName);
+ throw new RestException(Response.Status.BAD_REQUEST,
String.format("%s %s doesn't have instance with id %s", componentType,
componentName, instanceId));
+ }
+ }
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index eb0e696..355cf11 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -19,14 +19,224 @@
package org.apache.pulsar.functions.worker.rest.api;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ExceptionInformation;
+import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.RestException;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
import java.util.function.Supplier;
@Slf4j
-public class FunctionsImpl extends FunctionsImplBase {
+public class FunctionsImpl extends ComponentImpl {
+
+ private class GetFunctionStatus extends GetStatus<FunctionStatus,
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> {
+
+ @Override
+ public
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
notScheduledInstance() {
+ FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
+ = new
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
+ functionInstanceStatusData.setRunning(false);
+ functionInstanceStatusData.setError("Function has not been
scheduled");
+ return functionInstanceStatusData;
+ }
+
+ @Override
+ public
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
fromFunctionStatusProto(
+ InstanceCommunication.FunctionStatus status,
+ String assignedWorkerId) {
+ FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
+ = new
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
+ functionInstanceStatusData.setRunning(status.getRunning());
+ functionInstanceStatusData.setError(status.getFailureException());
+ functionInstanceStatusData.setNumRestarts(status.getNumRestarts());
+ functionInstanceStatusData.setNumReceived(status.getNumReceived());
+
functionInstanceStatusData.setNumSuccessfullyProcessed(status.getNumSuccessfullyProcessed());
+
functionInstanceStatusData.setNumUserExceptions(status.getNumUserExceptions());
+
+ List<ExceptionInformation> userExceptionInformationList = new
LinkedList<>();
+ for (InstanceCommunication.FunctionStatus.ExceptionInformation
exceptionEntry : status.getLatestUserExceptionsList()) {
+ ExceptionInformation exceptionInformation
+ = new ExceptionInformation();
+
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ userExceptionInformationList.add(exceptionInformation);
+ }
+
functionInstanceStatusData.setLatestUserExceptions(userExceptionInformationList);
+
+
functionInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
+ List<ExceptionInformation> systemExceptionInformationList = new
LinkedList<>();
+ for (InstanceCommunication.FunctionStatus.ExceptionInformation
exceptionEntry : status.getLatestSystemExceptionsList()) {
+ ExceptionInformation exceptionInformation
+ = new ExceptionInformation();
+
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ systemExceptionInformationList.add(exceptionInformation);
+ }
+
functionInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
+
+
functionInstanceStatusData.setAverageLatency(status.getAverageLatency());
+
functionInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
+ functionInstanceStatusData.setWorkerId(assignedWorkerId);
+
+ return functionInstanceStatusData;
+ }
+
+ @Override
+ public
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
notRunning(String assignedWorkerId, String error) {
+ FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
+ = new
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
+ functionInstanceStatusData.setRunning(false);
+ if (error != null) {
+ functionInstanceStatusData.setError(error);
+ }
+ functionInstanceStatusData.setWorkerId(assignedWorkerId);
+
+ return functionInstanceStatusData;
+ }
+
+ @Override
+ public FunctionStatus getStatus(String tenant, String namespace,
String name, Collection<Function.Assignment>
+ assignments, URI uri) throws PulsarAdminException {
+ FunctionStatus functionStatus = new FunctionStatus();
+ for (Function.Assignment assignment : assignments) {
+ boolean isOwner =
worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
+
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData;
+ if (isOwner) {
+ functionInstanceStatusData =
getComponentInstanceStatus(tenant, namespace, name, assignment
+ .getInstance().getInstanceId(), null);
+ } else {
+ functionInstanceStatusData =
worker().getFunctionAdmin().functions().getFunctionStatus(
+
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
+
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
+
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
+ assignment.getInstance().getInstanceId());
+ }
+
+ FunctionStatus.FunctionInstanceStatus instanceStatus = new
FunctionStatus.FunctionInstanceStatus();
+
instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
+ instanceStatus.setStatus(functionInstanceStatusData);
+ functionStatus.addInstance(instanceStatus);
+ }
+
+ functionStatus.setNumInstances(functionStatus.instances.size());
+ functionStatus.getInstances().forEach(functionInstanceStatus -> {
+ if (functionInstanceStatus.getStatus().isRunning()) {
+ functionStatus.numRunning++;
+ }
+ });
+ return functionStatus;
+ }
+
+ @Override
+ public FunctionStatus getStatusExternal(String tenant, String
namespace, String name, int parallelism) {
+ FunctionStatus functionStatus = new FunctionStatus();
+ for (int i = 0; i < parallelism; ++i) {
+
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
+ = getComponentInstanceStatus(tenant, namespace, name,
i, null);
+ FunctionStatus.FunctionInstanceStatus functionInstanceStatus
+ = new FunctionStatus.FunctionInstanceStatus();
+ functionInstanceStatus.setInstanceId(i);
+ functionInstanceStatus.setStatus(functionInstanceStatusData);
+ functionStatus.addInstance(functionInstanceStatus);
+ }
+
+ functionStatus.setNumInstances(functionStatus.instances.size());
+ functionStatus.getInstances().forEach(functionInstanceStatus -> {
+ if (functionInstanceStatus.getStatus().isRunning()) {
+ functionStatus.numRunning++;
+ }
+ });
+ return functionStatus;
+ }
+
+ @Override
+ public FunctionStatus emptyStatus(int parallelism) {
+ FunctionStatus functionStatus = new FunctionStatus();
+ functionStatus.setNumInstances(parallelism);
+ functionStatus.setNumRunning(0);
+ for (int i = 0; i < parallelism; i++) {
+ FunctionStatus.FunctionInstanceStatus functionInstanceStatus =
new FunctionStatus.FunctionInstanceStatus();
+ functionInstanceStatus.setInstanceId(i);
+
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
+ = new
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
+ functionInstanceStatusData.setRunning(false);
+ functionInstanceStatusData.setError("Function has not been
scheduled");
+ functionInstanceStatus.setStatus(functionInstanceStatusData);
+
+ functionStatus.addInstance(functionInstanceStatus);
+ }
+
+ return functionStatus;
+ }
+ }
public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
super(workerServiceSupplier, ComponentType.FUNCTION);
}
+
+ /**
+  * Get the status of a single function instance.
+  *
+  * <p>The instance id is parsed and the request validated via
+  * {@code componentInstanceStatusRequestValidate} before the lookup is delegated to
+  * {@code GetFunctionStatus#getComponentInstanceStatus}.
+  * NOTE(review): when this worker is not running the instance, the delegate presumably uses
+  * {@code uri} to redirect to the owning worker - confirm against the GetStatus base class.
+  *
+  * @param tenant the tenant the function belongs to
+  * @param namespace the namespace the function belongs to
+  * @param componentName the function name
+  * @param instanceId the function instance id, as a decimal string
+  * @param uri the request URI, handed through unchanged to the status lookup
+  * @return the status data of the requested function instance
+  * @throws RestException BAD_REQUEST if {@code instanceId} is not a valid integer; whatever status
+  *         the validation helper raises; INTERNAL_SERVER_ERROR if the lookup fails unexpectedly
+  */
+ public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(final String tenant, final String namespace, final String componentName,
+         final String instanceId, URI uri) throws IOException {
+
+     // Parse once, up front: a non-numeric (or null) id is a client error, so report it as
+     // BAD_REQUEST instead of letting NumberFormatException escape unhandled.
+     final int parsedInstanceId;
+     try {
+         parsedInstanceId = Integer.parseInt(instanceId);
+     } catch (NumberFormatException e) {
+         log.error("Invalid instanceId {} in get {} Status request @ /{}/{}/{}",
+                 instanceId, componentType, tenant, namespace, componentName);
+         throw new RestException(Response.Status.BAD_REQUEST,
+                 String.format("Instance id %s is not a valid integer", instanceId));
+     }
+
+     // validate parameters
+     componentInstanceStatusRequestValidate(tenant, namespace, componentName, parsedInstanceId);
+
+     FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData;
+     try {
+         functionInstanceStatusData = new GetFunctionStatus().getComponentInstanceStatus(tenant, namespace, componentName,
+                 parsedInstanceId, uri);
+     } catch (WebApplicationException we) {
+         // Already carries its own HTTP response; pass it through untouched.
+         throw we;
+     } catch (Exception e) {
+         log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, componentName, e);
+         throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+     }
+
+     return functionInstanceStatusData;
+ }
+
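+ /**
+ * Get the aggregated status of a function, covering all of its instances.
+ * @param tenant the tenant the function belongs to
+ * @param namespace the namespace the function belongs to
+ * @param componentName the function name
+ * @param uri the request URI, passed through to the status lookup
+ * @return the function status, containing one entry per function instance
+ * @throws RestException if validation fails or the status lookup fails unexpectedly
+ */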
+ public FunctionStatus getFunctionStatus(final String tenant, final String
namespace, final String componentName,
+ URI uri) {
+
+ // validate parameters
+ componentStatusRequestValidate(tenant, namespace, componentName);
+
+ FunctionStatus functionStatus;
+ try {
+ functionStatus = new
GetFunctionStatus().getComponentStatus(tenant, namespace, componentName, uri);
+ } catch (WebApplicationException we) {
+ throw we;
+ } catch (Exception e) {
+ log.error("{}/{}/{} Got Exception Getting Status", tenant,
namespace, componentName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
e.getMessage());
+ }
+
+ return functionStatus;
+ }
}
\ No newline at end of file
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index b7202f5..4ecba12 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -18,63 +18,205 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
-import org.apache.pulsar.common.policies.data.FunctionStatus;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SinkStatus;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.RestException;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
import java.util.function.Supplier;
-public class SinkImpl extends FunctionsImplBase {
- public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier, ComponentType.SINK);
- }
+@Slf4j
+public class SinkImpl extends ComponentImpl {
+ private class GetSinkStatus extends GetStatus<SinkStatus,
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData> {
- public SinkStatus getSinkStatus(final String tenant, final String
namespace,
- final String componentName, URI uri)
throws IOException {
+ @Override
+ public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
notScheduledInstance() {
+ SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
sinkInstanceStatusData
+ = new
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
+ sinkInstanceStatusData.setRunning(false);
+ sinkInstanceStatusData.setError("Sink has not been scheduled");
+ return sinkInstanceStatusData;
+ }
- FunctionStatus functionStatus = getFunctionStatus(tenant, namespace,
componentName, uri);
+ @Override
+ public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
fromFunctionStatusProto(
+ InstanceCommunication.FunctionStatus status,
+ String assignedWorkerId) {
+ SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
sinkInstanceStatusData
+ = new
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
+ sinkInstanceStatusData.setRunning(status.getRunning());
+ sinkInstanceStatusData.setError(status.getFailureException());
+ sinkInstanceStatusData.setNumRestarts(status.getNumRestarts());
+ sinkInstanceStatusData.setNumReceived(status.getNumReceived());
- SinkStatus sinkStatus = new SinkStatus();
+ List<ExceptionInformation> userExceptionInformationList = new
LinkedList<>();
+ for (InstanceCommunication.FunctionStatus.ExceptionInformation
exceptionEntry : status.getLatestUserExceptionsList()) {
+ ExceptionInformation exceptionInformation
+ = new ExceptionInformation();
+
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ userExceptionInformationList.add(exceptionInformation);
+ }
- sinkStatus.setNumInstances(functionStatus.getNumInstances());
- sinkStatus.setNumRunning(functionStatus.getNumRunning());
- functionStatus.getInstances().forEach(functionInstanceStatus -> {
- SinkStatus.SinkInstanceStatus sinkInstanceStatus = new
SinkStatus.SinkInstanceStatus();
-
sinkInstanceStatus.setInstanceId(functionInstanceStatus.getInstanceId());
-
sinkInstanceStatus.setStatus(fromFunctionInstanceStatus(functionInstanceStatus.getStatus()));
- sinkStatus.addInstance(sinkInstanceStatus);
- });
- return sinkStatus;
+
sinkInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
+ List<ExceptionInformation> systemExceptionInformationList = new
LinkedList<>();
+ for (InstanceCommunication.FunctionStatus.ExceptionInformation
exceptionEntry : status.getLatestSystemExceptionsList()) {
+ ExceptionInformation exceptionInformation
+ = new ExceptionInformation();
+
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ systemExceptionInformationList.add(exceptionInformation);
+ }
+
sinkInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
+
+
sinkInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
+ sinkInstanceStatusData.setWorkerId(assignedWorkerId);
+
+ return sinkInstanceStatusData;
+ }
+
+ @Override
+ public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
notRunning(String assignedWorkerId, String error) {
+ SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
sinkInstanceStatusData
+ = new
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
+ sinkInstanceStatusData.setRunning(false);
+ if (error != null) {
+ sinkInstanceStatusData.setError(error);
+ }
+ sinkInstanceStatusData.setWorkerId(assignedWorkerId);
+
+ return sinkInstanceStatusData;
+ }
+
+ @Override
+ public SinkStatus getStatus(String tenant, String namespace, String
name, Collection<Function.Assignment> assignments, URI uri) throws
PulsarAdminException {
+ SinkStatus sinkStatus = new SinkStatus();
+ for (Function.Assignment assignment : assignments) {
+ boolean isOwner =
worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
+ SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
sinkInstanceStatusData;
+ if (isOwner) {
+ sinkInstanceStatusData =
getComponentInstanceStatus(tenant, namespace, name,
assignment.getInstance().getInstanceId(), null);
+ } else {
+ sinkInstanceStatusData =
worker().getFunctionAdmin().sink().getSinkStatus(
+
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
+
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
+
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
+ assignment.getInstance().getInstanceId());
+ }
+
+ SinkStatus.SinkInstanceStatus instanceStatus = new
SinkStatus.SinkInstanceStatus();
+
instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
+ instanceStatus.setStatus(sinkInstanceStatusData);
+ sinkStatus.addInstance(instanceStatus);
+ }
+
+ sinkStatus.setNumInstances(sinkStatus.instances.size());
+ sinkStatus.getInstances().forEach(sinkInstanceStatus -> {
+ if (sinkInstanceStatus.getStatus().isRunning()) {
+ sinkStatus.numRunning++;
+ }
+ });
+ return sinkStatus;
+ }
+
+ @Override
+ public SinkStatus getStatusExternal (String tenant, String namespace,
String name, int parallelism) {
+ SinkStatus sinkStatus = new SinkStatus();
+ for (int i = 0; i < parallelism; ++i) {
+ SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
sinkInstanceStatusData
+ = getComponentInstanceStatus(tenant, namespace, name,
i, null);
+ SinkStatus.SinkInstanceStatus sinkInstanceStatus
+ = new SinkStatus.SinkInstanceStatus();
+ sinkInstanceStatus.setInstanceId(i);
+ sinkInstanceStatus.setStatus(sinkInstanceStatusData);
+ sinkStatus.addInstance(sinkInstanceStatus);
+ }
+
+ sinkStatus.setNumInstances(sinkStatus.instances.size());
+ sinkStatus.getInstances().forEach(sinkInstanceStatus -> {
+ if (sinkInstanceStatus.getStatus().isRunning()) {
+ sinkStatus.numRunning++;
+ }
+ });
+ return sinkStatus;
+ }
+
+ @Override
+ public SinkStatus emptyStatus(int parallelism) {
+ SinkStatus sinkStatus = new SinkStatus();
+ sinkStatus.setNumInstances(parallelism);
+ sinkStatus.setNumRunning(0);
+ for (int i = 0; i < parallelism; i++) {
+ SinkStatus.SinkInstanceStatus sinkInstanceStatus = new
SinkStatus.SinkInstanceStatus();
+ sinkInstanceStatus.setInstanceId(i);
+ SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
sinkInstanceStatusData
+ = new
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
+ sinkInstanceStatusData.setRunning(false);
+ sinkInstanceStatusData.setError("Sink has not been scheduled");
+ sinkInstanceStatus.setStatus(sinkInstanceStatusData);
+
+ sinkStatus.addInstance(sinkInstanceStatus);
+ }
+
+ return sinkStatus;
+ }
}
- public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
getSinkInstanceStatus(String tenant,
-
String namespace,
-
String sinkName,
-
String instanceId,
-
URI requestUri)
+ public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
+ super(workerServiceSupplier, ComponentType.SINK);
+ }
+
+ public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
getSinkInstanceStatus(
+ String tenant, String namespace, String sinkName, String
instanceId, URI uri)
throws IOException {
- FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
- = getFunctionInstanceStatus(tenant, namespace, sinkName,
instanceId, requestUri);
+ // validate parameters
+ componentInstanceStatusRequestValidate(tenant, namespace, sinkName,
Integer.parseInt(instanceId));
- return fromFunctionInstanceStatus(functionInstanceStatusData);
- }
- private static SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
fromFunctionInstanceStatus(
- FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData) {
- SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
sinkInstanceStatusData
- = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
-
- sinkInstanceStatusData.setError(functionInstanceStatusData.getError());
-
sinkInstanceStatusData.setLastInvocationTime(functionInstanceStatusData.getLastInvocationTime());
-
sinkInstanceStatusData.setLatestSystemExceptions(functionInstanceStatusData.getLatestSystemExceptions());
-
sinkInstanceStatusData.setNumReceived(functionInstanceStatusData.getNumReceived());
-
sinkInstanceStatusData.setNumRestarts(functionInstanceStatusData.getNumRestarts());
-
sinkInstanceStatusData.setRunning(functionInstanceStatusData.isRunning());
-
sinkInstanceStatusData.setWorkerId(functionInstanceStatusData.getWorkerId());
+ SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
sinkInstanceStatusData;
+ try {
+ sinkInstanceStatusData = new
GetSinkStatus().getComponentInstanceStatus(tenant, namespace, sinkName,
+ Integer.parseInt(instanceId), uri);
+ } catch (WebApplicationException we) {
+ throw we;
+ } catch (Exception e) {
+ log.error("{}/{}/{} Got Exception Getting Status", tenant,
namespace, sinkName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
e.getMessage());
+ }
return sinkInstanceStatusData;
}
+
+ public SinkStatus getSinkStatus(
+ final String tenant, final String namespace,
+ final String componentName, URI uri) {
+
+ // validate parameters
+ componentStatusRequestValidate(tenant, namespace, componentName);
+
+ SinkStatus sinkStatus;
+ try {
+ sinkStatus = new GetSinkStatus().getComponentStatus(tenant,
namespace, componentName, uri);
+ } catch (WebApplicationException we) {
+ throw we;
+ } catch (Exception e) {
+ log.error("{}/{}/{} Got Exception Getting Status", tenant,
namespace, componentName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
e.getMessage());
+ }
+
+ return sinkStatus;
+ }
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index f69b03e..412019a 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -18,62 +18,199 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
-import org.apache.pulsar.common.policies.data.FunctionStatus;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.RestException;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
import java.util.function.Supplier;
-public class SourceImpl extends FunctionsImplBase {
+@Slf4j
+public class SourceImpl extends ComponentImpl {
+ private class GetSourceStatus extends GetStatus<SourceStatus,
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData> {
+
+ @Override
+ public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
notScheduledInstance() {
+ SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
sourceInstanceStatusData
+ = new
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
+ sourceInstanceStatusData.setRunning(false);
+ sourceInstanceStatusData.setError("Source has not been scheduled");
+ return sourceInstanceStatusData;
+ }
+
+ /**
+  * Convert an instance's protobuf status into the source instance status data.
+  *
+  * <p>Copies the running flag, failure exception (as the error string), restart count,
+  * received count, system-exception count, latest system exceptions and last invocation
+  * time, and records the worker the instance is assigned to.
+  *
+  * <p>The latest user exceptions are intentionally not read: nothing in this conversion
+  * attaches them to the result, so collecting them would be dead work.
+  * NOTE(review): confirm the source status data has no user-exception field to populate.
+  *
+  * @param status the status reported by the instance
+  * @param assignedWorkerId id of the worker the instance is assigned to
+  * @return the populated source instance status data
+  */
+ @Override
+ public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData fromFunctionStatusProto(
+         InstanceCommunication.FunctionStatus status,
+         String assignedWorkerId) {
+     SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData
+             = new SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
+     sourceInstanceStatusData.setRunning(status.getRunning());
+     sourceInstanceStatusData.setError(status.getFailureException());
+     sourceInstanceStatusData.setNumRestarts(status.getNumRestarts());
+     sourceInstanceStatusData.setNumReceived(status.getNumReceived());
+
+     sourceInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
+     List<ExceptionInformation> systemExceptionInformationList = new LinkedList<>();
+     for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSystemExceptionsList()) {
+         ExceptionInformation exceptionInformation = new ExceptionInformation();
+         exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+         exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+         systemExceptionInformationList.add(exceptionInformation);
+     }
+     sourceInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
+
+     sourceInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
+     sourceInstanceStatusData.setWorkerId(assignedWorkerId);
+
+     return sourceInstanceStatusData;
+ }
+
+ @Override
+ public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
notRunning(String assignedWorkerId, String error) {
+ SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
sourceInstanceStatusData
+ = new
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
+ sourceInstanceStatusData.setRunning(false);
+ if (error != null) {
+ sourceInstanceStatusData.setError(error);
+ }
+ sourceInstanceStatusData.setWorkerId(assignedWorkerId);
+
+ return sourceInstanceStatusData;
+ }
+
+ @Override
+ public SourceStatus getStatus(String tenant, String namespace, String
name, Collection<Function.Assignment> assignments, URI uri) throws
PulsarAdminException {
+ SourceStatus sourceStatus = new SourceStatus();
+ for (Function.Assignment assignment : assignments) {
+ boolean isOwner =
worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
+ SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
sourceInstanceStatusData;
+ if (isOwner) {
+ sourceInstanceStatusData =
getComponentInstanceStatus(tenant, namespace, name,
assignment.getInstance().getInstanceId(), null);
+ } else {
+ sourceInstanceStatusData =
worker().getFunctionAdmin().source().getSourceStatus(
+
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
+
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
+
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
+ assignment.getInstance().getInstanceId());
+ }
+
+ SourceStatus.SourceInstanceStatus instanceStatus = new
SourceStatus.SourceInstanceStatus();
+
instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
+ instanceStatus.setStatus(sourceInstanceStatusData);
+ sourceStatus.addInstance(instanceStatus);
+ }
+
+ sourceStatus.setNumInstances(sourceStatus.instances.size());
+ sourceStatus.getInstances().forEach(sourceInstanceStatus -> {
+ if (sourceInstanceStatus.getStatus().isRunning()) {
+ sourceStatus.numRunning++;
+ }
+ });
+ return sourceStatus;
+ }
+
+ /**
+  * Build the source status by querying every instance id in {@code [0, parallelism)}.
+  *
+  * <p>Each instance is looked up through {@code getComponentInstanceStatus} with a
+  * {@code null} URI; the number of instances and the number of running instances are then
+  * derived from the collected entries.
+  *
+  * @param tenant the tenant the source belongs to
+  * @param namespace the namespace the source belongs to
+  * @param name the source name
+  * @param parallelism the number of instances to query
+  * @return the aggregated source status
+  */
+ @Override
+ public SourceStatus getStatusExternal(String tenant, String namespace, String name, int parallelism) {
+     SourceStatus sourceStatus = new SourceStatus();
+     for (int i = 0; i < parallelism; ++i) {
+         SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData
+                 = getComponentInstanceStatus(tenant, namespace, name, i, null);
+         SourceStatus.SourceInstanceStatus sourceInstanceStatus
+                 = new SourceStatus.SourceInstanceStatus();
+         sourceInstanceStatus.setInstanceId(i);
+         sourceInstanceStatus.setStatus(sourceInstanceStatusData);
+         sourceStatus.addInstance(sourceInstanceStatus);
+     }
+
+     sourceStatus.setNumInstances(sourceStatus.instances.size());
+     sourceStatus.getInstances().forEach(sourceInstanceStatus -> {
+         if (sourceInstanceStatus.getStatus().isRunning()) {
+             sourceStatus.numRunning++;
+         }
+     });
+     return sourceStatus;
+ }
+
+ @Override
+ public SourceStatus emptyStatus(int parallelism) {
+ SourceStatus sourceStatus = new SourceStatus();
+ sourceStatus.setNumInstances(parallelism);
+ sourceStatus.setNumRunning(0);
+ for (int i = 0; i < parallelism; i++) {
+ SourceStatus.SourceInstanceStatus sourceInstanceStatus = new
SourceStatus.SourceInstanceStatus();
+ sourceInstanceStatus.setInstanceId(i);
+ SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
sourceInstanceStatusData
+ = new
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
+ sourceInstanceStatusData.setRunning(false);
+ sourceInstanceStatusData.setError("Source has not been
scheduled");
+ sourceInstanceStatus.setStatus(sourceInstanceStatusData);
+
+ sourceStatus.addInstance(sourceInstanceStatus);
+ }
+
+ return sourceStatus;
+ }
+ }
+
public SourceImpl(Supplier<WorkerService> workerServiceSupplier) {
super(workerServiceSupplier, ComponentType.SOURCE);
}
/**
 * Returns the status of a source (description of the version introduced by the
 * added lines of this hunk).
 *
 * The request parameters are validated first; the status is then obtained from
 * {@code new GetSourceStatus().getComponentStatus(...)}. A
 * {@code WebApplicationException} is rethrown unchanged, while any other exception
 * is logged and converted into a {@code RestException} with
 * {@code INTERNAL_SERVER_ERROR} carrying the original message.
 *
 * @param tenant        tenant given to validation and to the status lookup
 * @param namespace     namespace given to validation and to the status lookup
 * @param componentName source name given to validation and to the status lookup
 * @param uri           URI forwarded to {@code getComponentStatus}
 * @return the status produced by {@code getComponentStatus}
 * @throws IOException declared by the signature
 */
public SourceStatus getSourceStatus(final String tenant, final String
namespace,
final String componentName, URI uri)
throws IOException {
+ // validate parameters
+ componentStatusRequestValidate(tenant, namespace, componentName);
- FunctionStatus functionStatus = getFunctionStatus(tenant, namespace,
componentName, uri);
-
- SourceStatus sourceStatus = new SourceStatus();
+ SourceStatus sourceStatus;
+ try {
+ sourceStatus = new GetSourceStatus().getComponentStatus(tenant,
namespace, componentName, uri);
+ } catch (WebApplicationException we) {
+ throw we;
+ } catch (Exception e) {
+ log.error("{}/{}/{} Got Exception Getting Status", tenant,
namespace, componentName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
e.getMessage());
+ }
- sourceStatus.setNumInstances(functionStatus.getNumInstances());
- sourceStatus.setNumRunning(functionStatus.getNumRunning());
- functionStatus.getInstances().forEach(functionInstanceStatus -> {
- SourceStatus.SourceInstanceStatus sourceInstanceStatus = new
SourceStatus.SourceInstanceStatus();
-
sourceInstanceStatus.setInstanceId(functionInstanceStatus.getInstanceId());
-
sourceInstanceStatus.setStatus(fromFunctionInstanceStatus(functionInstanceStatus.getStatus()));
- sourceStatus.addInstance(sourceInstanceStatus);
- });
return sourceStatus;
}
- public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
getSourceInstanceStatus(String tenant,
-
String namespace,
-
String sourceName,
-
String instanceId,
-
URI requestUri)
- throws IOException {
-
- FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
- = getFunctionInstanceStatus(tenant, namespace, sourceName,
instanceId, requestUri);
-
- return fromFunctionInstanceStatus(functionInstanceStatusData);
- }
+    /**
+     * Returns the status of a single instance of a source.
+     *
+     * <p>The textual instance id is parsed exactly once. A value that is not a valid
+     * integer is reported as a {@code RestException} with {@code BAD_REQUEST} instead
+     * of escaping as a raw {@code NumberFormatException}. After validation the status
+     * is obtained from {@code new GetSourceStatus().getComponentInstanceStatus(...)};
+     * a {@code WebApplicationException} is rethrown unchanged and any other exception
+     * is logged and converted into a {@code RestException} with
+     * {@code INTERNAL_SERVER_ERROR}.
+     *
+     * @param tenant     tenant given to validation and to the status lookup
+     * @param namespace  namespace given to validation and to the status lookup
+     * @param sourceName source name given to validation and to the status lookup
+     * @param instanceId instance id in decimal text form
+     * @param uri        URI forwarded to {@code getComponentInstanceStatus}
+     * @return the status data of the requested instance
+     */
+    public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceInstanceStatus(
+            String tenant, String namespace, String sourceName, String instanceId, URI uri) {
+        // Parse once up front: a malformed id is a client error, not a server failure.
+        final int parsedInstanceId;
+        try {
+            parsedInstanceId = Integer.parseInt(instanceId);
+        } catch (NumberFormatException e) {
+            throw new RestException(Response.Status.BAD_REQUEST,
+                    "Instance id must be a valid integer: " + instanceId);
+        }
+
+        // validate parameters
+        componentInstanceStatusRequestValidate(tenant, namespace, sourceName, parsedInstanceId);
- private static SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
fromFunctionInstanceStatus(
- FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData) {
- SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
sourceInstanceStatusData
- = new
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
-
-
sourceInstanceStatusData.setError(functionInstanceStatusData.getError());
-
sourceInstanceStatusData.setLastInvocationTime(functionInstanceStatusData.getLastInvocationTime());
-
sourceInstanceStatusData.setLatestSystemExceptions(functionInstanceStatusData.getLatestSystemExceptions());
-
sourceInstanceStatusData.setNumReceived(functionInstanceStatusData.getNumReceived());
-
sourceInstanceStatusData.setNumRestarts(functionInstanceStatusData.getNumRestarts());
-
sourceInstanceStatusData.setRunning(functionInstanceStatusData.isRunning());
-
sourceInstanceStatusData.setWorkerId(functionInstanceStatusData.getWorkerId());
+        SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData;
+        try {
+            sourceInstanceStatusData = new GetSourceStatus().getComponentInstanceStatus(
+                    tenant, namespace, sourceName, parsedInstanceId, uri);
+        } catch (WebApplicationException we) {
+            throw we;
+        } catch (Exception e) {
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, sourceName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
return sourceInstanceStatusData;
}
}
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index bf3766c..98747ed 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -70,7 +70,7 @@ import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.worker.*;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase;
+import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -169,7 +169,7 @@ public class FunctionApiV2ResourceTest {
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
-
doReturn(FunctionsImplBase.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(any());
+
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(any());
}
//
@@ -1220,9 +1220,9 @@ public class FunctionApiV2ResourceTest {
FunctionDetails.newBuilder().setName("test-3").build()).build();
functionMetaDataList.add(f3);
when(mockedManager.listFunctions(eq(tenant),
eq(namespace))).thenReturn(functionMetaDataList);
-
doReturn(FunctionsImplBase.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-
doReturn(FunctionsImplBase.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-
doReturn(FunctionsImplBase.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
Response response = listDefaultFunctions();
assertEquals(Status.OK.getStatusCode(), response.getStatus());
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index 234ca2f..a0e6843 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -43,8 +43,7 @@ import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.worker.*;
import org.apache.pulsar.functions.worker.request.RequestResult;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase;
+import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.apache.pulsar.functions.worker.rest.api.SinkImpl;
import org.apache.pulsar.io.cassandra.CassandraStringSink;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -174,7 +173,7 @@ public class SinkApiV2ResourceTest {
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new SinkImpl(() -> mockedWorkerService));
-
Mockito.doReturn(FunctionsImplBase.ComponentType.SINK).when(this.resource).calculateSubjectType(any());
+
Mockito.doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(any());
}
//
@@ -1183,9 +1182,9 @@ public class SinkApiV2ResourceTest {
FunctionDetails.newBuilder().setName("test-3").build()).build();
functionMetaDataList.add(f3);
when(mockedManager.listFunctions(eq(tenant),
eq(namespace))).thenReturn(functionMetaDataList);
-
doReturn(FunctionsImplBase.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-
doReturn(FunctionsImplBase.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-
doReturn(FunctionsImplBase.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
Response response = listDefaultSinks();
assertEquals(Status.OK.getStatusCode(), response.getStatus());
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index 8f72123..fcba85c 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -41,8 +41,7 @@ import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.worker.*;
import org.apache.pulsar.functions.worker.request.RequestResult;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase;
+import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.apache.pulsar.functions.worker.rest.api.SourceImpl;
import org.apache.pulsar.io.twitter.TwitterFireHose;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -159,7 +158,7 @@ public class SourceApiV2ResourceTest {
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new SourceImpl(() -> mockedWorkerService));
-
Mockito.doReturn(FunctionsImplBase.ComponentType.SOURCE).when(this.resource).calculateSubjectType(any());
+
Mockito.doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(any());
}
//
@@ -1186,9 +1185,9 @@ public class SourceApiV2ResourceTest {
FunctionDetails.newBuilder().setName("test-3").build()).build();
functionMetaDataList.add(f3);
when(mockedManager.listFunctions(eq(tenant),
eq(namespace))).thenReturn(functionMetaDataList);
-
doReturn(FunctionsImplBase.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-
doReturn(FunctionsImplBase.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-
doReturn(FunctionsImplBase.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
Response response = listDefaultSources();
assertEquals(Status.OK.getStatusCode(), response.getStatus());
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index d7ede4c..5add9a1 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -255,6 +255,7 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get sink status : {}", result.getStdout());
+ assertEquals(result.getExitCode(), 0);
SinkStatus sinkStatus = SinkStatus.decode(result.getStdout());
try {
@@ -481,6 +482,8 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get source status : {}", result.getStdout());
+ assertEquals(result.getExitCode(), 0);
+
SourceStatus sourceStatus =
SourceStatus.decode(result.getStdout());
try {
assertEquals(sourceStatus.getNumInstances(), 1);
@@ -532,6 +535,8 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get source status : {}", result.getStdout());
+ assertEquals(result.getExitCode(), 0);
+
SourceStatus sourceStatus =
SourceStatus.decode(result.getStdout());
try {
assertEquals(sourceStatus.getNumInstances(), 1);
@@ -574,6 +579,8 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get sink status : {}", result.getStdout());
+ assertEquals(result.getExitCode(), 0);
+
SinkStatus sinkStatus = SinkStatus.decode(result.getStdout());
try {
assertEquals(sinkStatus.getNumInstances(), 1);