This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 020a1d5 Fixed the behavior of Function start/stop (#3477)
020a1d5 is described below
commit 020a1d57e122582a5ad8bd043f278e4a92d4ffc1
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Thu Jan 31 16:47:39 2019 -0800
Fixed the behavior of Function start/stop (#3477)
* Added a state in the function metadata about what the state of the
instances should be
* Have start api for sources/sinks
* Add missing pieces
* more checks while handling request
* Fixed bugs
* Added unittests
* Added unittest
* Fix the all instances side logic
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 31 +++++
.../apache/pulsar/broker/admin/impl/SinkBase.java | 31 +++++
.../pulsar/broker/admin/impl/SourceBase.java | 29 +++++
.../org/apache/pulsar/client/admin/Functions.java | 34 +++++
.../java/org/apache/pulsar/client/admin/Sink.java | 34 +++++
.../org/apache/pulsar/client/admin/Source.java | 34 +++++
.../client/admin/internal/FunctionsImpl.java | 21 ++++
.../pulsar/client/admin/internal/SinkImpl.java | 21 ++++
.../pulsar/client/admin/internal/SourceImpl.java | 21 ++++
.../apache/pulsar/admin/cli/CmdFunctionsTest.java | 29 +++++
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 33 ++++-
.../java/org/apache/pulsar/admin/cli/CmdSinks.java | 28 ++++-
.../org/apache/pulsar/admin/cli/CmdSources.java | 28 ++++-
.../proto/src/main/proto/Function.proto | 5 +
.../functions/worker/FunctionMetaDataManager.java | 59 +++++++++
.../functions/worker/FunctionRuntimeManager.java | 56 ++++++---
.../functions/worker/rest/api/ComponentImpl.java | 139 +++++++++++++++++----
.../worker/rest/api/v3/FunctionApiV3Resource.java | 31 +++++
.../worker/rest/api/v3/SinkApiV3Resource.java | 27 ++++
.../worker/rest/api/v3/SourceApiV3Resource.java | 27 ++++
.../worker/FunctionMetaDataManagerTest.java | 62 +++++++++
.../worker/FunctionRuntimeManagerTest.java | 53 ++++++++
22 files changed, 781 insertions(+), 52 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 73af2c5..9b88f29 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -338,6 +338,37 @@ public class FunctionsBase extends AdminResource
implements Supplier<WorkerServi
}
@POST
+ @ApiOperation(value = "Start function instance", response = Void.class)
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/start")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public void startFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String
functionName,
+ final @PathParam("instanceId") String
instanceId) {
+ functions.startFunctionInstance(tenant, namespace, functionName,
instanceId, uri.getRequestUri());
+ }
+
+ @POST
+ @ApiOperation(value = "Start all function instances", response =
Void.class)
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ @Path("/{tenant}/{namespace}/{functionName}/start")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public void startFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String
functionName) {
+ functions.startFunctionInstances(tenant, namespace, functionName);
+ }
+
+ @POST
@ApiOperation(
value = "Uploads Pulsar Function file data",
hidden = true
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
index 41c5376..2bd22a4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
@@ -256,6 +256,37 @@ public class SinkBase extends AdminResource implements
Supplier<WorkerService> {
sink.stopFunctionInstances(tenant, namespace, sinkName);
}
+ @POST
+ @ApiOperation(value = "Start sink instance", response = Void.class)
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/start")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public void startSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sinkName") String sinkName,
+ final @PathParam("instanceId") String instanceId) {
+ sink.startFunctionInstance(tenant, namespace, sinkName, instanceId,
uri.getRequestUri());
+ }
+
+ @POST
+ @ApiOperation(value = "Start all sink instances", response = Void.class)
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ @Path("/{tenant}/{namespace}/{sinkName}/start")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public void startSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sinkName") String sinkName) {
+ sink.startFunctionInstances(tenant, namespace, sinkName);
+ }
+
@GET
@ApiOperation(
value = "Fetches a list of supported Pulsar IO sink connectors
currently running in cluster mode",
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
index c4a102b..0e8348f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
@@ -251,6 +251,35 @@ public class SourceBase extends AdminResource implements
Supplier<WorkerService>
source.stopFunctionInstances(tenant, namespace, sourceName);
}
+ @POST
+ @ApiOperation(value = "Start source instance", response = Void.class)
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/start")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public void startSource(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sourceName") String sourceName,
+ final @PathParam("instanceId") String instanceId) {
+ source.startFunctionInstance(tenant, namespace, sourceName,
instanceId, uri.getRequestUri());
+ }
+
+ @POST
+ @ApiOperation(value = "Start all source instances", response = Void.class)
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sourceName}/start")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public void startSource(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sourceName") String sourceName) {
+ source.startFunctionInstances(tenant, namespace, sourceName);
+ }
+
@GET
@ApiOperation(
value = "Fetches a list of supported Pulsar IO source connectors
currently running in cluster mode",
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 2bd2e9f..481c5fd 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -285,6 +285,39 @@ public interface Functions {
void stopFunction(String tenant, String namespace, String function, int
instanceId) throws PulsarAdminException;
/**
+ * Start all function instances
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void startFunction(String tenant, String namespace, String function)
throws PulsarAdminException;
+
+ /**
+ * Start function instance
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ *
+ * @param instanceId
+ * Function instanceId
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void startFunction(String tenant, String namespace, String function, int
instanceId) throws PulsarAdminException;
+
+ /**
* Stop all function instances
*
* @param tenant
@@ -299,6 +332,7 @@ public interface Functions {
*/
void stopFunction(String tenant, String namespace, String function) throws
PulsarAdminException;
+
/**
* Triggers the function by writing to the input topic.
*
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
index 9f2d9ab..2b924f6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
@@ -264,6 +264,40 @@ public interface Sink {
void stopSink(String tenant, String namespace, String sink) throws
PulsarAdminException;
/**
+ * Start sink instance
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param sink
+ * Sink name
+ *
+ * @param instanceId
+ * Sink instanceId
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void startSink(String tenant, String namespace, String sink, int
instanceId) throws PulsarAdminException;
+
+ /**
+ * Start all sink instances
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param sink
+ * Sink name
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void startSink(String tenant, String namespace, String sink) throws
PulsarAdminException;
+
+
+ /**
* Fetches a list of supported Pulsar IO sinks currently running in
cluster mode
*
* @throws PulsarAdminException
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
index 989598a..706150b 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
@@ -264,6 +264,40 @@ public interface Source {
void stopSource(String tenant, String namespace, String source) throws
PulsarAdminException;
/**
+ * Start source instance
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param source
+ * Source name
+ *
+ * @param instanceId
+ * Source instanceId
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void startSource(String tenant, String namespace, String source, int
instanceId) throws PulsarAdminException;
+
+ /**
+ * Start all source instances
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param source
+ * Source name
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void startSource(String tenant, String namespace, String source) throws
PulsarAdminException;
+
+
+ /**
* Fetches a list of supported Pulsar IO sources currently running in
cluster mode
*
* @throws PulsarAdminException
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 130704e..ac8d60d 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -291,6 +291,27 @@ public class FunctionsImpl extends BaseResource implements
Functions {
}
@Override
+ public void startFunction(String tenant, String namespace, String
functionName, int instanceId)
+ throws PulsarAdminException {
+ try {
+
request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
+ .path("start")).post(Entity.entity("",
MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void startFunction(String tenant, String namespace, String
functionName) throws PulsarAdminException {
+ try {
+
request(functions.path(tenant).path(namespace).path(functionName).path("start"))
+ .post(Entity.entity("", MediaType.APPLICATION_JSON),
ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
public void uploadFunction(String sourceFile, String path) throws
PulsarAdminException {
try {
final FormDataMultiPart mp = new FormDataMultiPart();
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
index a9f99b8..48a75e4 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
@@ -233,6 +233,27 @@ public class SinkImpl extends BaseResource implements Sink
{
}
@Override
+ public void startSink(String tenant, String namespace, String sinkName,
int instanceId)
+ throws PulsarAdminException {
+ try {
+
request(sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(instanceId))
+ .path("start")).post(Entity.entity("",
MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void startSink(String tenant, String namespace, String sinkName)
throws PulsarAdminException {
+ try {
+
request(sink.path(tenant).path(namespace).path(sinkName).path("start"))
+ .post(Entity.entity("", MediaType.APPLICATION_JSON),
ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
public List<ConnectorDefinition> getBuiltInSinks() throws
PulsarAdminException {
try {
Response response = request(sink.path("builtinsinks")).get();
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
index 2d066e0..1a56dc4 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
@@ -233,6 +233,27 @@ public class SourceImpl extends BaseResource implements
Source {
}
@Override
+ public void startSource(String tenant, String namespace, String
sourceName, int instanceId)
+ throws PulsarAdminException {
+ try {
+
request(source.path(tenant).path(namespace).path(sourceName).path(Integer.toString(instanceId))
+ .path("start")).post(Entity.entity("",
MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void startSource(String tenant, String namespace, String
sourceName) throws PulsarAdminException {
+ try {
+
request(source.path(tenant).path(namespace).path(sourceName).path("start"))
+ .post(Entity.entity("", MediaType.APPLICATION_JSON),
ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
public List<ConnectorDefinition> getBuiltInSources() throws
PulsarAdminException {
try {
Response response = request(source.path("builtinsources")).get();
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index 950ee94..a64c38c 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -254,6 +254,35 @@ public class CmdFunctionsTest {
}
@Test
+ public void startFunction() throws Exception {
+ String fnName = TEST_NAME + "-function";
+ String tenant = "sample";
+ String namespace = "ns1";
+ int instanceId = 0;
+ cmd.run(new String[] { "start", "--tenant", tenant, "--namespace",
namespace, "--name", fnName,
+ "--instance-id", Integer.toString(instanceId)});
+
+ CmdFunctions.StartFunction stop = cmd.getStarter();
+ assertEquals(fnName, stop.getFunctionName());
+
+ verify(functions, times(1)).startFunction(tenant, namespace, fnName,
instanceId);
+ }
+
+ @Test
+ public void startFunctionInstances() throws Exception {
+ String fnName = TEST_NAME + "-function";
+ String tenant = "sample";
+ String namespace = "ns1";
+ cmd.run(new String[] { "start", "--tenant", tenant, "--namespace",
namespace, "--name", fnName });
+
+ CmdFunctions.StartFunction stop = cmd.getStarter();
+ assertEquals(fnName, stop.getFunctionName());
+
+ verify(functions, times(1)).startFunction(tenant, namespace, fnName);
+ }
+
+
+ @Test
public void testGetFunctionStatus() throws Exception {
String fnName = TEST_NAME + "-function";
String tenant = "sample";
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 00ce1af..b651ab1 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -66,6 +66,7 @@ public class CmdFunctions extends CmdBase {
private final GetFunctionStats functionStats;
private final RestartFunction restart;
private final StopFunction stop;
+ private final StartFunction start;
private final ListFunctions lister;
private final StateGetter stateGetter;
private final TriggerFunction triggerer;
@@ -673,7 +674,7 @@ public class CmdFunctions extends CmdBase {
}
}
- @Parameters(commandDescription = "Temporary stops function instance. (If
worker restarts then it reassigns and starts functiona again")
+ @Parameters(commandDescription = "Stops function instance")
class StopFunction extends FunctionCommand {
@Parameter(names = "--instance-id", description = "The function
instanceId (stop all instances if instance-id is not provided")
@@ -690,7 +691,28 @@ public class CmdFunctions extends CmdBase {
} else {
admin.functions().stopFunction(tenant, namespace,
functionName);
}
- System.out.println("Restarted successfully");
+ System.out.println("Stopped successfully");
+ }
+ }
+
+ @Parameters(commandDescription = "Starts a stopped function instance")
+ class StartFunction extends FunctionCommand {
+
+ @Parameter(names = "--instance-id", description = "The function
instanceId (start all instances if instance-id is not provided")
+ protected String instanceId;
+
+ @Override
+ void runCmd() throws Exception {
+ if (isNotBlank(instanceId)) {
+ try {
+ admin.functions().startFunction(tenant, namespace,
functionName, Integer.parseInt(instanceId));
+ } catch (NumberFormatException e) {
+ System.err.println("instance-id must be a number");
+ }
+ } else {
+ admin.functions().startFunction(tenant, namespace,
functionName);
+ }
+ System.out.println("Started successfully");
}
}
@@ -882,6 +904,7 @@ public class CmdFunctions extends CmdBase {
downloader = new DownloadFunction();
restart = new RestartFunction();
stop = new StopFunction();
+ start = new StartFunction();
jcommander.addCommand("localrun", getLocalRunner());
jcommander.addCommand("create", getCreater());
jcommander.addCommand("delete", getDeleter());
@@ -889,6 +912,7 @@ public class CmdFunctions extends CmdBase {
jcommander.addCommand("get", getGetter());
jcommander.addCommand("restart", getRestarter());
jcommander.addCommand("stop", getStopper());
+ jcommander.addCommand("start", getStarter());
// TODO depecreate getstatus
jcommander.addCommand("status", getStatuser(), "getstatus");
jcommander.addCommand("stats", getFunctionStats());
@@ -962,6 +986,11 @@ public class CmdFunctions extends CmdBase {
return stop;
}
+ @VisibleForTesting
+ StartFunction getStarter() {
+ return start;
+ }
+
private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig
functionConfig) {
String[] args = fqfn.split("/");
if (args.length != 3) {
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index dd533ac..00cd26f 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -64,6 +64,7 @@ public class CmdSinks extends CmdBase {
private final GetSink getSink;
private final GetSinkStatus getSinkStatus;
private final StopSink stopSink;
+ private final StartSink startSink;
private final RestartSink restartSink;
private final LocalSinkRunner localSinkRunner;
@@ -76,6 +77,7 @@ public class CmdSinks extends CmdBase {
getSink = new GetSink();
getSinkStatus = new GetSinkStatus();
stopSink = new StopSink();
+ startSink = new StartSink();
restartSink = new RestartSink();
localSinkRunner = new LocalSinkRunner();
@@ -87,6 +89,7 @@ public class CmdSinks extends CmdBase {
// TODO deprecate getstatus
jcommander.addCommand("status", getSinkStatus, "getstatus");
jcommander.addCommand("stop", stopSink);
+ jcommander.addCommand("start", startSink);
jcommander.addCommand("restart", restartSink);
jcommander.addCommand("localrun", localSinkRunner);
jcommander.addCommand("available-sinks", new ListBuiltInSinks());
@@ -575,7 +578,7 @@ public class CmdSinks extends CmdBase {
}
}
- @Parameters(commandDescription = "Temporary stops sink instance. (If
worker restarts then it reassigns and starts sink again")
+ @Parameters(commandDescription = "Stops sink instance")
class StopSink extends SinkCommand {
@Parameter(names = "--instance-id", description = "The sink instanceId
(stop all instances if instance-id is not provided")
@@ -592,7 +595,28 @@ public class CmdSinks extends CmdBase {
} else {
admin.sink().stopSink(tenant, namespace, sinkName);
}
- System.out.println("Restarted successfully");
+ System.out.println("Stopped successfully");
+ }
+ }
+
+ @Parameters(commandDescription = "Starts sink instance")
+ class StartSink extends SinkCommand {
+
+ @Parameter(names = "--instance-id", description = "The sink instanceId
(start all instances if instance-id is not provided")
+ protected String instanceId;
+
+ @Override
+ void runCmd() throws Exception {
+ if (isNotBlank(instanceId)) {
+ try {
+ admin.sink().startSink(tenant, namespace, sinkName,
Integer.parseInt(instanceId));
+ } catch (NumberFormatException e) {
+ System.err.println("instance-id must be a number");
+ }
+ } else {
+ admin.sink().startSink(tenant, namespace, sinkName);
+ }
+ System.out.println("Started successfully");
}
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index c61b69a..c334380 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -68,6 +68,7 @@ public class CmdSources extends CmdBase {
private final UpdateSource updateSource;
private final RestartSource restartSource;
private final StopSource stopSource;
+ private final StartSource startSource;
private final LocalSourceRunner localSourceRunner;
public CmdSources(PulsarAdmin admin) {
@@ -80,6 +81,7 @@ public class CmdSources extends CmdBase {
getSourceStatus = new GetSourceStatus();
restartSource = new RestartSource();
stopSource = new StopSource();
+ startSource = new StartSource();
localSourceRunner = new LocalSourceRunner();
jcommander.addCommand("create", createSource);
@@ -90,6 +92,7 @@ public class CmdSources extends CmdBase {
jcommander.addCommand("status", getSourceStatus, "getstatus");
jcommander.addCommand("list", listSources);
jcommander.addCommand("stop", stopSource);
+ jcommander.addCommand("start", startSource);
jcommander.addCommand("restart", restartSource);
jcommander.addCommand("localrun", localSourceRunner);
jcommander.addCommand("available-sources", new ListBuiltInSources());
@@ -529,7 +532,7 @@ public class CmdSources extends CmdBase {
}
}
- @Parameters(commandDescription = "Temporary stops source instance. (If
worker restarts then it reassigns and starts source again")
+ @Parameters(commandDescription = "Stop source instance")
class StopSource extends SourceCommand {
@Parameter(names = "--instance-id", description = "The source
instanceId (stop all instances if instance-id is not provided")
@@ -546,7 +549,28 @@ public class CmdSources extends CmdBase {
} else {
admin.source().stopSource(tenant, namespace, sourceName);
}
- System.out.println("Restarted successfully");
+ System.out.println("Stopped successfully");
+ }
+ }
+
+ @Parameters(commandDescription = "Start source instance")
+ class StartSource extends SourceCommand {
+
+ @Parameter(names = "--instance-id", description = "The source
instanceId (start all instances if instance-id is not provided")
+ protected String instanceId;
+
+ @Override
+ void runCmd() throws Exception {
+ if (isNotBlank(instanceId)) {
+ try {
+ admin.source().startSource(tenant, namespace, sourceName,
Integer.parseInt(instanceId));
+ } catch (NumberFormatException e) {
+ System.err.println("instance-id must be a number");
+ }
+ } else {
+ admin.source().startSource(tenant, namespace, sourceName);
+ }
+ System.out.println("Started successfully");
}
}
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto
b/pulsar-functions/proto/src/main/proto/Function.proto
index cb5021b..74457e3 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -127,11 +127,16 @@ message PackageLocationMetaData {
string originalFileName = 2;
}
+enum FunctionState {
+ RUNNING = 0;
+ STOPPED = 1;
+}
message FunctionMetaData {
FunctionDetails functionDetails = 1;
PackageLocationMetaData packageLocation = 2;
uint64 version = 3;
uint64 createTime = 4;
+ map<int32, FunctionState> instanceStates = 5;
}
message Instance {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 920063e..832ed5d 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -224,6 +224,42 @@ public class FunctionMetaDataManager implements
AutoCloseable {
}
/**
+ * Sends a start/stop function request to the FMT (Function Metadata
Topic) for a function
+ * @param tenant the tenant the function that needs to be deregistered
belongs to
+ * @param namespace the namespace the function that needs to be
deregistered belongs to
+ * @param functionName the name of the function
+ * @param instanceId the instanceId of the function, -1 if for all
instances
+ * @param start do we need to start or stop
+ * @return a completable future of when the start/stop has been applied
+ */
+ public synchronized CompletableFuture<RequestResult>
changeFunctionInstanceStatus(String tenant, String namespace, String
functionName,
+
Integer instanceId, boolean start) {
+ FunctionMetaData functionMetaData =
this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
+
+ FunctionMetaData.Builder builder = functionMetaData.toBuilder()
+ .setVersion(functionMetaData.getVersion() + 1);
+ if (builder.getInstanceStatesMap() == null ||
builder.getInstanceStatesMap().isEmpty()) {
+ for (int i = 0; i <
functionMetaData.getFunctionDetails().getParallelism(); ++i) {
+ builder.putInstanceStates(i, Function.FunctionState.RUNNING);
+ }
+ }
+ Function.FunctionState state = start ? Function.FunctionState.RUNNING
: Function.FunctionState.STOPPED;
+ if (instanceId < 0) {
+ for (int i = 0; i <
functionMetaData.getFunctionDetails().getParallelism(); ++i) {
+ builder.putInstanceStates(i, state);
+ }
+ } else {
+ builder.putInstanceStates(instanceId, state);
+ }
+ FunctionMetaData newFunctionMetaData = builder.build();
+
+ Request.ServiceRequest updateRequest =
ServiceRequestUtils.getUpdateRequest(
+ this.workerConfig.getWorkerId(), newFunctionMetaData);
+
+ return submit(updateRequest);
+ }
+
+ /**
* Processes a request received from the FMT (Function Metadata Topic)
* @param messageId The message id of the request
* @param serviceRequest The request
@@ -421,6 +457,29 @@ public class FunctionMetaDataManager implements
AutoCloseable {
}
}
+ public boolean canChangeState(FunctionMetaData functionMetaData, int
instanceId, Function.FunctionState newState) {
+ if (instanceId >=
functionMetaData.getFunctionDetails().getParallelism()) {
+ return false;
+ }
+ if (functionMetaData.getInstanceStatesMap() == null ||
functionMetaData.getInstanceStatesMap().isEmpty()) {
+ // This means that all instances of the functions are running
+ return newState == Function.FunctionState.STOPPED;
+ }
+ if (instanceId >= 0) {
+ if
(functionMetaData.getInstanceStatesMap().containsKey(instanceId)) {
+ return functionMetaData.getInstanceStatesMap().get(instanceId)
!= newState;
+ } else {
+ return false;
+ }
+ } else {
+ // want to change state for all instances
+ for (Function.FunctionState state :
functionMetaData.getInstanceStatesMap().values()) {
+ if (state != newState) return true;
+ }
+ return false;
+ }
+ }
+
private ServiceRequestManager getServiceRequestManager(PulsarClient
pulsarClient, String functionMetadataTopic) throws PulsarClientException {
return new
ServiceRequestManager(pulsarClient.newProducer().topic(functionMetadataTopic).create());
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index a5a6aa2..94009a6 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -201,7 +201,9 @@ public class FunctionRuntimeManager implements
AutoCloseable{
Map<String, Assignment> assignmentMap =
workerIdToAssignments.get(this.workerConfig.getWorkerId());
if (assignmentMap != null) {
for (Assignment assignment : assignmentMap.values()) {
- startFunctionInstance(assignment);
+ if (needsStart(assignment)) {
+ startFunctionInstance(assignment);
+ }
}
}
// start assignment tailer
@@ -304,8 +306,8 @@ public class FunctionRuntimeManager implements
AutoCloseable{
}
}
- public Response stopFunctionInstance(String tenant, String namespace,
String functionName, int instanceId,
- boolean restart, URI uri) throws Exception {
+ public Response restartFunctionInstance(String tenant, String namespace,
String functionName, int instanceId,
+ URI uri) throws Exception {
if (runtimeFactory.externallyManaged()) {
return
Response.status(Status.NOT_IMPLEMENTED).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData("Externally managed schedulers can't
do per instance stop")).build();
@@ -321,7 +323,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
final String workerId = this.workerConfig.getWorkerId();
if (assignedWorkerId.equals(workerId)) {
-
stopFunction(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()),
restart);
+
stopFunction(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()),
true);
return Response.status(Status.OK).build();
} else {
// query other worker
@@ -346,7 +348,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
}
}
- public Response stopFunctionInstances(String tenant, String namespace,
String functionName, boolean restart)
+ public Response restartFunctionInstances(String tenant, String namespace,
String functionName)
throws Exception {
final String fullFunctionName = String.format("%s/%s/%s", tenant,
namespace, functionName);
Collection<Assignment> assignments =
this.findFunctionAssignments(tenant, namespace, functionName);
@@ -361,7 +363,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
final String workerId = this.workerConfig.getWorkerId();
String fullyQualifiedInstanceId =
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
if (assignedWorkerId.equals(workerId)) {
- stopFunction(fullyQualifiedInstanceId, restart);
+ stopFunction(fullyQualifiedInstanceId, true);
} else {
List<WorkerInfo> workerInfoList =
this.membershipManager.getCurrentMembership();
WorkerInfo workerInfo = null;
@@ -377,11 +379,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
return
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(fullFunctionName + " has not
been assigned yet")).build();
}
- if (restart) {
- this.functionAdmin.functions().restartFunction(tenant,
namespace, functionName);
- } else {
- this.functionAdmin.functions().stopFunction(tenant,
namespace, functionName);
- }
+ this.functionAdmin.functions().restartFunction(tenant,
namespace, functionName);
}
} else {
for (Assignment assignment : assignments) {
@@ -389,7 +387,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
final String workerId = this.workerConfig.getWorkerId();
String fullyQualifiedInstanceId =
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
if (assignedWorkerId.equals(workerId)) {
- stopFunction(fullyQualifiedInstanceId, restart);
+ stopFunction(fullyQualifiedInstanceId, true);
} else {
List<WorkerInfo> workerInfoList =
this.membershipManager.getCurrentMembership();
WorkerInfo workerInfo = null;
@@ -404,13 +402,8 @@ public class FunctionRuntimeManager implements
AutoCloseable{
}
continue;
}
- if (restart) {
- this.functionAdmin.functions().restartFunction(tenant,
namespace, functionName,
- assignment.getInstance().getInstanceId());
- } else {
- this.functionAdmin.functions().stopFunction(tenant,
namespace, functionName,
+ this.functionAdmin.functions().restartFunction(tenant,
namespace, functionName,
assignment.getInstance().getInstanceId());
- }
}
}
}
@@ -619,7 +612,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
this.insertStopAction(functionRuntimeInfo);
}
// still assigned to me, need to restart
- if
(assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
+ if
(assignment.getWorkerId().equals(this.workerConfig.getWorkerId()) &&
needsStart(assignment)) {
//start again
FunctionRuntimeInfo newFunctionRuntimeInfo = new
FunctionRuntimeInfo();
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
@@ -687,7 +680,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
this.setAssignment(assignment);
//Assigned to me
- if (assignment.getWorkerId().equals(workerConfig.getWorkerId())) {
+ if (assignment.getWorkerId().equals(workerConfig.getWorkerId()) &&
needsStart(assignment)) {
startFunctionInstance(assignment);
}
}
@@ -819,4 +812,27 @@ public class FunctionRuntimeManager implements
AutoCloseable{
private FunctionRuntimeInfo getFunctionRuntimeInfoInternal(String
fullyQualifiedInstanceId) {
return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
}
+
+ private boolean needsStart(Assignment assignment) {
+ boolean toStart = false;
+ Function.FunctionMetaData functionMetaData =
assignment.getInstance().getFunctionMetaData();
+ if (functionMetaData.getInstanceStatesMap() == null ||
functionMetaData.getInstanceStatesMap().isEmpty()) {
+ toStart = true;
+ } else {
+ if (assignment.getInstance().getInstanceId() < 0) {
+ // for externally managed functions, insert the start only if
there is atleast one
+ // instance that needs to be started
+ for (Function.FunctionState state :
functionMetaData.getInstanceStatesMap().values()) {
+ if (state == Function.FunctionState.RUNNING) {
+ toStart = true;
+ }
+ }
+ } else {
+ if
(functionMetaData.getInstanceStatesOrDefault(assignment.getInstance().getInstanceId(),
Function.FunctionState.RUNNING) == Function.FunctionState.RUNNING) {
+ toStart = true;
+ }
+ }
+ }
+ return toStart;
+ }
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 71825a6..cde2aae 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -167,8 +167,7 @@ public abstract class ComponentImpl {
FunctionRuntimeInfo functionRuntimeInfo =
worker().getFunctionRuntimeManager().getFunctionRuntimeInfo(
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
if (functionRuntimeInfo == null) {
- log.error("{} in get {} Status does not exist @
/{}/{}/{}", componentType, componentType, tenant, namespace, name);
- throw new RestException(Status.NOT_FOUND,
String.format("%s %s doesn't exist", componentType, name));
+ return notRunning(assignedWorkerId, "");
}
RuntimeSpawner runtimeSpawner =
functionRuntimeInfo.getRuntimeSpawner();
@@ -181,7 +180,8 @@ public abstract class ComponentImpl {
throw new RuntimeException(e);
}
} else {
- return notRunning(assignedWorkerId,
functionRuntimeInfo.getStartupException().getMessage());
+ String message = functionRuntimeInfo.getStartupException()
!= null ? functionRuntimeInfo.getStartupException().getMessage() : "";
+ return notRunning(assignedWorkerId, message);
}
} else {
// query other worker
@@ -703,7 +703,62 @@ public abstract class ComponentImpl {
final String componentName,
final String instanceId,
final URI uri) {
- stopFunctionInstance(tenant, namespace, componentName, instanceId,
false, uri);
+ changeFunctionInstanceStatus(tenant, namespace, componentName,
instanceId, false, uri);
+ }
+
+ public void startFunctionInstance(final String tenant,
+ final String namespace,
+ final String componentName,
+ final String instanceId,
+ final URI uri) {
+ changeFunctionInstanceStatus(tenant, namespace, componentName,
instanceId, true, uri);
+ }
+
+ public void changeFunctionInstanceStatus(final String tenant,
+ final String namespace,
+ final String componentName,
+ final String instanceId,
+ final boolean start,
+ final URI uri) {
+
+ if (!isWorkerServiceAvailable()) {
+ throwUnavailableException();
+ }
+
+ // validate parameters
+ try {
+ validateGetFunctionInstanceRequestParams(tenant, namespace,
componentName, componentType, instanceId);
+ } catch (IllegalArgumentException e) {
+ log.error("Invalid start/stop {} request @ /{}/{}/{}",
componentType, tenant, namespace, componentName, e);
+ throw new RestException(Status.BAD_REQUEST, e.getMessage());
+ }
+
+ FunctionMetaDataManager functionMetaDataManager =
worker().getFunctionMetaDataManager();
+ if (!functionMetaDataManager.containsFunction(tenant, namespace,
componentName)) {
+ log.error("{} does not exist @ /{}/{}/{}", componentType, tenant,
namespace, componentName);
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s
doesn't exist", componentType, componentName));
+ }
+
+ FunctionMetaData functionMetaData =
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+ if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+ log.error("{}/{}/{} is not a {}", tenant, namespace,
componentName, componentType);
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s
doesn't exist", componentType, componentName));
+ }
+
+ if (!functionMetaDataManager.canChangeState(functionMetaData,
Integer.parseInt(instanceId), start ? Function.FunctionState.RUNNING :
Function.FunctionState.STOPPED)) {
+ log.error("Operation not permitted on {}/{}/{}", tenant,
namespace, componentName);
+ throw new RestException(Status.BAD_REQUEST,
String.format("Operation not permitted"));
+ }
+
+ try {
+ functionMetaDataManager.changeFunctionInstanceStatus(tenant,
namespace, componentName,
+ Integer.parseInt(instanceId), start);
+ } catch (WebApplicationException we) {
+ throw we;
+ } catch (Exception e) {
+ log.error("Failed to start/stop {}: {}/{}/{}/{}", componentType,
tenant, namespace, componentName, instanceId, e);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR,
e.getMessage());
+ }
}
public void restartFunctionInstance(final String tenant,
@@ -711,16 +766,6 @@ public abstract class ComponentImpl {
final String componentName,
final String instanceId,
final URI uri) {
- stopFunctionInstance(tenant, namespace, componentName, instanceId,
true, uri);
- }
-
- public void stopFunctionInstance(final String tenant,
- final String namespace,
- final String componentName,
- final String instanceId,
- final boolean restart,
- final URI uri) {
-
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
@@ -747,8 +792,8 @@ public abstract class ComponentImpl {
FunctionRuntimeManager functionRuntimeManager =
worker().getFunctionRuntimeManager();
try {
- functionRuntimeManager.stopFunctionInstance(tenant, namespace,
componentName,
- Integer.parseInt(instanceId), restart, uri);
+ functionRuntimeManager.restartFunctionInstance(tenant, namespace,
componentName,
+ Integer.parseInt(instanceId), uri);
} catch (WebApplicationException we) {
throw we;
} catch (Exception e) {
@@ -760,20 +805,62 @@ public abstract class ComponentImpl {
public void stopFunctionInstances(final String tenant,
final String namespace,
final String componentName) {
- stopFunctionInstances(tenant, namespace, componentName, false);
+ changeFunctionStatusAllInstances(tenant, namespace, componentName,
false);
}
- public void restartFunctionInstances(final String tenant,
- final String namespace,
- final String componentName) {
- stopFunctionInstances(tenant, namespace, componentName, true);
+ public void startFunctionInstances(final String tenant,
+ final String namespace,
+ final String componentName) {
+ changeFunctionStatusAllInstances(tenant, namespace, componentName,
true);
}
- public void stopFunctionInstances(final String tenant,
- final String namespace,
- final String componentName,
- final boolean restart) {
+ public void changeFunctionStatusAllInstances(final String tenant,
+ final String namespace,
+ final String componentName,
+ final boolean start) {
+
+ if (!isWorkerServiceAvailable()) {
+ throwUnavailableException();
+ }
+
+ // validate parameters
+ try {
+ validateGetFunctionRequestParams(tenant, namespace, componentName,
componentType);
+ } catch (IllegalArgumentException e) {
+ log.error("Invalid start/stop {} request @ /{}/{}/{}",
componentType, tenant, namespace, componentName, e);
+ throw new RestException(Status.BAD_REQUEST, e.getMessage());
+ }
+
+ FunctionMetaDataManager functionMetaDataManager =
worker().getFunctionMetaDataManager();
+ if (!functionMetaDataManager.containsFunction(tenant, namespace,
componentName)) {
+ log.error("{} in stopFunctionInstances does not exist @
/{}/{}/{}", componentType, tenant, namespace, componentName);
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s
doesn't exist", componentType, componentName));
+ }
+
+ FunctionMetaData functionMetaData =
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+ if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+ log.error("{}/{}/{} is not a {}", tenant, namespace,
componentName, componentType);
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s
doesn't exist", componentType, componentName));
+ }
+
+ if (!functionMetaDataManager.canChangeState(functionMetaData, -1,
start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
+ log.error("Operation not permitted on {}/{}/{}", tenant,
namespace, componentName);
+ throw new RestException(Status.BAD_REQUEST,
String.format("Operation not permitted"));
+ }
+ try {
+ functionMetaDataManager.changeFunctionInstanceStatus(tenant,
namespace, componentName, -1, start);
+ } catch (WebApplicationException we) {
+ throw we;
+ } catch (Exception e) {
+ log.error("Failed to start/stop {}: {}/{}/{}", componentType,
tenant, namespace, componentName, e);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR,
e.getMessage());
+ }
+ }
+
+ public void restartFunctionInstances(final String tenant,
+ final String namespace,
+ final String componentName) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
@@ -800,7 +887,7 @@ public abstract class ComponentImpl {
FunctionRuntimeManager functionRuntimeManager =
worker().getFunctionRuntimeManager();
try {
- functionRuntimeManager.stopFunctionInstances(tenant, namespace,
componentName, restart);
+ functionRuntimeManager.restartFunctionInstances(tenant, namespace,
componentName);
} catch (WebApplicationException we) {
throw we;
} catch (Exception e) {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
index 2eebf62..4d27134 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
@@ -260,6 +260,37 @@ public class FunctionApiV3Resource extends
FunctionApiResource {
}
@POST
+ @ApiOperation(value = "Start function instance", response = Void.class)
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/start")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public void startFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String
functionName,
+ final @PathParam("instanceId") String
instanceId) {
+ functions.startFunctionInstance(tenant, namespace, functionName,
instanceId, this.uri.getRequestUri());
+ }
+
+ @POST
+ @ApiOperation(value = "Start all function instances", response =
Void.class)
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ @Path("/{tenant}/{namespace}/{functionName}/start")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public void startFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String
functionName) {
+ functions.startFunctionInstances(tenant, namespace, functionName);
+ }
+
+ @POST
@Path("/upload")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public void uploadFunction(final @FormDataParam("data") InputStream
uploadedInputStream,
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
index 6a8f25e..ee7d1a4 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
@@ -195,6 +195,33 @@ public class SinkApiV3Resource extends FunctionApiResource
{
sink.stopFunctionInstances(tenant, namespace, sinkName);
}
+ @POST
+ @ApiOperation(value = "Start sink instance", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid
request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/start")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public void startSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sinkName") String sinkName,
+ final @PathParam("instanceId") String instanceId) {
+ sink.startFunctionInstance(tenant, namespace, sinkName, instanceId,
this.uri.getRequestUri());
+ }
+
+ @POST
+ @ApiOperation(value = "Start all sink instances", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid
request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sinkName}/start")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public void startSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sinkName") String sinkName) {
+ sink.startFunctionInstances(tenant, namespace, sinkName);
+ }
+
@GET
@Path("/builtinsinks")
public List<ConnectorDefinition> getSinkList() {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
index 8675cc5..c532e3a 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
@@ -198,6 +198,33 @@ public class SourceApiV3Resource extends
FunctionApiResource {
source.stopFunctionInstances(tenant, namespace, sourceName);
}
+ @POST
+ @ApiOperation(value = "Start source instance", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid
request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/start")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public void startSource(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sourceName") String sourceName,
+ final @PathParam("instanceId") String instanceId) {
+ source.startFunctionInstance(tenant, namespace, sourceName,
instanceId, this.uri.getRequestUri());
+ }
+
+ @POST
+ @ApiOperation(value = "Start all source instances", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid
request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sourceName}/start")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public void startSource(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sourceName") String sourceName) {
+ source.startFunctionInstances(tenant, namespace, sourceName);
+ }
+
@GET
@Path("/builtinsources")
public List<ConnectorDefinition> getSourceList() {
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index 7fc7c7f..75e83bf 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -177,6 +177,68 @@ public class FunctionMetaDataManagerTest {
}
@Test
+ public void testStopFunction() throws PulsarClientException {
+
+ long version = 5;
+ WorkerConfig workerConfig = new WorkerConfig();
+ workerConfig.setWorkerId("worker-1");
+ FunctionMetaDataManager functionMetaDataManager = spy(
+ new FunctionMetaDataManager(workerConfig,
+ mock(SchedulerManager.class),
+ mockPulsarClient()));
+
+ Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new
HashMap<>();
+ Function.FunctionMetaData f1 =
Function.FunctionMetaData.newBuilder().setFunctionDetails(
+
Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(2)).setVersion(version).build();
+ functionMetaDataMap1.put("func-1", f1);
+
+ Assert.assertTrue(functionMetaDataManager.canChangeState(f1, 0,
Function.FunctionState.STOPPED));
+ Assert.assertFalse(functionMetaDataManager.canChangeState(f1, 0,
Function.FunctionState.RUNNING));
+ Assert.assertFalse(functionMetaDataManager.canChangeState(f1, 2,
Function.FunctionState.STOPPED));
+ Assert.assertFalse(functionMetaDataManager.canChangeState(f1, 2,
Function.FunctionState.RUNNING));
+
+ functionMetaDataManager.functionMetaDataMap.put("tenant-1", new
HashMap<>());
+
functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-1",
functionMetaDataMap1);
+
+
Mockito.doReturn(null).when(functionMetaDataManager).submit(any(Request.ServiceRequest.class));
+
+ functionMetaDataManager.changeFunctionInstanceStatus("tenant-1",
"namespace-1", "func-1", 0, false);
+
+ verify(functionMetaDataManager,
times(1)).submit(any(Request.ServiceRequest.class));
+ verify(functionMetaDataManager).submit(argThat(new
ArgumentMatcher<Request.ServiceRequest>() {
+ @Override
+ public boolean matches(Object o) {
+ if (o instanceof Request.ServiceRequest) {
+ Request.ServiceRequest serviceRequest =
(Request.ServiceRequest) o;
+ if
(!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId())) return false;
+ if (!serviceRequest.getServiceRequestType().equals(
+ Request.ServiceRequest.ServiceRequestType.UPDATE))
{
+ return false;
+ }
+ if
(!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(f1.getFunctionDetails()))
{
+ return false;
+ }
+ if (serviceRequest.getFunctionMetaData().getVersion() !=
(version + 1)) {
+ return false;
+ }
+ Map<Integer, Function.FunctionState> stateMap =
serviceRequest.getFunctionMetaData().getInstanceStatesMap();
+ if (stateMap == null || stateMap.isEmpty()) {
+ return false;
+ }
+ if (stateMap.get(1) != Function.FunctionState.RUNNING) {
+ return false;
+ }
+ if (stateMap.get(0) != Function.FunctionState.STOPPED) {
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+ }));
+ }
+
+ @Test
public void deregisterFunction() throws PulsarClientException {
long version = 5;
WorkerConfig workerConfig = new WorkerConfig();
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 17d6642..3ba667b 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -371,6 +371,59 @@ public class FunctionRuntimeManagerTest {
.get("worker-1").get("test-tenant/test-namespace/func-1:0"),
assignment1);
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-1").get("test-tenant/test-namespace/func-2:0"),
assignment3);
+
+ reset(functionRuntimeManager);
+ functionRuntimeManager.actionQueue.clear();
+
+ // add a stop
+ Function.FunctionMetaData.Builder function2StoppedBldr =
function2.toBuilder();
+ function2StoppedBldr.putInstanceStates(0,
Function.FunctionState.STOPPED);
+ Function.FunctionMetaData function2Stopped =
function2StoppedBldr.build();
+
+ Function.Assignment assignment4 = Function.Assignment.newBuilder()
+ .setWorkerId("worker-1")
+ .setInstance(Function.Instance.newBuilder()
+
.setFunctionMetaData(function2Stopped).setInstanceId(0).build())
+ .build();
+
+ functionRuntimeManager.processAssignment(assignment4);
+
+ verify(functionRuntimeManager,
times(1)).insertStopAction(any(FunctionRuntimeInfo.class));
+ // make sure terminate is not called since this is a update operation
+ verify(functionRuntimeManager,
times(0)).insertTerminateAction(any(FunctionRuntimeInfo.class));
+
+ verify(functionRuntimeManager).insertStopAction(argThat(new
ArgumentMatcher<FunctionRuntimeInfo>() {
+ @Override
+ public boolean matches(Object o) {
+ if (o instanceof FunctionRuntimeInfo) {
+ FunctionRuntimeInfo functionRuntimeInfo =
(FunctionRuntimeInfo) o;
+
+ if
(!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2))
{
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+ }));
+
+ verify(functionRuntimeManager,
times(0)).insertStartAction(any(FunctionRuntimeInfo.class));
+
+ Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 1);
+ Assert.assertTrue(functionRuntimeManager.actionQueue.contains(
+ new FunctionAction()
+ .setAction(FunctionAction.Action.STOP)
+ .setFunctionRuntimeInfo(new
FunctionRuntimeInfo().setFunctionInstance(
+
Function.Instance.newBuilder().setFunctionMetaData(function2).setInstanceId(0)
+ .build()))));
+
+
Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 2);
+
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
+ Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
+ .get("worker-1").get("test-tenant/test-namespace/func-1:0"),
assignment1);
+ Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
+ .get("worker-1").get("test-tenant/test-namespace/func-2:0"),
assignment4);
+
}
@Test