This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c7a237a  [Functions] Abstract repetitive code and add unit tests. 
#9502 (#9671)
c7a237a is described below

commit c7a237ac508c72f80a3706002d3307da89292a0d
Author: guodongyang <[email protected]>
AuthorDate: Sat Mar 13 11:19:31 2021 +0800

    [Functions] Abstract repetitive code and add unit tests. #9502 (#9671)
    
    ### Motivation
    This PR is related to #9502 and [PR 
#9519](https://github.com/apache/pulsar/pull/9519).
    In [PR #9519 ](https://github.com/apache/pulsar/pull/9519), another 
contributer has fixed the problem described in #9502, but there are still some 
areas that can be optimized.
    
    ### Modifications
    - Abstract a method and then reuse some code.
    - Adding a unit test for the `restartFunctionInstances` method
---
 .../functions/worker/FunctionRuntimeManager.java   |  54 ++++---
 .../worker/FunctionRuntimeManagerTest.java         | 176 +++++++++++++++++++++
 2 files changed, 208 insertions(+), 22 deletions(-)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 045d43a..222ebb7 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -421,15 +421,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                             .entity(new ErrorData(fullFunctionName + " has not 
been assigned yet")).build());
                 }
 
-                ComponentType componentType = 
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType();
-
-                if (ComponentType.SOURCE == componentType) {
-                    this.functionAdmin.sources().restartSource(tenant, 
namespace, functionName);
-                } else if (ComponentType.SINK == componentType) {
-                    this.functionAdmin.sinks().restartSink(tenant, namespace, 
functionName);
-                } else {
-                    this.functionAdmin.functions().restartFunction(tenant, 
namespace, functionName);
-                }
+                restartFunctionUsingPulsarAdmin(assignment, tenant, namespace, 
functionName, true);
             }
         } else {
             for (Assignment assignment : assignments) {
@@ -452,19 +444,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                         }
                         continue;
                     }
-
-                    ComponentType componentType = 
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType();
-
-                    if (ComponentType.SOURCE == componentType) {
-                        this.functionAdmin.sources().restartSource(tenant, 
namespace, functionName,
-                            assignment.getInstance().getInstanceId());
-                    } else if (ComponentType.SINK == componentType) {
-                        this.functionAdmin.sinks().restartSink(tenant, 
namespace, functionName,
-                            assignment.getInstance().getInstanceId());
-                    } else {
-                        this.functionAdmin.functions().restartFunction(tenant, 
namespace, functionName,
-                            assignment.getInstance().getInstanceId());
-                    }
+                    restartFunctionUsingPulsarAdmin(assignment, tenant, 
namespace, functionName, false);
                 }
             }
         }
@@ -472,6 +452,36 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     }
 
     /**
+     * Restart the entire function or restart a single instance of the function
+     */
+    private void restartFunctionUsingPulsarAdmin(Assignment assignment, String 
tenant, String namespace,
+             String functionName, boolean restartEntireFunction) throws 
PulsarAdminException {
+        ComponentType componentType = 
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType();
+        if (restartEntireFunction) {
+            if (ComponentType.SOURCE == componentType) {
+                this.functionAdmin.sources().restartSource(tenant, namespace, 
functionName);
+            } else if (ComponentType.SINK == componentType) {
+                this.functionAdmin.sinks().restartSink(tenant, namespace, 
functionName);
+            } else {
+                this.functionAdmin.functions().restartFunction(tenant, 
namespace, functionName);
+            }
+        } else {
+            // only restart single instance
+            if (ComponentType.SOURCE == componentType) {
+                this.functionAdmin.sources().restartSource(tenant, namespace, 
functionName,
+                        assignment.getInstance().getInstanceId());
+            } else if (ComponentType.SINK == componentType) {
+                this.functionAdmin.sinks().restartSink(tenant, namespace, 
functionName,
+                        assignment.getInstance().getInstanceId());
+            } else {
+                this.functionAdmin.functions().restartFunction(tenant, 
namespace, functionName,
+                        assignment.getInstance().getInstanceId());
+            }
+        }
+
+    }
+
+    /**
      * It stops all functions instances owned by current worker
      * @throws Exception
      */
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 4214364..1739c91 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -38,10 +38,14 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.ImmutableList;
 import io.netty.buffer.Unpooled;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Sinks;
+import org.apache.pulsar.client.admin.Sources;
+import org.apache.pulsar.client.admin.Functions;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -51,6 +55,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
@@ -62,6 +67,7 @@ import 
org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.mockito.ArgumentMatchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
@@ -1023,4 +1029,174 @@ public class FunctionRuntimeManagerTest {
         ThreadRuntimeFactory threadRuntimeFactory = (ThreadRuntimeFactory) 
functionRuntimeManager.getRuntimeFactory();
         assertEquals(threadRuntimeFactory.getThreadGroup().getName(), 
"threadGroupName");
     }
