jerrypeng closed pull request #3102: fixing and refactoring function status
URL: https://github.com/apache/pulsar/pull/3102
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance.
Because this is a foreign pull request (from a fork), the diff is supplied
below (it would not show otherwise due to GitHub magic):
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index fa35ca14f2..90766d364d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -669,9 +669,8 @@ public void testPulsarFunctionStatus() throws Exception {
}
}, 5, 200);
- FunctionRuntimeManager functionRuntimeManager =
functionsWorkerService.getFunctionRuntimeManager();
- FunctionStatus functionStatus =
functionRuntimeManager.getFunctionStatus(tenant, namespacePortion,
- functionName, null);
+ FunctionStatus functionStatus =
admin.functions().getFunctionStatus(tenant, namespacePortion,
+ functionName);
int numInstances = functionStatus.getNumInstances();
assertEquals(numInstances, 1);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index f9e0c75829..152c1db622 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -18,26 +18,10 @@
*/
package org.apache.pulsar.functions.worker;
-import java.net.URI;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.UriBuilder;
-
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -46,24 +30,35 @@
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.ErrorData;
-import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.FunctionStats;
-import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
-import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.Assignment;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.runtime.*;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
+import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import
org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.Reflections;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriBuilder;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
/**
* This class managers all aspects of functions assignments and running of
function assignments for this worker
*/
@@ -590,207 +585,6 @@ public FunctionStats getFunctionStats(String tenant,
String namespace, String fu
return functionStats.calculateOverall();
}
- /**
- * Get status of a function instance. If this worker is not running the
function instance,
- * @param tenant the tenant the function belongs to
- * @param namespace the namespace the function belongs to
- * @param functionName the function name
- * @param instanceId the function instance id
- * @return the function status
- */
- public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
getFunctionInstanceStatus(
- String tenant, String namespace,
- String functionName, int instanceId, URI uri) {
- Assignment assignment;
- if (runtimeFactory.externallyManaged()) {
- assignment = this.findAssignment(tenant, namespace, functionName,
-1);
- } else {
- assignment = this.findAssignment(tenant, namespace, functionName,
instanceId);
- }
-
- if (assignment == null) {
- FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
- = new
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
- functionInstanceStatusData.setRunning(false);
- functionInstanceStatusData.setError("Function has not been
scheduled");
- return functionInstanceStatusData;
- }
-
- final String assignedWorkerId = assignment.getWorkerId();
- final String workerId = this.workerConfig.getWorkerId();
-
- // If I am running worker
- if (assignedWorkerId.equals(workerId)) {
- FunctionRuntimeInfo functionRuntimeInfo =
this.getFunctionRuntimeInfo(
-
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
- RuntimeSpawner runtimeSpawner =
functionRuntimeInfo.getRuntimeSpawner();
- FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
- = new
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
- if (runtimeSpawner != null) {
- try {
- InstanceCommunication.FunctionStatus status =
functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get();
- functionInstanceStatusData.setRunning(status.getRunning());
-
functionInstanceStatusData.setError(status.getFailureException());
-
functionInstanceStatusData.setNumRestarts(status.getNumRestarts());
-
functionInstanceStatusData.setNumReceived(status.getNumReceived());
-
functionInstanceStatusData.setNumSuccessfullyProcessed(status.getNumSuccessfullyProcessed());
-
functionInstanceStatusData.setNumUserExceptions(status.getNumUserExceptions());
-
- List<ExceptionInformation> userExceptionInformationList =
new LinkedList<>();
- for
(InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry :
status.getLatestUserExceptionsList()) {
- ExceptionInformation exceptionInformation
- = new ExceptionInformation();
-
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
-
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
- userExceptionInformationList.add(exceptionInformation);
- }
-
functionInstanceStatusData.setLatestUserExceptions(userExceptionInformationList);
-
-
functionInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
- List<ExceptionInformation> systemExceptionInformationList
= new LinkedList<>();
- for
(InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry :
status.getLatestSystemExceptionsList()) {
- ExceptionInformation exceptionInformation
- = new ExceptionInformation();
-
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
-
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
-
systemExceptionInformationList.add(exceptionInformation);
- }
-
functionInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
-
-
functionInstanceStatusData.setAverageLatency(status.getAverageLatency());
-
functionInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
- functionInstanceStatusData.setWorkerId(assignedWorkerId);
-
-
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
- }
- } else {
- functionInstanceStatusData.setRunning(false);
- if (functionRuntimeInfo.getStartupException() != null) {
-
functionInstanceStatusData.setError(functionRuntimeInfo.getStartupException().getMessage());
- }
- functionInstanceStatusData.setWorkerId(assignedWorkerId);
- }
- return functionInstanceStatusData;
- } else {
- // query other worker
-
- List<WorkerInfo> workerInfoList =
this.membershipManager.getCurrentMembership();
- WorkerInfo workerInfo = null;
- for (WorkerInfo entry: workerInfoList) {
- if (assignment.getWorkerId().equals(entry.getWorkerId())) {
- workerInfo = entry;
- }
- }
- if (workerInfo == null) {
-
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
- = new
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
- functionInstanceStatusData.setRunning(false);
- functionInstanceStatusData.setError("Function has not been
scheduled");
- return functionInstanceStatusData;
- }
-
- if (uri == null) {
- throw new
WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
- } else {
- URI redirect =
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
- throw new
WebApplicationException(Response.temporaryRedirect(redirect).build());
- }
- }
- }
-
- /**
- * Get statuses of all function instances.
- * @param tenant the tenant the function belongs to
- * @param namespace the namespace the function belongs to
- * @param functionName the function name
- * @return a list of function statuses
- * @throws PulsarAdminException
- */
- public FunctionStatus getFunctionStatus(String tenant, String namespace,
- String functionName, URI uri)
- throws PulsarAdminException {
-
- Collection<Assignment> assignments =
this.findFunctionAssignments(tenant, namespace, functionName);
-
- FunctionStatus functionStatus = new FunctionStatus();
- if (assignments.isEmpty()) {
- Function.FunctionMetaData functionMetaData =
workerService.getFunctionMetaDataManager().getFunctionMetaData(tenant,
namespace, functionName);
-
functionStatus.setNumInstances(functionMetaData.getFunctionDetails().getParallelism());
- functionStatus.setNumRunning(0);
-
- return functionStatus;
- }
-
- // TODO refactor the code for externally managed.
- if (runtimeFactory.externallyManaged()) {
- Assignment assignment = assignments.iterator().next();
- boolean isOwner =
this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
- if (isOwner) {
- int parallelism =
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getParallelism();
- for (int i = 0; i < parallelism; ++i) {
-
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
- = getFunctionInstanceStatus(tenant, namespace,
functionName, i, null);
- FunctionStatus.FunctionInstanceStatus
functionInstanceStatus
- = new FunctionStatus.FunctionInstanceStatus();
- functionInstanceStatus.setInstanceId(i);
-
functionInstanceStatus.setStatus(functionInstanceStatusData);
- functionStatus.addInstance(functionInstanceStatus);
- }
- } else {
- // find the hostname/port of the worker who is the owner
-
- List<WorkerInfo> workerInfoList =
this.membershipManager.getCurrentMembership();
- WorkerInfo workerInfo = null;
- for (WorkerInfo entry: workerInfoList) {
- if (assignment.getWorkerId().equals(entry.getWorkerId())) {
- workerInfo = entry;
- }
- }
- if (workerInfo == null) {
- Function.FunctionMetaData functionMetaData =
workerService.getFunctionMetaDataManager().getFunctionMetaData(tenant,
namespace, functionName);
-
functionStatus.setNumInstances(functionMetaData.getFunctionDetails().getParallelism());
- functionStatus.setNumRunning(0);
- return functionStatus;
- }
-
- if (uri == null) {
- throw new
WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
- } else {
- URI redirect =
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
- throw new
WebApplicationException(Response.temporaryRedirect(redirect).build());
- }
- }
- } else {
- for (Assignment assignment : assignments) {
- boolean isOwner =
this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
-
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData;
- if (isOwner) {
- functionInstanceStatusData =
getFunctionInstanceStatus(tenant, namespace, functionName,
assignment.getInstance().getInstanceId(), null);
- } else {
- functionInstanceStatusData =
this.functionAdmin.functions().getFunctionStatus(
-
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
-
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
-
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
- assignment.getInstance().getInstanceId());
- }
-
- FunctionStatus.FunctionInstanceStatus instanceStatus = new
FunctionStatus.FunctionInstanceStatus();
-
instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
- instanceStatus.setStatus(functionInstanceStatusData);
- functionStatus.addInstance(instanceStatus);
- }
- }
- functionStatus.setNumInstances(functionStatus.instances.size());
- functionStatus.getInstances().forEach(functionInstanceStatus -> {
- if (functionInstanceStatus.getStatus().isRunning()) {
- functionStatus.numRunning++;
- }
- });
- return functionStatus;
- }
-
/**
* Process an assignment update from the assignment topic
* @param newAssignment the assignment
@@ -990,8 +784,11 @@ public void close() throws Exception {
}
}
- private FunctionRuntimeInfo getFunctionRuntimeInfo(String
fullyQualifiedInstanceId) {
- return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
+ public synchronized FunctionRuntimeInfo getFunctionRuntimeInfo(String
fullyQualifiedInstanceId) {
+ return getFunctionRuntimeInfoInternal(fullyQualifiedInstanceId);
}
+ private FunctionRuntimeInfo getFunctionRuntimeInfoInternal(String
fullyQualifiedInstanceId) {
+ return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
+ }
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplBase.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
similarity index 90%
rename from
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplBase.java
rename to
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index bab81e4026..8f13dc8067 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplBase.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -54,6 +54,7 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
+import javax.ws.rs.core.UriBuilder;
import lombok.extern.slf4j.Slf4j;
@@ -71,9 +72,9 @@
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.join;
import static org.apache.pulsar.functions.utils.Utils.*;
-import static
org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase.ComponentType.FUNCTION;
-import static
org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase.ComponentType.SINK;
-import static
org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase.ComponentType.SOURCE;
+import static
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.FUNCTION;
+import static
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.SINK;
+import static
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.SOURCE;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -84,26 +85,30 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionState;
+import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.FunctionStats;
-import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.StateUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.Utils;
import org.apache.pulsar.functions.worker.WorkerService;
@@ -116,7 +121,7 @@
import net.jodah.typetools.TypeResolver;
@Slf4j
-public class FunctionsImplBase {
+public abstract class ComponentImpl {
public enum ComponentType {
FUNCTION("Function"),
@@ -136,15 +141,137 @@ public String toString() {
}
private final AtomicReference<StorageClient> storageClient = new
AtomicReference<>();
- private final Supplier<WorkerService> workerServiceSupplier;
- private final ComponentType componentType;
+ protected final Supplier<WorkerService> workerServiceSupplier;
+ protected final ComponentType componentType;
- public FunctionsImplBase(Supplier<WorkerService> workerServiceSupplier,
ComponentType componentType) {
+ public ComponentImpl(Supplier<WorkerService> workerServiceSupplier,
ComponentType componentType) {
this.workerServiceSupplier = workerServiceSupplier;
this.componentType = componentType;
}
- private WorkerService worker() {
+ protected abstract class GetStatus<S, T> {
+
+ public abstract T notScheduledInstance();
+
+ public abstract T
fromFunctionStatusProto(InstanceCommunication.FunctionStatus status, String
assignedWorkerId);
+
+ public abstract T notRunning(String assignedWorkerId, String error);
+
+ public T getComponentInstanceStatus(String tenant, String namespace,
+ String name, int instanceId, URI
uri) {
+
+ Function.Assignment assignment;
+ if
(worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
+ assignment =
worker().getFunctionRuntimeManager().findFunctionAssignment(tenant, namespace,
name, -1);
+ } else {
+ assignment =
worker().getFunctionRuntimeManager().findFunctionAssignment(tenant, namespace,
name, instanceId);
+ }
+
+ if (assignment == null) {
+ return notScheduledInstance();
+ }
+
+ final String assignedWorkerId = assignment.getWorkerId();
+ final String workerId = worker().getWorkerConfig().getWorkerId();
+
+ // If I am running worker
+ if (assignedWorkerId.equals(workerId)) {
+ FunctionRuntimeInfo functionRuntimeInfo =
worker().getFunctionRuntimeManager().getFunctionRuntimeInfo(
+
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
+ if (functionRuntimeInfo == null) {
+ log.error("{} in get {} Status does not exist @
/{}/{}/{}", componentType, componentType, tenant, namespace, name);
+ throw new RestException(Status.NOT_FOUND,
String.format("%s %s doesn't exist", componentType, name));
+ }
+ RuntimeSpawner runtimeSpawner =
functionRuntimeInfo.getRuntimeSpawner();
+
+ if (runtimeSpawner != null) {
+ try {
+ return fromFunctionStatusProto(
+
functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get(),
+ assignedWorkerId);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ return notRunning(assignedWorkerId,
functionRuntimeInfo.getStartupException().getMessage());
+ }
+ } else {
+ // query other worker
+
+ List<WorkerInfo> workerInfoList =
worker().getMembershipManager().getCurrentMembership();
+ WorkerInfo workerInfo = null;
+ for (WorkerInfo entry : workerInfoList) {
+ if (assignment.getWorkerId().equals(entry.getWorkerId())) {
+ workerInfo = entry;
+ }
+ }
+ if (workerInfo == null) {
+ return notScheduledInstance();
+ }
+
+ if (uri == null) {
+ throw new
WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+ } else {
+ URI redirect =
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+ throw new
WebApplicationException(Response.temporaryRedirect(redirect).build());
+ }
+ }
+ }
+
+ public abstract S getStatus(String tenant, String namespace,
+ String name,
Collection<Function.Assignment> assignments, URI uri) throws
PulsarAdminException;
+
+ public abstract S getStatusExternal(String tenant, String namespace,
+ String name, int parallelism);
+
+ public abstract S emptyStatus(int parallelism);
+
+ public S getComponentStatus(String tenant, String namespace,
+ String name, URI uri) {
+
+ Function.FunctionMetaData functionMetaData =
worker().getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace,
name);
+
+ Collection<Function.Assignment> assignments =
worker().getFunctionRuntimeManager().findFunctionAssignments(tenant, namespace,
name);
+
+ // TODO refactor the code for externally managed.
+ if
(worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
+ Function.Assignment assignment = assignments.iterator().next();
+ boolean isOwner =
worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
+ if (isOwner) {
+ return getStatusExternal(tenant, namespace, name,
functionMetaData.getFunctionDetails().getParallelism());
+ } else {
+
+ // find the hostname/port of the worker who is the owner
+
+ List<WorkerInfo> workerInfoList =
worker().getMembershipManager().getCurrentMembership();
+ WorkerInfo workerInfo = null;
+ for (WorkerInfo entry: workerInfoList) {
+ if
(assignment.getWorkerId().equals(entry.getWorkerId())) {
+ workerInfo = entry;
+ }
+ }
+ if (workerInfo == null) {
+ return
emptyStatus(functionMetaData.getFunctionDetails().getParallelism());
+ }
+
+ if (uri == null) {
+ throw new
WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+ } else {
+ URI redirect =
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+ throw new
WebApplicationException(Response.temporaryRedirect(redirect).build());
+ }
+ }
+ } else {
+ try {
+ return getStatus(tenant, namespace, name, assignments,
uri);
+ } catch (PulsarAdminException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ protected WorkerService worker() {
try {
return checkNotNull(workerServiceSupplier.get());
} catch (Throwable t) {
@@ -153,7 +280,7 @@ private WorkerService worker() {
}
}
- private boolean isWorkerServiceAvailable() {
+ boolean isWorkerServiceAvailable() {
WorkerService workerService = workerServiceSupplier.get();
if (workerService == null) {
return false;
@@ -719,96 +846,6 @@ public Response stopFunctionInstances(final String tenant,
final String namespac
}
}
- public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
getFunctionInstanceStatus(final String tenant, final String namespace, final
String componentName,
-
final String instanceId, URI uri) throws IOException {
-
- if (!isWorkerServiceAvailable()) {
- throw new RestException(Status.SERVICE_UNAVAILABLE, "Function
worker service is not done initializing. Please try again in a little while.");
- }
-
- // validate parameters
- try {
- validateGetFunctionInstanceRequestParams(tenant, namespace,
componentName, componentType, instanceId);
- } catch (IllegalArgumentException e) {
- log.error("Invalid get {} Status request @ /{}/{}/{}",
componentType, tenant, namespace, componentName, e);
- throw new RestException(Status.BAD_REQUEST, e.getMessage());
- }
-
- FunctionMetaDataManager functionMetaDataManager =
worker().getFunctionMetaDataManager();
- if (!functionMetaDataManager.containsFunction(tenant, namespace,
componentName)) {
- log.error("{} in get {} Status does not exist @ /{}/{}/{}",
componentType, componentType, tenant, namespace, componentName);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s
doesn't exist", componentType, componentName));
-
- }
- FunctionMetaData functionMetaData =
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
- if (!calculateSubjectType(functionMetaData).equals(componentType)) {
- log.error("{}/{}/{} is not a {}", tenant, namespace,
componentName, componentType);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s
doesn't exist", componentType, componentName));
- }
- int instanceIdInt = Integer.parseInt(instanceId);
- if (instanceIdInt < 0 || instanceIdInt >=
functionMetaData.getFunctionDetails().getParallelism()) {
- log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}",
componentType, tenant, namespace, componentName);
- throw new RestException(Status.BAD_REQUEST, String.format("%s %s
doesn't have instance with id %s", componentType, componentName, instanceId));
- }
-
- FunctionRuntimeManager functionRuntimeManager =
worker().getFunctionRuntimeManager();
-
- FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData;
- try {
- functionInstanceStatusData =
functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace,
componentName,
- Integer.parseInt(instanceId), uri);
- } catch (WebApplicationException we) {
- throw we;
- } catch (Exception e) {
- log.error("{}/{}/{} Got Exception Getting Status", tenant,
namespace, componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
e.getMessage());
- }
-
- return functionInstanceStatusData;
- }
-
- public FunctionStatus getFunctionStatus(final String tenant, final String
namespace, final String componentName,
- URI uri)
- throws IOException {
-
- if (!isWorkerServiceAvailable()) {
- throw new RestException(Status.SERVICE_UNAVAILABLE, "Function
worker service is not done initializing. Please try again in a little while.");
- }
-
- // validate parameters
- try {
- validateGetFunctionRequestParams(tenant, namespace, componentName,
componentType);
- } catch (IllegalArgumentException e) {
- log.error("Invalid get {} Status request @ /{}/{}/{}",
componentType, tenant, namespace, componentName, e);
- throw new RestException(Status.BAD_REQUEST, e.getMessage());
- }
-
- FunctionMetaDataManager functionMetaDataManager =
worker().getFunctionMetaDataManager();
- if (!functionMetaDataManager.containsFunction(tenant, namespace,
componentName)) {
- log.error("{} in get {} Status does not exist @ /{}/{}/{}",
componentType, componentType, tenant, namespace, componentName);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s
doesn't exist", componentType, componentName));
- }
-
- FunctionMetaData functionMetaData =
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
- if (!calculateSubjectType(functionMetaData).equals(componentType)) {
- log.error("{}/{}/{} is not a {}", tenant, namespace,
componentName, componentType);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s
doesn't exist", componentType, componentName));
- }
-
- FunctionRuntimeManager functionRuntimeManager =
worker().getFunctionRuntimeManager();
- FunctionStatus functionStatus;
- try {
- functionStatus = functionRuntimeManager.getFunctionStatus(tenant,
namespace, componentName, uri);
- } catch (WebApplicationException we) {
- throw we;
- } catch (Exception e) {
- log.error("{}/{}/{} Got Exception Getting Status", tenant,
namespace, componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
e.getMessage());
- }
-
- return functionStatus;
- }
-
public FunctionStats getFunctionStats(final String tenant, final String
namespace, final String componentName,
URI uri) throws IOException {
if (!isWorkerServiceAvailable()) {
@@ -1176,7 +1213,7 @@ private void validateListFunctionRequestParams(String
tenant, String namespace)
}
}
- private void validateGetFunctionInstanceRequestParams(String tenant,
String namespace, String componentName,
+ protected void validateGetFunctionInstanceRequestParams(String tenant,
String namespace, String componentName,
ComponentType
componentType, String instanceId) throws IllegalArgumentException {
validateGetFunctionRequestParams(tenant, namespace, componentName,
componentType);
if (instanceId == null) {
@@ -1184,7 +1221,7 @@ private void
validateGetFunctionInstanceRequestParams(String tenant, String name
}
}
- private void validateGetFunctionRequestParams(String tenant, String
namespace, String subject, ComponentType componentType)
+ protected void validateGetFunctionRequestParams(String tenant, String
namespace, String subject, ComponentType componentType)
throws IllegalArgumentException {
if (tenant == null) {
@@ -1591,4 +1628,42 @@ public ComponentType
calculateSubjectType(FunctionMetaData functionMetaData) {
return SINK;
}
+ protected void componentStatusRequestValidate (final String tenant, final
String namespace, final String componentName) {
+ if (!isWorkerServiceAvailable()) {
+ throw new RestException(Status.SERVICE_UNAVAILABLE, "Function
worker service is not done initializing. Please try again in a little while.");
+ }
+
+ // validate parameters
+ try {
+ validateGetFunctionRequestParams(tenant, namespace, componentName,
componentType);
+ } catch (IllegalArgumentException e) {
+ log.error("Invalid get {} Status request @ /{}/{}/{}",
componentType, tenant, namespace, componentName, e);
+ throw new RestException(Status.BAD_REQUEST, e.getMessage());
+ }
+
+ FunctionMetaDataManager functionMetaDataManager =
worker().getFunctionMetaDataManager();
+ if (!functionMetaDataManager.containsFunction(tenant, namespace,
componentName)) {
+ log.error("{} in get {} Status does not exist @ /{}/{}/{}",
componentType, componentType, tenant, namespace, componentName);
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s
doesn't exist", componentType, componentName));
+ }
+
+ FunctionMetaData functionMetaData =
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+ if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+ log.error("{}/{}/{} is not a {}", tenant, namespace,
componentName, componentType);
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s
doesn't exist", componentType, componentName));
+ }
+ }
+
+ protected void componentInstanceStatusRequestValidate (final String
tenant, final String namespace,
+ final String
componentName, int instanceId) {
+ componentStatusRequestValidate(tenant, namespace, componentName);
+
+ FunctionMetaDataManager functionMetaDataManager =
worker().getFunctionMetaDataManager();
+ FunctionMetaData functionMetaData =
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+ int parallelism =
functionMetaData.getFunctionDetails().getParallelism();
+ if (instanceId < 0 || instanceId >= parallelism) {
+ log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}",
componentType, tenant, namespace, componentName);
+ throw new RestException(Response.Status.BAD_REQUEST,
String.format("%s %s doesn't have instance with id %s", componentType,
componentName, instanceId));
+ }
+ }
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index eb0e696965..355cf11379 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -19,14 +19,224 @@
package org.apache.pulsar.functions.worker.rest.api;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ExceptionInformation;
+import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.RestException;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
import java.util.function.Supplier;
@Slf4j
-public class FunctionsImpl extends FunctionsImplBase {
+public class FunctionsImpl extends ComponentImpl {
+
+ private class GetFunctionStatus extends GetStatus<FunctionStatus,
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> {
+
+ @Override
+ public
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
notScheduledInstance() {
+ FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
+ = new
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
+ functionInstanceStatusData.setRunning(false);
+ functionInstanceStatusData.setError("Function has not been
scheduled");
+ return functionInstanceStatusData;
+ }
+
+ @Override
+ public
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
fromFunctionStatusProto(
+ InstanceCommunication.FunctionStatus status,
+ String assignedWorkerId) {
+ FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
+ = new
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
+ functionInstanceStatusData.setRunning(status.getRunning());
+ functionInstanceStatusData.setError(status.getFailureException());
+ functionInstanceStatusData.setNumRestarts(status.getNumRestarts());
+ functionInstanceStatusData.setNumReceived(status.getNumReceived());
+
functionInstanceStatusData.setNumSuccessfullyProcessed(status.getNumSuccessfullyProcessed());
+
functionInstanceStatusData.setNumUserExceptions(status.getNumUserExceptions());
+
+ List<ExceptionInformation> userExceptionInformationList = new
LinkedList<>();
+ for (InstanceCommunication.FunctionStatus.ExceptionInformation
exceptionEntry : status.getLatestUserExceptionsList()) {
+ ExceptionInformation exceptionInformation
+ = new ExceptionInformation();
+
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ userExceptionInformationList.add(exceptionInformation);
+ }
+
functionInstanceStatusData.setLatestUserExceptions(userExceptionInformationList);
+
+
functionInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
+ List<ExceptionInformation> systemExceptionInformationList = new
LinkedList<>();
+ for (InstanceCommunication.FunctionStatus.ExceptionInformation
exceptionEntry : status.getLatestSystemExceptionsList()) {
+ ExceptionInformation exceptionInformation
+ = new ExceptionInformation();
+
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ systemExceptionInformationList.add(exceptionInformation);
+ }
+
functionInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
+
+
functionInstanceStatusData.setAverageLatency(status.getAverageLatency());
+
functionInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
+ functionInstanceStatusData.setWorkerId(assignedWorkerId);
+
+ return functionInstanceStatusData;
+ }
+
+ @Override
+ public
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
notRunning(String assignedWorkerId, String error) {
+ FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
+ = new
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
+ functionInstanceStatusData.setRunning(false);
+ if (error != null) {
+ functionInstanceStatusData.setError(error);
+ }
+ functionInstanceStatusData.setWorkerId(assignedWorkerId);
+
+ return functionInstanceStatusData;
+ }
+
+ @Override
+ public FunctionStatus getStatus(String tenant, String namespace,
String name, Collection<Function.Assignment>
+ assignments, URI uri) throws PulsarAdminException {
+ FunctionStatus functionStatus = new FunctionStatus();
+ for (Function.Assignment assignment : assignments) {
+ boolean isOwner =
worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
+
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData;
+ if (isOwner) {
+ functionInstanceStatusData =
getComponentInstanceStatus(tenant, namespace, name, assignment
+ .getInstance().getInstanceId(), null);
+ } else {
+ functionInstanceStatusData =
worker().getFunctionAdmin().functions().getFunctionStatus(
+
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
+
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
+
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
+ assignment.getInstance().getInstanceId());
+ }
+
+ FunctionStatus.FunctionInstanceStatus instanceStatus = new
FunctionStatus.FunctionInstanceStatus();
+
instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
+ instanceStatus.setStatus(functionInstanceStatusData);
+ functionStatus.addInstance(instanceStatus);
+ }
+
+ functionStatus.setNumInstances(functionStatus.instances.size());
+ functionStatus.getInstances().forEach(functionInstanceStatus -> {
+ if (functionInstanceStatus.getStatus().isRunning()) {
+ functionStatus.numRunning++;
+ }
+ });
+ return functionStatus;
+ }
+
+ @Override
+ public FunctionStatus getStatusExternal(String tenant, String
namespace, String name, int parallelism) {
+ FunctionStatus functionStatus = new FunctionStatus();
+ for (int i = 0; i < parallelism; ++i) {
+
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
+ = getComponentInstanceStatus(tenant, namespace, name,
i, null);
+ FunctionStatus.FunctionInstanceStatus functionInstanceStatus
+ = new FunctionStatus.FunctionInstanceStatus();
+ functionInstanceStatus.setInstanceId(i);
+ functionInstanceStatus.setStatus(functionInstanceStatusData);
+ functionStatus.addInstance(functionInstanceStatus);
+ }
+
+ functionStatus.setNumInstances(functionStatus.instances.size());
+ functionStatus.getInstances().forEach(functionInstanceStatus -> {
+ if (functionInstanceStatus.getStatus().isRunning()) {
+ functionStatus.numRunning++;
+ }
+ });
+ return functionStatus;
+ }
+
+ @Override
+ public FunctionStatus emptyStatus(int parallelism) {
+ FunctionStatus functionStatus = new FunctionStatus();
+ functionStatus.setNumInstances(parallelism);
+ functionStatus.setNumRunning(0);
+ for (int i = 0; i < parallelism; i++) {
+ FunctionStatus.FunctionInstanceStatus functionInstanceStatus =
new FunctionStatus.FunctionInstanceStatus();
+ functionInstanceStatus.setInstanceId(i);
+
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
+ = new
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
+ functionInstanceStatusData.setRunning(false);
+ functionInstanceStatusData.setError("Function has not been
scheduled");
+ functionInstanceStatus.setStatus(functionInstanceStatusData);
+
+ functionStatus.addInstance(functionInstanceStatus);
+ }
+
+ return functionStatus;
+ }
+ }
public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
super(workerServiceSupplier, ComponentType.FUNCTION);
}
+
+    /**
+     * Get the status of a single function instance.
+     *
+     * NOTE(review): the previous summary ended mid-sentence ("If this worker is not running the
+     * function instance,"). Presumably the request URI is used to reach the worker that owns the
+     * instance; confirm against GetStatus#getComponentInstanceStatus.
+     *
+     * @param tenant the tenant the function belongs to
+     * @param namespace the namespace the function belongs to
+     * @param componentName the function name
+     * @param instanceId the function instance id, as a base-10 integer string
+     * @param uri the URI of the incoming request, handed to the status lookup
+     * @return the status of the requested function instance
+     * @throws RestException with BAD_REQUEST if instanceId is not an integer, or with
+     *         INTERNAL_SERVER_ERROR if the lookup fails with an unexpected exception
+     * @throws IOException declared by the signature; not thrown directly by this method
+     */
+    public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(
+            final String tenant, final String namespace, final String componentName,
+            final String instanceId, URI uri) throws IOException {
+
+        // Parse the id once. A malformed id is a client error, so report it as a 400 instead of
+        // letting NumberFormatException escape to the container.
+        final int parsedInstanceId;
+        try {
+            parsedInstanceId = Integer.parseInt(instanceId);
+        } catch (NumberFormatException e) {
+            log.error("{}/{}/{} Invalid instance id {}", tenant, namespace, componentName, instanceId);
+            throw new RestException(Response.Status.BAD_REQUEST,
+                    String.format("Invalid instance id %s", instanceId));
+        }
+
+        // validate parameters
+        componentInstanceStatusRequestValidate(tenant, namespace, componentName, parsedInstanceId);
+
+        FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData;
+        try {
+            functionInstanceStatusData = new GetFunctionStatus().getComponentInstanceStatus(
+                    tenant, namespace, componentName, parsedInstanceId, uri);
+        } catch (WebApplicationException we) {
+            // Already carries an HTTP status; pass it through untouched.
+            throw we;
+        } catch (Exception e) {
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, componentName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        return functionInstanceStatusData;
+    }
+
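+ /**
+ * Get the aggregated status of a function, covering all of its instances.
+ * @param tenant the tenant the function belongs to
+ * @param namespace the namespace the function belongs to
+ * @param componentName the function name
+ * @param uri the URI of the incoming request, handed to the status lookup
+ * @return the function status, including the status of each instance
+ * @throws RestException with INTERNAL_SERVER_ERROR if the lookup fails with an unexpected exception
+ */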
+ public FunctionStatus getFunctionStatus(final String tenant, final String
namespace, final String componentName,
+ URI uri) {
+
+ // validate parameters
+ componentStatusRequestValidate(tenant, namespace, componentName);
+
+ FunctionStatus functionStatus;
+ try {
+ functionStatus = new
GetFunctionStatus().getComponentStatus(tenant, namespace, componentName, uri);
+ } catch (WebApplicationException we) {
+ throw we;
+ } catch (Exception e) {
+ log.error("{}/{}/{} Got Exception Getting Status", tenant,
namespace, componentName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
e.getMessage());
+ }
+
+ return functionStatus;
+ }
}
\ No newline at end of file
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index b7202f515d..4ecba12964 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -18,63 +18,205 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
-import org.apache.pulsar.common.policies.data.FunctionStatus;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SinkStatus;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.RestException;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
import java.util.function.Supplier;
-public class SinkImpl extends FunctionsImplBase {
- public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier, ComponentType.SINK);
- }
+@Slf4j
+public class SinkImpl extends ComponentImpl {
+ private class GetSinkStatus extends GetStatus<SinkStatus,
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData> {
- public SinkStatus getSinkStatus(final String tenant, final String
namespace,
- final String componentName, URI uri)
throws IOException {
+ @Override
+ public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
notScheduledInstance() {
+ SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
sinkInstanceStatusData
+ = new
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
+ sinkInstanceStatusData.setRunning(false);
+ sinkInstanceStatusData.setError("Sink has not been scheduled");
+ return sinkInstanceStatusData;
+ }
- FunctionStatus functionStatus = getFunctionStatus(tenant, namespace,
componentName, uri);
+ @Override
+ public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
fromFunctionStatusProto(
+ InstanceCommunication.FunctionStatus status,
+ String assignedWorkerId) {
+ SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
sinkInstanceStatusData
+ = new
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
+ sinkInstanceStatusData.setRunning(status.getRunning());
+ sinkInstanceStatusData.setError(status.getFailureException());
+ sinkInstanceStatusData.setNumRestarts(status.getNumRestarts());
+ sinkInstanceStatusData.setNumReceived(status.getNumReceived());
- SinkStatus sinkStatus = new SinkStatus();
+ List<ExceptionInformation> userExceptionInformationList = new
LinkedList<>();
+ for (InstanceCommunication.FunctionStatus.ExceptionInformation
exceptionEntry : status.getLatestUserExceptionsList()) {
+ ExceptionInformation exceptionInformation
+ = new ExceptionInformation();
+
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ userExceptionInformationList.add(exceptionInformation);
+ }
- sinkStatus.setNumInstances(functionStatus.getNumInstances());
- sinkStatus.setNumRunning(functionStatus.getNumRunning());
- functionStatus.getInstances().forEach(functionInstanceStatus -> {
- SinkStatus.SinkInstanceStatus sinkInstanceStatus = new
SinkStatus.SinkInstanceStatus();
-
sinkInstanceStatus.setInstanceId(functionInstanceStatus.getInstanceId());
-
sinkInstanceStatus.setStatus(fromFunctionInstanceStatus(functionInstanceStatus.getStatus()));
- sinkStatus.addInstance(sinkInstanceStatus);
- });
- return sinkStatus;
+
sinkInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
+ List<ExceptionInformation> systemExceptionInformationList = new
LinkedList<>();
+ for (InstanceCommunication.FunctionStatus.ExceptionInformation
exceptionEntry : status.getLatestSystemExceptionsList()) {
+ ExceptionInformation exceptionInformation
+ = new ExceptionInformation();
+
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ systemExceptionInformationList.add(exceptionInformation);
+ }
+
sinkInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
+
+
sinkInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
+ sinkInstanceStatusData.setWorkerId(assignedWorkerId);
+
+ return sinkInstanceStatusData;
+ }
+
+ @Override
+ public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
notRunning(String assignedWorkerId, String error) {
+ SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
sinkInstanceStatusData
+ = new
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
+ sinkInstanceStatusData.setRunning(false);
+ if (error != null) {
+ sinkInstanceStatusData.setError(error);
+ }
+ sinkInstanceStatusData.setWorkerId(assignedWorkerId);
+
+ return sinkInstanceStatusData;
+ }
+
+ @Override
+ public SinkStatus getStatus(String tenant, String namespace, String
name, Collection<Function.Assignment> assignments, URI uri) throws
PulsarAdminException {
+ SinkStatus sinkStatus = new SinkStatus();
+ for (Function.Assignment assignment : assignments) {
+ boolean isOwner =
worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
+ SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
sinkInstanceStatusData;
+ if (isOwner) {
+ sinkInstanceStatusData =
getComponentInstanceStatus(tenant, namespace, name,
assignment.getInstance().getInstanceId(), null);
+ } else {
+ sinkInstanceStatusData =
worker().getFunctionAdmin().sink().getSinkStatus(
+
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
+
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
+
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
+ assignment.getInstance().getInstanceId());
+ }
+
+ SinkStatus.SinkInstanceStatus instanceStatus = new
SinkStatus.SinkInstanceStatus();
+
instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
+ instanceStatus.setStatus(sinkInstanceStatusData);
+ sinkStatus.addInstance(instanceStatus);
+ }
+
+ sinkStatus.setNumInstances(sinkStatus.instances.size());
+ sinkStatus.getInstances().forEach(sinkInstanceStatus -> {
+ if (sinkInstanceStatus.getStatus().isRunning()) {
+ sinkStatus.numRunning++;
+ }
+ });
+ return sinkStatus;
+ }
+
+ @Override
+ public SinkStatus getStatusExternal (String tenant, String namespace,
String name, int parallelism) {
+ SinkStatus sinkStatus = new SinkStatus();
+ for (int i = 0; i < parallelism; ++i) {
+ SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
sinkInstanceStatusData
+ = getComponentInstanceStatus(tenant, namespace, name,
i, null);
+ SinkStatus.SinkInstanceStatus sinkInstanceStatus
+ = new SinkStatus.SinkInstanceStatus();
+ sinkInstanceStatus.setInstanceId(i);
+ sinkInstanceStatus.setStatus(sinkInstanceStatusData);
+ sinkStatus.addInstance(sinkInstanceStatus);
+ }
+
+ sinkStatus.setNumInstances(sinkStatus.instances.size());
+ sinkStatus.getInstances().forEach(sinkInstanceStatus -> {
+ if (sinkInstanceStatus.getStatus().isRunning()) {
+ sinkStatus.numRunning++;
+ }
+ });
+ return sinkStatus;
+ }
+
+ @Override
+ public SinkStatus emptyStatus(int parallelism) {
+ SinkStatus sinkStatus = new SinkStatus();
+ sinkStatus.setNumInstances(parallelism);
+ sinkStatus.setNumRunning(0);
+ for (int i = 0; i < parallelism; i++) {
+ SinkStatus.SinkInstanceStatus sinkInstanceStatus = new
SinkStatus.SinkInstanceStatus();
+ sinkInstanceStatus.setInstanceId(i);
+ SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
sinkInstanceStatusData
+ = new
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
+ sinkInstanceStatusData.setRunning(false);
+ sinkInstanceStatusData.setError("Sink has not been scheduled");
+ sinkInstanceStatus.setStatus(sinkInstanceStatusData);
+
+ sinkStatus.addInstance(sinkInstanceStatus);
+ }
+
+ return sinkStatus;
+ }
}
- public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
getSinkInstanceStatus(String tenant,
-
String namespace,
-
String sinkName,
-
String instanceId,
-
URI requestUri)
+ public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
+ super(workerServiceSupplier, ComponentType.SINK);
+ }
+
+ public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
getSinkInstanceStatus(
+ String tenant, String namespace, String sinkName, String
instanceId, URI uri)
throws IOException {
- FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
- = getFunctionInstanceStatus(tenant, namespace, sinkName,
instanceId, requestUri);
+ // validate parameters
+ componentInstanceStatusRequestValidate(tenant, namespace, sinkName,
Integer.parseInt(instanceId));
- return fromFunctionInstanceStatus(functionInstanceStatusData);
- }
- private static SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
fromFunctionInstanceStatus(
- FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData) {
- SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
sinkInstanceStatusData
- = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
-
- sinkInstanceStatusData.setError(functionInstanceStatusData.getError());
-
sinkInstanceStatusData.setLastInvocationTime(functionInstanceStatusData.getLastInvocationTime());
-
sinkInstanceStatusData.setLatestSystemExceptions(functionInstanceStatusData.getLatestSystemExceptions());
-
sinkInstanceStatusData.setNumReceived(functionInstanceStatusData.getNumReceived());
-
sinkInstanceStatusData.setNumRestarts(functionInstanceStatusData.getNumRestarts());
-
sinkInstanceStatusData.setRunning(functionInstanceStatusData.isRunning());
-
sinkInstanceStatusData.setWorkerId(functionInstanceStatusData.getWorkerId());
+ SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
sinkInstanceStatusData;
+ try {
+ sinkInstanceStatusData = new
GetSinkStatus().getComponentInstanceStatus(tenant, namespace, sinkName,
+ Integer.parseInt(instanceId), uri);
+ } catch (WebApplicationException we) {
+ throw we;
+ } catch (Exception e) {
+ log.error("{}/{}/{} Got Exception Getting Status", tenant,
namespace, sinkName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
e.getMessage());
+ }
return sinkInstanceStatusData;
}
+
+ public SinkStatus getSinkStatus(
+ final String tenant, final String namespace,
+ final String componentName, URI uri) {
+
+ // validate parameters
+ componentStatusRequestValidate(tenant, namespace, componentName);
+
+ SinkStatus sinkStatus;
+ try {
+ sinkStatus = new GetSinkStatus().getComponentStatus(tenant,
namespace, componentName, uri);
+ } catch (WebApplicationException we) {
+ throw we;
+ } catch (Exception e) {
+ log.error("{}/{}/{} Got Exception Getting Status", tenant,
namespace, componentName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
e.getMessage());
+ }
+
+ return sinkStatus;
+ }
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index f69b03ea65..412019a184 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -18,62 +18,199 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
-import org.apache.pulsar.common.policies.data.FunctionStatus;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.RestException;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
import java.util.function.Supplier;
-public class SourceImpl extends FunctionsImplBase {
+@Slf4j
+public class SourceImpl extends ComponentImpl {
+ private class GetSourceStatus extends GetStatus<SourceStatus,
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData> {
+
+ @Override
+ public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
notScheduledInstance() {
+ SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
sourceInstanceStatusData
+ = new
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
+ sourceInstanceStatusData.setRunning(false);
+ sourceInstanceStatusData.setError("Source has not been scheduled");
+ return sourceInstanceStatusData;
+ }
+
+        /**
+         * Converts the protobuf status reported for an instance into source instance status data.
+         *
+         * @param status the protobuf status to copy values from
+         * @param assignedWorkerId the worker id to record on the result
+         * @return the populated source instance status data
+         */
+        @Override
+        public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData fromFunctionStatusProto(
+                InstanceCommunication.FunctionStatus status,
+                String assignedWorkerId) {
+            SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData
+                    = new SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
+            sourceInstanceStatusData.setRunning(status.getRunning());
+            sourceInstanceStatusData.setError(status.getFailureException());
+            sourceInstanceStatusData.setNumRestarts(status.getNumRestarts());
+            sourceInstanceStatusData.setNumReceived(status.getNumReceived());
+
+            // Only system exceptions are copied. The previous code also built a list of user
+            // exceptions here but never stored it anywhere, so that dead loop has been removed.
+            sourceInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
+            List<ExceptionInformation> systemExceptionInformationList = new LinkedList<>();
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry
+                    : status.getLatestSystemExceptionsList()) {
+                ExceptionInformation exceptionInformation = new ExceptionInformation();
+                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                systemExceptionInformationList.add(exceptionInformation);
+            }
+            sourceInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
+
+            sourceInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
+            sourceInstanceStatusData.setWorkerId(assignedWorkerId);
+
+            return sourceInstanceStatusData;
+        }
+
+ @Override
+ public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
notRunning(String assignedWorkerId, String error) {
+ SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
sourceInstanceStatusData
+ = new
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
+ sourceInstanceStatusData.setRunning(false);
+ if (error != null) {
+ sourceInstanceStatusData.setError(error);
+ }
+ sourceInstanceStatusData.setWorkerId(assignedWorkerId);
+
+ return sourceInstanceStatusData;
+ }
+
+ @Override
+ public SourceStatus getStatus(String tenant, String namespace, String
name, Collection<Function.Assignment> assignments, URI uri) throws
PulsarAdminException {
+ SourceStatus sourceStatus = new SourceStatus();
+ for (Function.Assignment assignment : assignments) {
+ boolean isOwner =
worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
+ SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
sourceInstanceStatusData;
+ if (isOwner) {
+ sourceInstanceStatusData =
getComponentInstanceStatus(tenant, namespace, name,
assignment.getInstance().getInstanceId(), null);
+ } else {
+ sourceInstanceStatusData =
worker().getFunctionAdmin().source().getSourceStatus(
+
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
+
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
+
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
+ assignment.getInstance().getInstanceId());
+ }
+
+ SourceStatus.SourceInstanceStatus instanceStatus = new
SourceStatus.SourceInstanceStatus();
+
instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
+ instanceStatus.setStatus(sourceInstanceStatusData);
+ sourceStatus.addInstance(instanceStatus);
+ }
+
+ sourceStatus.setNumInstances(sourceStatus.instances.size());
+ sourceStatus.getInstances().forEach(sourceInstanceStatus -> {
+ if (sourceInstanceStatus.getStatus().isRunning()) {
+ sourceStatus.numRunning++;
+ }
+ });
+ return sourceStatus;
+ }
+
+        /**
+         * Builds the source status by looking up every instance id in [0, parallelism).
+         * Each lookup goes through getComponentInstanceStatus with a null request URI.
+         *
+         * @param tenant the tenant the source belongs to
+         * @param namespace the namespace the source belongs to
+         * @param name the source name
+         * @param parallelism the number of instances to query
+         * @return the source status with one entry per instance and the running count filled in
+         */
+        @Override
+        public SourceStatus getStatusExternal(String tenant, String namespace, String name, int parallelism) {
+            // Named sourceStatus (was "sinkStatus", a leftover from the sink implementation).
+            SourceStatus sourceStatus = new SourceStatus();
+            for (int i = 0; i < parallelism; ++i) {
+                SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData
+                        = getComponentInstanceStatus(tenant, namespace, name, i, null);
+                SourceStatus.SourceInstanceStatus sourceInstanceStatus
+                        = new SourceStatus.SourceInstanceStatus();
+                sourceInstanceStatus.setInstanceId(i);
+                sourceInstanceStatus.setStatus(sourceInstanceStatusData);
+                sourceStatus.addInstance(sourceInstanceStatus);
+            }
+
+            sourceStatus.setNumInstances(sourceStatus.instances.size());
+            // Count the instances that report themselves as running.
+            sourceStatus.getInstances().forEach(sourceInstanceStatus -> {
+                if (sourceInstanceStatus.getStatus().isRunning()) {
+                    sourceStatus.numRunning++;
+                }
+            });
+            return sourceStatus;
+        }
+
+ @Override
+ public SourceStatus emptyStatus(int parallelism) {
+ SourceStatus sourceStatus = new SourceStatus();
+ sourceStatus.setNumInstances(parallelism);
+ sourceStatus.setNumRunning(0);
+ for (int i = 0; i < parallelism; i++) {
+ SourceStatus.SourceInstanceStatus sourceInstanceStatus = new
SourceStatus.SourceInstanceStatus();
+ sourceInstanceStatus.setInstanceId(i);
+ SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
sourceInstanceStatusData
+ = new
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
+ sourceInstanceStatusData.setRunning(false);
+ sourceInstanceStatusData.setError("Source has not been
scheduled");
+ sourceInstanceStatus.setStatus(sourceInstanceStatusData);
+
+ sourceStatus.addInstance(sourceInstanceStatus);
+ }
+
+ return sourceStatus;
+ }
+ }
+
public SourceImpl(Supplier<WorkerService> workerServiceSupplier) {
super(workerServiceSupplier, ComponentType.SOURCE);
}
public SourceStatus getSourceStatus(final String tenant, final String
namespace,
final String componentName, URI uri)
throws IOException {
+ // validate parameters
+ componentStatusRequestValidate(tenant, namespace, componentName);
- FunctionStatus functionStatus = getFunctionStatus(tenant, namespace,
componentName, uri);
-
- SourceStatus sourceStatus = new SourceStatus();
+ SourceStatus sourceStatus;
+ try {
+ sourceStatus = new GetSourceStatus().getComponentStatus(tenant,
namespace, componentName, uri);
+ } catch (WebApplicationException we) {
+ throw we;
+ } catch (Exception e) {
+ log.error("{}/{}/{} Got Exception Getting Status", tenant,
namespace, componentName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
e.getMessage());
+ }
- sourceStatus.setNumInstances(functionStatus.getNumInstances());
- sourceStatus.setNumRunning(functionStatus.getNumRunning());
- functionStatus.getInstances().forEach(functionInstanceStatus -> {
- SourceStatus.SourceInstanceStatus sourceInstanceStatus = new
SourceStatus.SourceInstanceStatus();
-
sourceInstanceStatus.setInstanceId(functionInstanceStatus.getInstanceId());
-
sourceInstanceStatus.setStatus(fromFunctionInstanceStatus(functionInstanceStatus.getStatus()));
- sourceStatus.addInstance(sourceInstanceStatus);
- });
return sourceStatus;
}
- public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
getSourceInstanceStatus(String tenant,
-
String namespace,
-
String sourceName,
-
String instanceId,
-
URI requestUri)
- throws IOException {
-
- FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData
- = getFunctionInstanceStatus(tenant, namespace, sourceName,
instanceId, requestUri);
-
- return fromFunctionInstanceStatus(functionInstanceStatusData);
- }
+ public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
getSourceInstanceStatus(
+ String tenant, String namespace, String sourceName, String
instanceId, URI uri) {
+ // validate parameters
+ componentInstanceStatusRequestValidate(tenant, namespace, sourceName,
Integer.parseInt(instanceId));
- private static SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
fromFunctionInstanceStatus(
- FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatusData) {
- SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
sourceInstanceStatusData
- = new
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
-
-
sourceInstanceStatusData.setError(functionInstanceStatusData.getError());
-
sourceInstanceStatusData.setLastInvocationTime(functionInstanceStatusData.getLastInvocationTime());
-
sourceInstanceStatusData.setLatestSystemExceptions(functionInstanceStatusData.getLatestSystemExceptions());
-
sourceInstanceStatusData.setNumReceived(functionInstanceStatusData.getNumReceived());
-
sourceInstanceStatusData.setNumRestarts(functionInstanceStatusData.getNumRestarts());
-
sourceInstanceStatusData.setRunning(functionInstanceStatusData.isRunning());
-
sourceInstanceStatusData.setWorkerId(functionInstanceStatusData.getWorkerId());
+ SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
sourceInstanceStatusData;
+ try {
+ sourceInstanceStatusData = new
GetSourceStatus().getComponentInstanceStatus(tenant, namespace, sourceName,
+ Integer.parseInt(instanceId), uri);
+ } catch (WebApplicationException we) {
+ throw we;
+ } catch (Exception e) {
+ log.error("{}/{}/{} Got Exception Getting Status", tenant,
namespace, sourceName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
e.getMessage());
+ }
return sourceInstanceStatusData;
}
}
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index bf3766cc8a..98747edd32 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -70,7 +70,7 @@
import org.apache.pulsar.functions.worker.*;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase;
+import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -169,7 +169,7 @@ public void setup() throws Exception {
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
-
doReturn(FunctionsImplBase.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(any());
+
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(any());
}
//
@@ -1220,9 +1220,9 @@ public void testOnlyGetSources() throws Exception {
FunctionDetails.newBuilder().setName("test-3").build()).build();
functionMetaDataList.add(f3);
when(mockedManager.listFunctions(eq(tenant),
eq(namespace))).thenReturn(functionMetaDataList);
-
doReturn(FunctionsImplBase.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-
doReturn(FunctionsImplBase.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-
doReturn(FunctionsImplBase.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
Response response = listDefaultFunctions();
assertEquals(Status.OK.getStatusCode(), response.getStatus());
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index 234ca2f881..a0e6843769 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -43,8 +43,7 @@
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.worker.*;
import org.apache.pulsar.functions.worker.request.RequestResult;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase;
+import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.apache.pulsar.functions.worker.rest.api.SinkImpl;
import org.apache.pulsar.io.cassandra.CassandraStringSink;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -174,7 +173,7 @@ public void setup() throws Exception {
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new SinkImpl(() -> mockedWorkerService));
-
Mockito.doReturn(FunctionsImplBase.ComponentType.SINK).when(this.resource).calculateSubjectType(any());
+
Mockito.doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(any());
}
//
@@ -1183,9 +1182,9 @@ public void testOnlyGetSinks() throws Exception {
FunctionDetails.newBuilder().setName("test-3").build()).build();
functionMetaDataList.add(f3);
when(mockedManager.listFunctions(eq(tenant),
eq(namespace))).thenReturn(functionMetaDataList);
-
doReturn(FunctionsImplBase.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-
doReturn(FunctionsImplBase.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-
doReturn(FunctionsImplBase.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
Response response = listDefaultSinks();
assertEquals(Status.OK.getStatusCode(), response.getStatus());
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index 8f72123804..fcba85c61a 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -41,8 +41,7 @@
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.worker.*;
import org.apache.pulsar.functions.worker.request.RequestResult;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImplBase;
+import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.apache.pulsar.functions.worker.rest.api.SourceImpl;
import org.apache.pulsar.io.twitter.TwitterFireHose;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -159,7 +158,7 @@ public void setup() throws Exception {
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new SourceImpl(() -> mockedWorkerService));
-
Mockito.doReturn(FunctionsImplBase.ComponentType.SOURCE).when(this.resource).calculateSubjectType(any());
+
Mockito.doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(any());
}
//
@@ -1186,9 +1185,9 @@ public void testOnlyGetSources() throws Exception {
FunctionDetails.newBuilder().setName("test-3").build()).build();
functionMetaDataList.add(f3);
when(mockedManager.listFunctions(eq(tenant),
eq(namespace))).thenReturn(functionMetaDataList);
-
doReturn(FunctionsImplBase.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-
doReturn(FunctionsImplBase.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-
doReturn(FunctionsImplBase.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+
doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+
doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+
doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
Response response = listDefaultSources();
assertEquals(Status.OK.getStatusCode(), response.getStatus());
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index d7ede4cdd9..5add9a1455 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -255,6 +255,7 @@ protected void getSinkStatus(String tenant, String
namespace, String sinkName) t
ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get sink status : {}", result.getStdout());
+ assertEquals(result.getExitCode(), 0);
SinkStatus sinkStatus = SinkStatus.decode(result.getStdout());
try {
@@ -481,6 +482,8 @@ protected void getSourceStatus(String tenant, String
namespace, String sourceNam
ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get source status : {}", result.getStdout());
+ assertEquals(result.getExitCode(), 0);
+
SourceStatus sourceStatus =
SourceStatus.decode(result.getStdout());
try {
assertEquals(sourceStatus.getNumInstances(), 1);
@@ -532,6 +535,8 @@ protected void waitForProcessingSourceMessages(String
tenant,
ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get source status : {}", result.getStdout());
+ assertEquals(result.getExitCode(), 0);
+
SourceStatus sourceStatus =
SourceStatus.decode(result.getStdout());
try {
assertEquals(sourceStatus.getNumInstances(), 1);
@@ -574,6 +579,8 @@ protected void waitForProcessingSinkMessages(String tenant,
ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get sink status : {}", result.getStdout());
+ assertEquals(result.getExitCode(), 0);
+
SinkStatus sinkStatus = SinkStatus.decode(result.getStdout());
try {
assertEquals(sinkStatus.getNumInstances(), 1);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services