This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c7a237a [Functions] Abstract repetitive code and add unit tests.
#9502 (#9671)
c7a237a is described below
commit c7a237ac508c72f80a3706002d3307da89292a0d
Author: guodongyang <[email protected]>
AuthorDate: Sat Mar 13 11:19:31 2021 +0800
[Functions] Abstract repetitive code and add unit tests. #9502 (#9671)
### Motivation
This PR is related to #9502 and [PR
#9519](https://github.com/apache/pulsar/pull/9519).
In [PR #9519 ](https://github.com/apache/pulsar/pull/9519), another
contributer has fixed the problem described in #9502, but there are still some
areas that can be optimized.
### Modifications
- Abstract a method and then reuse some code.
- Adding a unit test for the `restartFunctionInstances` method
---
.../functions/worker/FunctionRuntimeManager.java | 54 ++++---
.../worker/FunctionRuntimeManagerTest.java | 176 +++++++++++++++++++++
2 files changed, 208 insertions(+), 22 deletions(-)
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 045d43a..222ebb7 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -421,15 +421,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
.entity(new ErrorData(fullFunctionName + " has not
been assigned yet")).build());
}
- ComponentType componentType =
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType();
-
- if (ComponentType.SOURCE == componentType) {
- this.functionAdmin.sources().restartSource(tenant,
namespace, functionName);
- } else if (ComponentType.SINK == componentType) {
- this.functionAdmin.sinks().restartSink(tenant, namespace,
functionName);
- } else {
- this.functionAdmin.functions().restartFunction(tenant,
namespace, functionName);
- }
+ restartFunctionUsingPulsarAdmin(assignment, tenant, namespace,
functionName, true);
}
} else {
for (Assignment assignment : assignments) {
@@ -452,19 +444,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
}
continue;
}
-
- ComponentType componentType =
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType();
-
- if (ComponentType.SOURCE == componentType) {
- this.functionAdmin.sources().restartSource(tenant,
namespace, functionName,
- assignment.getInstance().getInstanceId());
- } else if (ComponentType.SINK == componentType) {
- this.functionAdmin.sinks().restartSink(tenant,
namespace, functionName,
- assignment.getInstance().getInstanceId());
- } else {
- this.functionAdmin.functions().restartFunction(tenant,
namespace, functionName,
- assignment.getInstance().getInstanceId());
- }
+ restartFunctionUsingPulsarAdmin(assignment, tenant,
namespace, functionName, false);
}
}
}
@@ -472,6 +452,36 @@ public class FunctionRuntimeManager implements
AutoCloseable{
}
/**
+ * Restart the entire function or restart a single instance of the function
+ */
+ private void restartFunctionUsingPulsarAdmin(Assignment assignment, String
tenant, String namespace,
+ String functionName, boolean restartEntireFunction) throws
PulsarAdminException {
+ ComponentType componentType =
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType();
+ if (restartEntireFunction) {
+ if (ComponentType.SOURCE == componentType) {
+ this.functionAdmin.sources().restartSource(tenant, namespace,
functionName);
+ } else if (ComponentType.SINK == componentType) {
+ this.functionAdmin.sinks().restartSink(tenant, namespace,
functionName);
+ } else {
+ this.functionAdmin.functions().restartFunction(tenant,
namespace, functionName);
+ }
+ } else {
+ // only restart single instance
+ if (ComponentType.SOURCE == componentType) {
+ this.functionAdmin.sources().restartSource(tenant, namespace,
functionName,
+ assignment.getInstance().getInstanceId());
+ } else if (ComponentType.SINK == componentType) {
+ this.functionAdmin.sinks().restartSink(tenant, namespace,
functionName,
+ assignment.getInstance().getInstanceId());
+ } else {
+ this.functionAdmin.functions().restartFunction(tenant,
namespace, functionName,
+ assignment.getInstance().getInstanceId());
+ }
+ }
+
+ }
+
+ /**
* It stops all functions instances owned by current worker
* @throws Exception
*/
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 4214364..1739c91 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -38,10 +38,14 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.google.common.collect.ImmutableList;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Sinks;
+import org.apache.pulsar.client.admin.Sources;
+import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
@@ -51,6 +55,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
@@ -62,6 +67,7 @@ import
org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.mockito.ArgumentMatchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
@@ -1023,4 +1029,174 @@ public class FunctionRuntimeManagerTest {
ThreadRuntimeFactory threadRuntimeFactory = (ThreadRuntimeFactory)
functionRuntimeManager.getRuntimeFactory();
assertEquals(threadRuntimeFactory.getThreadGroup().getName(),
"threadGroupName");
}
+
+ @Test
+ public void testThreadFunctionInstancesRestart() throws Exception {
+
+ WorkerConfig workerConfig = new WorkerConfig();
+ workerConfig.setWorkerId("worker-1");
+
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(
+ new
ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
+ workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setStateStorageServiceUrl("foo");
+ workerConfig.setFunctionAssignmentTopicName("assignments");
+
+ PulsarWorkerService workerService = mock(PulsarWorkerService.class);
+ // mock pulsarAdmin sources sinks functions
+ PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
+ Sources sources = mock(Sources.class);
+ doNothing().when(sources).restartSource(ArgumentMatchers.any(),
ArgumentMatchers.any(), ArgumentMatchers.any());
+ doReturn(sources).when(pulsarAdmin).sources();
+ Sinks sinks = mock(Sinks.class);
+ doReturn(sinks).when(pulsarAdmin).sinks();
+ Functions functions = mock(Functions.class);
+ doNothing().when(functions).restartFunction(ArgumentMatchers.any(),
ArgumentMatchers.any(), ArgumentMatchers.any());
+ doReturn(functions).when(pulsarAdmin).functions();
+
+ doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
+ mockStatic(RuntimeFactory.class);
+ List<WorkerInfo> workerInfos = new LinkedList<>();
+ workerInfos.add(WorkerInfo.of("worker-1", "localhost", 0));
+ workerInfos.add(WorkerInfo.of("worker-2", "localhost", 0));
+
PowerMockito.when(RuntimeFactory.getFuntionRuntimeFactory(eq(ThreadRuntimeFactory.class.getName())))
+ .thenReturn(new ThreadRuntimeFactory());
+ MembershipManager membershipManager = mock(MembershipManager.class);
+ doReturn(workerInfos).when(membershipManager).getCurrentMembership();
+
+ // build three types of FunctionMetaData
+ Function.FunctionMetaData function =
Function.FunctionMetaData.newBuilder().setFunctionDetails(
+ Function.FunctionDetails.newBuilder()
+
.setTenant("test-tenant").setNamespace("test-namespace").setName("function")
+
.setComponentType(Function.FunctionDetails.ComponentType.FUNCTION)).build();
+ Function.FunctionMetaData source =
Function.FunctionMetaData.newBuilder().setFunctionDetails(
+ Function.FunctionDetails.newBuilder()
+
.setTenant("test-tenant").setNamespace("test-namespace").setName("source")
+
.setComponentType(Function.FunctionDetails.ComponentType.SOURCE)).build();
+ Function.FunctionMetaData sink =
Function.FunctionMetaData.newBuilder().setFunctionDetails(
+ Function.FunctionDetails.newBuilder()
+
.setTenant("test-tenant").setNamespace("test-namespace").setName("sink")
+
.setComponentType(Function.FunctionDetails.ComponentType.SINK)).build();
+
+ FunctionRuntimeManager functionRuntimeManager = PowerMockito.spy(new
FunctionRuntimeManager(
+ workerConfig,
+ workerService,
+ mock(Namespace.class),
+ membershipManager,
+ mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
+ mock(FunctionMetaDataManager.class),
+ mock(WorkerStatsManager.class),
+ mock(ErrorNotifier.class)));
+
+ // verify restart function/source/sink using different assignment
+ verifyRestart(functionRuntimeManager, function, "worker-1", false,
false);
+ verifyRestart(functionRuntimeManager, function, "worker-2", false,
true);
+ verifyRestart(functionRuntimeManager, source, "worker-1", false,
false);
+ verifyRestart(functionRuntimeManager, source, "worker-2", false, true);
+ verifyRestart(functionRuntimeManager, sink, "worker-1", false, false);
+ verifyRestart(functionRuntimeManager, sink, "worker-2", false, true);
+ }
+
+ @Test
+ public void testKubernetesFunctionInstancesRestart() throws Exception {
+
+ WorkerConfig workerConfig = new WorkerConfig();
+ workerConfig.setWorkerId("worker-1");
+ workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setStateStorageServiceUrl("foo");
+ workerConfig.setFunctionAssignmentTopicName("assignments");
+ WorkerConfig.KubernetesContainerFactory kubernetesContainerFactory
+ = new WorkerConfig.KubernetesContainerFactory();
+ workerConfig.setKubernetesContainerFactory(kubernetesContainerFactory);
+ KubernetesRuntimeFactory mockedKubernetesRuntimeFactory = spy(new
KubernetesRuntimeFactory());
+ doNothing().when(mockedKubernetesRuntimeFactory).initialize(
+ any(WorkerConfig.class),
+ any(AuthenticationConfig.class),
+ any(SecretsProviderConfigurator.class),
+ any(),
+ any(),
+ any()
+ );
+ doNothing().when(mockedKubernetesRuntimeFactory).setupClient();
+
doReturn(true).when(mockedKubernetesRuntimeFactory).externallyManaged();
+ PowerMockito.whenNew(KubernetesRuntimeFactory.class)
+ .withNoArguments().thenReturn(mockedKubernetesRuntimeFactory);
+
+ PulsarWorkerService workerService = mock(PulsarWorkerService.class);
+ // mock pulsarAdmin sources sinks functions
+ PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
+ Sources sources = mock(Sources.class);
+ doNothing().when(sources).restartSource(ArgumentMatchers.any(),
ArgumentMatchers.any(), ArgumentMatchers.any());
+ doReturn(sources).when(pulsarAdmin).sources();
+ Sinks sinks = mock(Sinks.class);
+ doReturn(sinks).when(pulsarAdmin).sinks();
+ Functions functions = mock(Functions.class);
+ doNothing().when(functions).restartFunction(ArgumentMatchers.any(),
ArgumentMatchers.any(), ArgumentMatchers.any());
+ doReturn(functions).when(pulsarAdmin).functions();
+
+ doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
+ mockStatic(RuntimeFactory.class);
+ List<WorkerInfo> workerInfos = new LinkedList<>();
+ workerInfos.add(WorkerInfo.of("worker-1", "localhost", 0));
+ workerInfos.add(WorkerInfo.of("worker-2", "localhost", 0));
+
PowerMockito.when(RuntimeFactory.getFuntionRuntimeFactory(eq(ThreadRuntimeFactory.class.getName())))
+ .thenReturn(new ThreadRuntimeFactory());
+ MembershipManager membershipManager = mock(MembershipManager.class);
+ doReturn(workerInfos).when(membershipManager).getCurrentMembership();
+
+ // build three types of FunctionMetaData
+ Function.FunctionMetaData function =
Function.FunctionMetaData.newBuilder().setFunctionDetails(
+ Function.FunctionDetails.newBuilder()
+
.setTenant("test-tenant").setNamespace("test-namespace").setName("function")
+
.setComponentType(Function.FunctionDetails.ComponentType.FUNCTION)).build();
+ Function.FunctionMetaData source =
Function.FunctionMetaData.newBuilder().setFunctionDetails(
+ Function.FunctionDetails.newBuilder()
+
.setTenant("test-tenant").setNamespace("test-namespace").setName("source")
+
.setComponentType(Function.FunctionDetails.ComponentType.SOURCE)).build();
+ Function.FunctionMetaData sink =
Function.FunctionMetaData.newBuilder().setFunctionDetails(
+ Function.FunctionDetails.newBuilder()
+
.setTenant("test-tenant").setNamespace("test-namespace").setName("sink")
+
.setComponentType(Function.FunctionDetails.ComponentType.SINK)).build();
+
+ FunctionRuntimeManager functionRuntimeManager = PowerMockito.spy(new
FunctionRuntimeManager(
+ workerConfig,
+ workerService,
+ mock(Namespace.class),
+ membershipManager,
+ mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
+ mock(FunctionMetaDataManager.class),
+ mock(WorkerStatsManager.class),
+ mock(ErrorNotifier.class)));
+
+ // verify restart function/source/sink using different assignment
+ verifyRestart(functionRuntimeManager, function, "worker-1",true,
false);
+ verifyRestart(functionRuntimeManager, function, "worker-2", true,
true);
+ verifyRestart(functionRuntimeManager, source, "worker-1", true, false);
+ verifyRestart(functionRuntimeManager, source, "worker-2", true, true);
+ verifyRestart(functionRuntimeManager, sink, "worker-1", true, false);
+ verifyRestart(functionRuntimeManager, sink, "worker-2", true, true);
+ }
+
+ private static void verifyRestart(FunctionRuntimeManager
functionRuntimeManager, Function.FunctionMetaData function,
+ String workerId, boolean externallyManaged, boolean
expectRestartByPulsarAdmin) throws Exception {
+ Function.Assignment assignment = Function.Assignment.newBuilder()
+ .setWorkerId(workerId)
+ .setInstance(Function.Instance.newBuilder()
+
.setFunctionMetaData(function).setInstanceId(0).build())
+ .build();
+ doReturn(ImmutableList.of(assignment)).when(functionRuntimeManager)
+ .findFunctionAssignments("test-tenant", "test-namespace",
"function");
+ functionRuntimeManager.restartFunctionInstances("test-tenant",
"test-namespace", "function");
+ if (expectRestartByPulsarAdmin) {
+ PowerMockito.verifyPrivate(functionRuntimeManager)
+ .invoke("restartFunctionUsingPulsarAdmin", assignment,
"test-tenant", "test-namespace", "function", externallyManaged);
+ } else {
+ PowerMockito.verifyPrivate(functionRuntimeManager)
+ .invoke("stopFunction",
FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()), true);
+ }
+ }
+
}