+
+    @Test
+    public void testThreadFunctionInstancesRestart() throws Exception {
+
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setWorkerId("worker-1");
+        
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new 
ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
+        workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+        workerConfig.setStateStorageServiceUrl("foo");
+        workerConfig.setFunctionAssignmentTopicName("assignments");
+
+        PulsarWorkerService workerService = mock(PulsarWorkerService.class);
+        // mock pulsarAdmin sources sinks functions
+        PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
+        Sources sources = mock(Sources.class);
+        doNothing().when(sources).restartSource(ArgumentMatchers.any(), 
ArgumentMatchers.any(), ArgumentMatchers.any());
+        doReturn(sources).when(pulsarAdmin).sources();
+        Sinks sinks = mock(Sinks.class);
+        doReturn(sinks).when(pulsarAdmin).sinks();
+        Functions functions = mock(Functions.class);
+        doNothing().when(functions).restartFunction(ArgumentMatchers.any(), 
ArgumentMatchers.any(), ArgumentMatchers.any());
+        doReturn(functions).when(pulsarAdmin).functions();
+
+        doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
+        mockStatic(RuntimeFactory.class);
+        List<WorkerInfo> workerInfos = new LinkedList<>();
+        workerInfos.add(WorkerInfo.of("worker-1", "localhost", 0));
+        workerInfos.add(WorkerInfo.of("worker-2", "localhost", 0));
+        
PowerMockito.when(RuntimeFactory.getFuntionRuntimeFactory(eq(ThreadRuntimeFactory.class.getName())))
+                .thenReturn(new ThreadRuntimeFactory());
+        MembershipManager membershipManager = mock(MembershipManager.class);
+        doReturn(workerInfos).when(membershipManager).getCurrentMembership();
+
+        // build three types of FunctionMetaData
+        Function.FunctionMetaData function = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        
.setTenant("test-tenant").setNamespace("test-namespace").setName("function")
+                        
.setComponentType(Function.FunctionDetails.ComponentType.FUNCTION)).build();
+        Function.FunctionMetaData source = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        
.setTenant("test-tenant").setNamespace("test-namespace").setName("source")
+                        
.setComponentType(Function.FunctionDetails.ComponentType.SOURCE)).build();
+        Function.FunctionMetaData sink = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        
.setTenant("test-tenant").setNamespace("test-namespace").setName("sink")
+                        
.setComponentType(Function.FunctionDetails.ComponentType.SINK)).build();
+
+        FunctionRuntimeManager functionRuntimeManager = PowerMockito.spy(new 
FunctionRuntimeManager(
+                workerConfig,
+                workerService,
+                mock(Namespace.class),
+                membershipManager,
+                mock(ConnectorsManager.class),
+                mock(FunctionsManager.class),
+                mock(FunctionMetaDataManager.class),
+                mock(WorkerStatsManager.class),
+                mock(ErrorNotifier.class)));
+
+        // verify restart function/source/sink using different assignment
+        verifyRestart(functionRuntimeManager, function, "worker-1", false, 
false);
+        verifyRestart(functionRuntimeManager, function, "worker-2", false, 
true);
+        verifyRestart(functionRuntimeManager, source, "worker-1", false, 
false);
+        verifyRestart(functionRuntimeManager, source, "worker-2", false, true);
+        verifyRestart(functionRuntimeManager, sink, "worker-1", false, false);
+        verifyRestart(functionRuntimeManager, sink, "worker-2", false, true);
+    }
+
+    @Test
+    public void testKubernetesFunctionInstancesRestart() throws Exception {
+
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setWorkerId("worker-1");
+        workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+        workerConfig.setStateStorageServiceUrl("foo");
+        workerConfig.setFunctionAssignmentTopicName("assignments");
+        WorkerConfig.KubernetesContainerFactory kubernetesContainerFactory
+                = new WorkerConfig.KubernetesContainerFactory();
+        workerConfig.setKubernetesContainerFactory(kubernetesContainerFactory);
+        KubernetesRuntimeFactory mockedKubernetesRuntimeFactory = spy(new 
KubernetesRuntimeFactory());
+        doNothing().when(mockedKubernetesRuntimeFactory).initialize(
+                any(WorkerConfig.class),
+                any(AuthenticationConfig.class),
+                any(SecretsProviderConfigurator.class),
+                any(),
+                any(),
+                any()
+        );
+        doNothing().when(mockedKubernetesRuntimeFactory).setupClient();
+        
doReturn(true).when(mockedKubernetesRuntimeFactory).externallyManaged();
+        PowerMockito.whenNew(KubernetesRuntimeFactory.class)
+                .withNoArguments().thenReturn(mockedKubernetesRuntimeFactory);
+
+        PulsarWorkerService workerService = mock(PulsarWorkerService.class);
+        // mock pulsarAdmin sources sinks functions
+        PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
+        Sources sources = mock(Sources.class);
+        doNothing().when(sources).restartSource(ArgumentMatchers.any(), 
ArgumentMatchers.any(), ArgumentMatchers.any());
+        doReturn(sources).when(pulsarAdmin).sources();
+        Sinks sinks = mock(Sinks.class);
+        doReturn(sinks).when(pulsarAdmin).sinks();
+        Functions functions = mock(Functions.class);
+        doNothing().when(functions).restartFunction(ArgumentMatchers.any(), 
ArgumentMatchers.any(), ArgumentMatchers.any());
+        doReturn(functions).when(pulsarAdmin).functions();
+
+        doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
+        mockStatic(RuntimeFactory.class);
+        List<WorkerInfo> workerInfos = new LinkedList<>();
+        workerInfos.add(WorkerInfo.of("worker-1", "localhost", 0));
+        workerInfos.add(WorkerInfo.of("worker-2", "localhost", 0));
+        
PowerMockito.when(RuntimeFactory.getFuntionRuntimeFactory(eq(ThreadRuntimeFactory.class.getName())))
+                .thenReturn(new ThreadRuntimeFactory());
+        MembershipManager membershipManager = mock(MembershipManager.class);
+        doReturn(workerInfos).when(membershipManager).getCurrentMembership();
+
+        // build three types of FunctionMetaData
+        Function.FunctionMetaData function = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        
.setTenant("test-tenant").setNamespace("test-namespace").setName("function")
+                        
.setComponentType(Function.FunctionDetails.ComponentType.FUNCTION)).build();
+        Function.FunctionMetaData source = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        
.setTenant("test-tenant").setNamespace("test-namespace").setName("source")
+                        
.setComponentType(Function.FunctionDetails.ComponentType.SOURCE)).build();
+        Function.FunctionMetaData sink = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        
.setTenant("test-tenant").setNamespace("test-namespace").setName("sink")
+                        
.setComponentType(Function.FunctionDetails.ComponentType.SINK)).build();
+
+        FunctionRuntimeManager functionRuntimeManager = PowerMockito.spy(new 
FunctionRuntimeManager(
+                workerConfig,
+                workerService,
+                mock(Namespace.class),
+                membershipManager,
+                mock(ConnectorsManager.class),
+                mock(FunctionsManager.class),
+                mock(FunctionMetaDataManager.class),
+                mock(WorkerStatsManager.class),
+                mock(ErrorNotifier.class)));
+
+        // verify restart function/source/sink using different assignment
+        verifyRestart(functionRuntimeManager, function, "worker-1",true, 
false);
+        verifyRestart(functionRuntimeManager, function, "worker-2", true, 
true);
+        verifyRestart(functionRuntimeManager, source, "worker-1", true, false);
+        verifyRestart(functionRuntimeManager, source, "worker-2", true, true);
+        verifyRestart(functionRuntimeManager, sink, "worker-1", true, false);
+        verifyRestart(functionRuntimeManager, sink, "worker-2", true, true);
+    }
+
+    private static void verifyRestart(FunctionRuntimeManager 
functionRuntimeManager, Function.FunctionMetaData function,
+             String workerId, boolean externallyManaged, boolean 
expectRestartByPulsarAdmin) throws Exception {
+        Function.Assignment assignment = Function.Assignment.newBuilder()
+                .setWorkerId(workerId)
+                .setInstance(Function.Instance.newBuilder()
+                        
.setFunctionMetaData(function).setInstanceId(0).build())
+                .build();
+        doReturn(ImmutableList.of(assignment)).when(functionRuntimeManager)
+                .findFunctionAssignments("test-tenant", "test-namespace", 
"function");
+        functionRuntimeManager.restartFunctionInstances("test-tenant", 
"test-namespace", "function");
+        if (expectRestartByPulsarAdmin) {
+            PowerMockito.verifyPrivate(functionRuntimeManager)
+                .invoke("restartFunctionUsingPulsarAdmin", assignment, 
"test-tenant", "test-namespace", "function", externallyManaged);
+        } else {
+            PowerMockito.verifyPrivate(functionRuntimeManager)
+                .invoke("stopFunction", 
FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()), true);
+        }
+    }
+
 }

Reply via email to