This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7bcd893 Add support to restart function (#2365)
7bcd893 is described below
commit 7bcd8934a0a53ab7a62b3c9d77fbdec94ab497d2
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Aug 13 22:49:11 2018 -0700
Add support to restart function (#2365)
* Add support to restart function
fix: pulsar function restart
* add support to restart all function instances
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 25 +++++
.../apache/pulsar/io/PulsarFunctionTlsTest.java | 2 +-
.../org/apache/pulsar/io/PulsarSinkE2ETest.java | 63 +++++++++++-
.../org/apache/pulsar/client/admin/Functions.java | 33 +++++++
.../client/admin/internal/FunctionsImpl.java | 22 +++++
.../apache/pulsar/admin/cli/CmdFunctionsTest.java | 29 ++++++
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 37 ++++++-
.../pulsar/functions/worker/FunctionActioner.java | 4 +-
.../functions/worker/FunctionRuntimeManager.java | 107 +++++++++++++++++++++
.../functions/worker/rest/api/FunctionsImpl.java | 66 +++++++++++++
.../worker/rest/api/v2/FunctionApiV2Resource.java | 28 ++++++
11 files changed, 404 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 564eb18..b8891e5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -268,6 +268,31 @@ public class FunctionsBase extends AdminResource
implements Supplier<WorkerServi
}
@POST
+ @ApiOperation(value = "Restart function instance", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid
request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response restartFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final
@PathParam("functionName") String functionName,
+ final @PathParam("instanceId") String instanceId) {
+ return functions.restartFunctionInstance(tenant, namespace,
functionName, instanceId);
+ }
+
+ @POST
+ @ApiOperation(value = "Restart all function instances", response =
Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid
request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{functionName}/restart")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response restartFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final
@PathParam("functionName") String functionName) {
+ return functions.restartFunctionInstances(tenant, namespace,
functionName);
+ }
+
+ @POST
@ApiOperation(
value = "Uploads Pulsar Function file data",
hidden = true
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index c57a8a0..2254626 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -206,7 +206,7 @@ public class PulsarFunctionTlsTest {
String jarFilePathUrl = String.format("%s:%s", Utils.FILE,
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath());
FunctionDetails functionDetails =
PulsarSinkE2ETest.createSinkConfig(jarFilePathUrl, tenant, namespacePortion,
- functionName, sinkTopic, subscriptionName);
+ functionName, "my.*", sinkTopic, subscriptionName);
try {
functionAdmin.functions().createFunctionWithUrl(functionDetails,
jarFilePathUrl);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 5db9a0a..1306f13 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -271,7 +271,7 @@ public class PulsarSinkE2ETest {
String jarFilePathUrl = Utils.FILE + ":"
+
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl,
tenant, namespacePortion, functionName,
- sinkTopic, subscriptionName);
+ "my.*", sinkTopic, subscriptionName);
admin.functions().createFunctionWithUrl(functionDetails,
jarFilePathUrl);
// try to update function to test: update-function functionality
@@ -333,7 +333,7 @@ public class PulsarSinkE2ETest {
String jarFilePathUrl = Utils.FILE + ":"
+
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl,
tenant, namespacePortion, functionName,
- sinkTopic, subscriptionName);
+ "my.*", sinkTopic, subscriptionName);
admin.functions().createFunctionWithUrl(functionDetails,
jarFilePathUrl);
// try to update function to test: update-function functionality
@@ -382,7 +382,7 @@ public class PulsarSinkE2ETest {
assertEquals(ownerWorkerId, workerId);
}
- protected static FunctionDetails createSinkConfig(String jarFile, String
tenant, String namespace, String functionName, String sinkTopic, String
subscriptionName) {
+ protected static FunctionDetails createSinkConfig(String jarFile, String
tenant, String namespace, String functionName, String sourceTopic, String
sinkTopic, String subscriptionName) {
File file = new File(jarFile);
try {
@@ -390,7 +390,7 @@ public class PulsarSinkE2ETest {
} catch (MalformedURLException e) {
throw new RuntimeException("Failed to load user jar " + file, e);
}
- String sourceTopicPattern = String.format("persistent://%s/%s/my.*",
tenant, namespace);
+ String sourceTopicPattern = String.format("persistent://%s/%s/%s",
tenant, namespace, sourceTopic);
Class<?> typeArg = byte[].class;
FunctionDetails.Builder functionDetailsBuilder =
FunctionDetails.newBuilder();
@@ -446,7 +446,7 @@ public class PulsarSinkE2ETest {
String jarFilePathUrl = Utils.FILE + ":"
+
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl,
tenant, namespacePortion, functionName,
- sinkTopic, subscriptionName);
+ "my.*", sinkTopic, subscriptionName);
try {
admin.functions().createFunctionWithUrl(functionDetails,
jarFilePathUrl);
assertTrue(validRoleName);
@@ -507,4 +507,57 @@ public class PulsarSinkE2ETest {
assertEquals(functionMetadata.getSink().getTypeClassName(),
typeArgs[1].getName());
}
+
+ @Test(timeOut = 20000)
+ public void testFunctionRestartApi() throws Exception {
+
+ final String namespacePortion = "io";
+ final String replNamespace = tenant + "/" + namespacePortion;
+ final String sourceTopicName = "restartFunction";
+ final String sourceTopic = "persistent://" + replNamespace + "/" +
sourceTopicName;
+ final String sinkTopic = "persistent://" + replNamespace + "/output";
+ final String functionName = "PulsarSink-test";
+ final String subscriptionName = "test-sub";
+ admin.namespaces().createNamespace(replNamespace);
+ Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+ admin.namespaces().setNamespaceReplicationClusters(replNamespace,
clusters);
+
+ // create source topic
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(sourceTopic).create();
+
+ String jarFilePathUrl = Utils.FILE + ":"
+ +
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+ FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl,
tenant, namespacePortion, functionName,
+ sourceTopicName, sinkTopic, subscriptionName);
+ admin.functions().createFunctionWithUrl(functionDetails,
jarFilePathUrl);
+
+ retryStrategically((test) -> {
+ try {
+ SubscriptionStats subStats =
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+ return subStats != null && subStats.consumers.size() == 1;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ SubscriptionStats subStats =
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+ assertEquals(subStats.consumers.size(), 1);
+
+ // it should restart consumer : so, check if consumer came up again
after restarting function
+ admin.functions().restartFunction(tenant, namespacePortion,
functionName);
+
+ retryStrategically((test) -> {
+ try {
+ SubscriptionStats subStat =
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+ return subStat != null && subStat.consumers.size() == 1;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ subStats =
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+ assertEquals(subStats.consumers.size(), 1);
+
+ producer.close();
+ }
}
\ No newline at end of file
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index c04873d..4525c51 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -181,6 +181,39 @@ public interface Functions {
* Unexpected error
*/
FunctionStatusList getFunctionStatus(String tenant, String namespace,
String function) throws PulsarAdminException;
+
+ /**
+ * Restart function instance
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ *
+ * @param instanceId
+ * Function instanceId
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void restartFunction(String tenant, String namespace, String function, int
instanceId) throws PulsarAdminException;
+
+ /**
+ * Restart all function instances
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void restartFunction(String tenant, String namespace, String function)
throws PulsarAdminException;
/**
* Triggers the function by writing to the input topic.
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 028da3c..402b5d3 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -216,6 +216,27 @@ public class FunctionsImpl extends BaseResource implements
Functions {
}
@Override
+ public void restartFunction(String tenant, String namespace, String
functionName, int instanceId)
+ throws PulsarAdminException {
+ try {
+
request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
+ .path("restart")).post(Entity.entity("",
MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void restartFunction(String tenant, String namespace, String
functionName) throws PulsarAdminException {
+ try {
+
request(functions.path(tenant).path(namespace).path(functionName).path("restart"))
+ .post(Entity.entity("", MediaType.APPLICATION_JSON),
ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
public void uploadFunction(String sourceFile, String path) throws
PulsarAdminException {
try {
final FormDataMultiPart mp = new FormDataMultiPart();
@@ -289,4 +310,5 @@ public class FunctionsImpl extends BaseResource implements
Functions {
public static String printJson(MessageOrBuilder msg) throws IOException {
return JsonFormat.printer().print(msg);
}
+
}
\ No newline at end of file
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index e206e75..11a3c7a 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -31,6 +31,7 @@ import
org.apache.pulsar.admin.cli.CmdFunctions.CreateFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.DeleteFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions;
+import org.apache.pulsar.admin.cli.CmdFunctions.RestartFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction;
import org.apache.pulsar.admin.cli.CmdSinks.CreateSink;
import org.apache.pulsar.admin.cli.CmdSources.CreateSource;
@@ -215,6 +216,34 @@ public class CmdFunctionsTest {
}
@Test
+ public void restartFunction() throws Exception {
+ String fnName = TEST_NAME + "-function";
+ String tenant = "sample";
+ String namespace = "ns1";
+ int instanceId = 0;
+ cmd.run(new String[] { "restart", "--tenant", tenant, "--namespace",
namespace, "--name", fnName,
+ "--instance-id", Integer.toString(instanceId)});
+
+ RestartFunction restarter = cmd.getRestarter();
+ assertEquals(fnName, restarter.getFunctionName());
+
+ verify(functions, times(1)).restartFunction(tenant, namespace, fnName,
instanceId);
+ }
+
+ @Test
+ public void restartFunctionInstances() throws Exception {
+ String fnName = TEST_NAME + "-function";
+ String tenant = "sample";
+ String namespace = "ns1";
+ cmd.run(new String[] { "restart", "--tenant", tenant, "--namespace",
namespace, "--name", fnName });
+
+ RestartFunction restarter = cmd.getRestarter();
+ assertEquals(fnName, restarter.getFunctionName());
+
+ verify(functions, times(1)).restartFunction(tenant, namespace, fnName);
+ }
+
+ @Test
public void testCreateFunctionWithHttpUrl() throws Exception {
String fnName = TEST_NAME + "-function";
String inputTopicName = TEST_NAME + "-input-topic";
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index b11dabe..dd1ef3d 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -102,7 +102,8 @@ public class CmdFunctions extends CmdBase {
private final DeleteFunction deleter;
private final UpdateFunction updater;
private final GetFunction getter;
- private final GetFunctionStatus statuser;
+ private final GetFunctionStatus functionStatus;
+ private final RestartFunction restart;
private final ListFunctions lister;
private final StateGetter stateGetter;
private final TriggerFunction triggerer;
@@ -164,7 +165,7 @@ public class CmdFunctions extends CmdBase {
@Parameter(names = "--name", description = "The function's name")
protected String functionName;
-
+
@Override
void processArguments() throws Exception {
super.processArguments();
@@ -831,6 +832,27 @@ public class CmdFunctions extends CmdBase {
}
}
+ @Parameters(commandDescription = "Restart function instance")
+ class RestartFunction extends FunctionCommand {
+
+ @Parameter(names = "--instance-id", description = "The function
instanceId (restart all instances if instance-id is not provided")
+ protected String instanceId;
+
+ @Override
+ void runCmd() throws Exception {
+ if (isNotBlank(instanceId)) {
+ try {
+ admin.functions().restartFunction(tenant, namespace,
functionName, Integer.parseInt(instanceId));
+ } catch (NumberFormatException e) {
+ System.err.println("instance-id must be a number");
+ }
+ } else {
+ admin.functions().restartFunction(tenant, namespace,
functionName);
+ }
+ System.out.println("Restarted successfully");
+ }
+ }
+
@Parameters(commandDescription = "Delete a Pulsar Function that's running
on a Pulsar cluster")
class DeleteFunction extends FunctionCommand {
@Override
@@ -1035,18 +1057,20 @@ public class CmdFunctions extends CmdBase {
deleter = new DeleteFunction();
updater = new UpdateFunction();
getter = new GetFunction();
- statuser = new GetFunctionStatus();
+ functionStatus = new GetFunctionStatus();
lister = new ListFunctions();
stateGetter = new StateGetter();
triggerer = new TriggerFunction();
uploader = new UploadFunction();
downloader = new DownloadFunction();
cluster = new GetCluster();
+ restart = new RestartFunction();
jcommander.addCommand("localrun", getLocalRunner());
jcommander.addCommand("create", getCreater());
jcommander.addCommand("delete", getDeleter());
jcommander.addCommand("update", getUpdater());
jcommander.addCommand("get", getGetter());
+ jcommander.addCommand("restart", getRestarter());
jcommander.addCommand("getstatus", getStatuser());
jcommander.addCommand("list", getLister());
jcommander.addCommand("querystate", getStateGetter());
@@ -1082,7 +1106,7 @@ public class CmdFunctions extends CmdBase {
}
@VisibleForTesting
- GetFunctionStatus getStatuser() { return statuser; }
+ GetFunctionStatus getStatuser() { return functionStatus; }
@VisibleForTesting
ListFunctions getLister() {
@@ -1109,6 +1133,11 @@ public class CmdFunctions extends CmdBase {
return downloader;
}
+ @VisibleForTesting
+ RestartFunction getRestarter() {
+ return restart;
+ }
+
private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig
functionConfig) {
String[] args = fqfn.split("/");
if (args.length != 3) {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 0927360..b3f30fd 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -127,7 +127,7 @@ public class FunctionActioner implements AutoCloseable {
}
@VisibleForTesting
- protected void startFunction(FunctionRuntimeInfo functionRuntimeInfo)
throws Exception {
+ public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws
Exception {
FunctionMetaData functionMetaData =
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
int instanceId =
functionRuntimeInfo.getFunctionInstance().getInstanceId();
@@ -225,7 +225,7 @@ public class FunctionActioner implements AutoCloseable {
}
}
- private void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
+ public void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
FunctionMetaData functionMetaData = instance.getFunctionMetaData();
log.info("Stopping function {} - {}...",
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 5e1995e..121a454 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -20,11 +20,15 @@ package org.apache.pulsar.functions.worker;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -35,9 +39,14 @@ import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import javax.ws.rs.WebApplicationException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
@@ -317,6 +326,104 @@ public class FunctionRuntimeManager implements
AutoCloseable{
return functionStatus;
}
+ public Response restartFunctionInstance(String tenant, String namespace,
String functionName, int instanceId) throws Exception {
+ Assignment assignment = this.findAssignment(tenant, namespace,
functionName, instanceId);
+ final String fullFunctionName = String.format("%s/%s/%s/%s", tenant,
namespace, functionName, instanceId);
+ if (assignment == null) {
+ return
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(fullFunctionName + " doesn't
exist")).build();
+ }
+
+ final String assignedWorkerId = assignment.getWorkerId();
+ final String workerId = this.workerConfig.getWorkerId();
+
+ if (assignedWorkerId.equals(workerId)) {
+
restartFunction(Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
+ return Response.status(Status.OK).build();
+ } else {
+ // query other worker
+ List<WorkerInfo> workerInfoList =
this.membershipManager.getCurrentMembership();
+ WorkerInfo workerInfo = null;
+ for (WorkerInfo entry : workerInfoList) {
+ if (assignment.getWorkerId().equals(entry.getWorkerId())) {
+ workerInfo = entry;
+ }
+ }
+ if (workerInfo == null) {
+ return
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(fullFunctionName + " has not
been assigned yet")).build();
+ }
+
+ URI redirect = null;
+ final String redirectUrl =
String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/restart",
+ workerInfo.getWorkerHostname(), workerInfo.getPort(),
tenant, namespace, functionName, instanceId);
+ try {
+ redirect = new URI(redirectUrl);
+ } catch (URISyntaxException e) {
+ log.error("Error in preparing redirect url for {}/{}/{}/{}:
{}", tenant, namespace, functionName,
+ instanceId, e.getMessage(), e);
+ return
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(fullFunctionName + " invalid
redirection url")).build();
+ }
+ throw new
WebApplicationException(Response.temporaryRedirect(redirect).build());
+ }
+ }
+
+ public Response restartFunctionInstances(String tenant, String namespace,
String functionName) throws Exception {
+ final String fullFunctionName = String.format("%s/%s/%s", tenant,
namespace, functionName);
+ Collection<Assignment> assignments =
this.findFunctionAssignments(tenant, namespace, functionName);
+
+ if (assignments.isEmpty()) {
+ return
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(fullFunctionName + " has not been
assigned yet")).build();
+ }
+ for (Assignment assignment : assignments) {
+ final String assignedWorkerId = assignment.getWorkerId();
+ final String workerId = this.workerConfig.getWorkerId();
+ String fullyQualifiedInstanceId =
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+ if (assignedWorkerId.equals(workerId)) {
+ restartFunction(fullyQualifiedInstanceId);
+ } else {
+ List<WorkerInfo> workerInfoList =
this.membershipManager.getCurrentMembership();
+ WorkerInfo workerInfo = null;
+ for (WorkerInfo entry : workerInfoList) {
+ if (assignment.getWorkerId().equals(entry.getWorkerId())) {
+ workerInfo = entry;
+ }
+ }
+ if (workerInfo == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] has not been assigned yet",
fullyQualifiedInstanceId);
+ }
+ continue;
+ }
+ Client client = ClientBuilder.newClient();
+ // TODO: create and use pulsar-admin to support authorization
and authentication and manage redirect
+ final String instanceRestartUrl =
String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/restart",
+ workerInfo.getWorkerHostname(), workerInfo.getPort(),
tenant, namespace, functionName,
+ assignment.getInstance().getInstanceId());
+
client.target(instanceRestartUrl).request(MediaType.APPLICATION_JSON)
+ .post(Entity.entity("", MediaType.APPLICATION_JSON),
ErrorData.class);
+ }
+ }
+ return Response.status(Status.OK).build();
+ }
+
+ private void restartFunction(String fullyQualifiedInstanceId) throws
Exception {
+ log.info("[{}] restarting..", fullyQualifiedInstanceId);
+ FunctionRuntimeInfo functionRuntimeInfo =
this.getFunctionRuntimeInfo(fullyQualifiedInstanceId);
+ if (functionRuntimeInfo != null) {
+ this.functionActioner.stopFunction(functionRuntimeInfo);
+ try {
+ this.functionActioner.startFunction(functionRuntimeInfo);
+ } catch (Exception ex) {
+ log.info("{} Error starting function",
fullyQualifiedInstanceId, ex);
+ functionRuntimeInfo.setStartupException(ex);
+ throw ex;
+ }
+ }
+ }
+
/**
* Get statuses of all function instances.
* @param tenant the tenant the function belongs to
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 4bb6e49..a9e03de 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -386,6 +386,72 @@ public class FunctionsImpl {
return Response.status(Status.OK).entity(jsonResponse).build();
}
+ public Response restartFunctionInstance(final String tenant, final String
namespace, final String functionName,
+ final String instanceId) {
+
+ if (!isWorkerServiceAvailable()) {
+ return getUnavailableResponse();
+ }
+
+ // validate parameters
+ try {
+ validateGetFunctionInstanceRequestParams(tenant, namespace,
functionName, instanceId);
+ } catch (IllegalArgumentException e) {
+ log.error("Invalid restart-function request @ /{}/{}/{}", tenant,
namespace, functionName, e);
+ return
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(e.getMessage())).build();
+ }
+
+ FunctionMetaDataManager functionMetaDataManager =
worker().getFunctionMetaDataManager();
+ if (!functionMetaDataManager.containsFunction(tenant, namespace,
functionName)) {
+ log.error("Function in getFunctionStatus does not exist @
/{}/{}/{}", tenant, namespace, functionName);
+ return
Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(String.format("Function %s doesn't
exist", functionName))).build();
+ }
+
+ FunctionRuntimeManager functionRuntimeManager =
worker().getFunctionRuntimeManager();
+ try {
+ return functionRuntimeManager.restartFunctionInstance(tenant,
namespace, functionName,
+ Integer.parseInt(instanceId));
+ } catch (WebApplicationException we) {
+ throw we;
+ } catch (Exception e) {
+ log.error("Failed to restart function: {}/{}/{}/{}", tenant,
namespace, functionName, instanceId, e);
+ return
Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(),
e.getMessage()).build();
+ }
+ }
+
+ public Response restartFunctionInstances(final String tenant, final String
namespace, final String functionName) {
+
+ if (!isWorkerServiceAvailable()) {
+ return getUnavailableResponse();
+ }
+
+ // validate parameters
+ try {
+ validateGetFunctionRequestParams(tenant, namespace, functionName);
+ } catch (IllegalArgumentException e) {
+ log.error("Invalid restart-Function request @ /{}/{}/{}", tenant,
namespace, functionName, e);
+ return
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(e.getMessage())).build();
+ }
+
+ FunctionMetaDataManager functionMetaDataManager =
worker().getFunctionMetaDataManager();
+ if (!functionMetaDataManager.containsFunction(tenant, namespace,
functionName)) {
+ log.error("Function in getFunctionStatus does not exist @
/{}/{}/{}", tenant, namespace, functionName);
+ return
Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(String.format("Function %s doesn't
exist", functionName))).build();
+ }
+
+ FunctionRuntimeManager functionRuntimeManager =
worker().getFunctionRuntimeManager();
+ try {
+ return functionRuntimeManager.restartFunctionInstances(tenant,
namespace, functionName);
+ }catch (Exception e) {
+ log.error("Failed to restart function: {}/{}/{}", tenant,
namespace, functionName, e);
+ return
Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(),
e.getMessage()).build();
+ }
+ }
+
public Response getFunctionStatus(final String tenant, final String
namespace, final String functionName)
throws IOException {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 96baada..3581453 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.io.ConnectorDefinition;
import java.io.IOException;
import java.io.InputStream;
@@ -45,6 +46,8 @@ import
org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -163,6 +166,31 @@ public class FunctionApiV2Resource extends
FunctionApiResource {
}
@POST
+ @ApiOperation(value = "Restart function instance", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid
request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response restartFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final
@PathParam("functionName") String functionName,
+ final @PathParam("instanceId") String instanceId) {
+ return functions.restartFunctionInstance(tenant, namespace,
functionName, instanceId);
+ }
+
+ @POST
+ @ApiOperation(value = "Restart all function instances", response =
Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid
request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{functionName}/restart")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response restartFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final
@PathParam("functionName") String functionName) {
+ return functions.restartFunctionInstances(tenant, namespace,
functionName);
+ }
+
+ @POST
@Path("/upload")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public Response uploadFunction(final @FormDataParam("data") InputStream
uploadedInputStream,