This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a1a1abe  add initialize routine to FunctionRuntimeManager (#2784)
a1a1abe is described below

commit a1a1abed57f108a1e4be3c76abe69e61da8fd619
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Fri Oct 12 09:41:41 2018 -0500

    add initialize routine to FunctionRuntimeManager (#2784)
---
 .../functions/worker/FunctionAssignmentTailer.java |  16 +--
 .../functions/worker/FunctionMetaDataManager.java  |   4 +-
 .../functions/worker/FunctionRuntimeManager.java   | 120 +++++++++++++------
 .../pulsar/functions/worker/WorkerService.java     |   3 +
 .../worker/FunctionRuntimeManagerTest.java         | 132 ++++++++++++++++++++-
 5 files changed, 229 insertions(+), 46 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 3ad6c7c..3f8bec3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -36,13 +36,11 @@ public class FunctionAssignmentTailer
         private final FunctionRuntimeManager functionRuntimeManager;
         private final Reader<byte[]> reader;
 
-    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager)
+    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader)
             throws PulsarClientException {
         this.functionRuntimeManager = functionRuntimeManager;
 
-        this.reader = functionRuntimeManager.getWorkerService().getClient().newReader()
-                .topic(functionRuntimeManager.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true)
-                .startMessageId(MessageId.earliest).create();
+        this.reader = reader;
     }
 
     public void start() {
@@ -66,8 +64,7 @@ public class FunctionAssignmentTailer
         log.info("Stopped function state consumer");
     }
 
-    @Override
-    public void accept(Message<byte[]> msg) {
+    public void processAssignment(Message<byte[]> msg) {
         if(msg.getData()==null || (msg.getData().length==0)) {
             log.info("Received assignment delete: {}", msg.getKey());
             this.functionRuntimeManager.deleteAssignment(msg.getKey());
@@ -82,8 +79,13 @@ public class FunctionAssignmentTailer
                 throw new RuntimeException(e);
             }
             log.info("Received assignment update: {}", assignment);
-            this.functionRuntimeManager.processAssignment(assignment);    
+            this.functionRuntimeManager.processAssignment(assignment);
         }
+    }
+
+    @Override
+    public void accept(Message<byte[]> msg) {
+        processAssignment(msg);
         // receive next request
         receiveOne();
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 44ff807..4faed11 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -82,13 +82,11 @@ public class FunctionMetaDataManager implements AutoCloseable {
 
     /**
      * Initializes the FunctionMetaDataManager.  Does the following:
-     * 1. Restores from snapshot if one exists
-     * 2. Sends out initialize marker to FMT and consume messages until the initialize marker is consumed
+     * 1. Consume all existing function meta data upon start to establish existing state
      */
     public void initialize() {
         log.info("/** Initializing Function Metadata Manager **/");
         try {
-
             Reader<byte[]> reader = pulsarClient.newReader()
                     .topic(this.workerConfig.getFunctionMetadataTopic())
                     .startMessageId(MessageId.earliest)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index aa843ca..ee57659 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -36,10 +36,13 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 
+import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function.Assignment;
@@ -65,6 +68,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
 
     // All the runtime info related to functions executed by this worker
     // Fully Qualified InstanceId - > FunctionRuntimeInfo
+    // NOTE: please use setFunctionRuntimeInfo and deleteFunctionRuntimeInfo methods to modify this data structure
+    // Since during initialization phase nothing should be modified
     @VisibleForTesting
     Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new ConcurrentHashMap<>();
 
@@ -75,7 +80,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
     @VisibleForTesting
     LinkedBlockingQueue<FunctionAction> actionQueue;
 
-    private final FunctionAssignmentTailer functionAssignmentTailer;
+    private FunctionAssignmentTailer functionAssignmentTailer;
 
     private FunctionActioner functionActioner;
 
@@ -89,14 +94,16 @@ public class FunctionRuntimeManager implements AutoCloseable{
     @Getter
     private WorkerService workerService;
 
+    @Setter
+    @Getter
+    boolean isInitializePhase = false;
+
     public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace,
             MembershipManager membershipManager, ConnectorsManager connectorsManager) throws Exception {
         this.workerConfig = workerConfig;
         this.workerService = workerService;
         this.functionAdmin = workerService.getFunctionAdmin();
 
-        this.functionAssignmentTailer = new FunctionAssignmentTailer(this);
-
         AuthenticationConfig authConfig = AuthenticationConfig.builder()
                 .clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin())
                 .clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters())
@@ -145,6 +152,41 @@ public class FunctionRuntimeManager implements AutoCloseable{
     }
 
     /**
+     * Initializes the FunctionRuntimeManager.  Does the following:
+     * 1. Consume all existing assignments to establish existing/latest set of assignments
+     * 2. After current assignments are read, assignments belonging to this worker will be processed
+     */
+    public void initialize() {
+        log.info("/** Initializing Runtime Manager **/");
+        try {
+            Reader<byte[]> reader = this.getWorkerService().getClient().newReader()
+                    .topic(this.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true)
+                    .startMessageId(MessageId.earliest).create();
+
+            this.functionAssignmentTailer = new FunctionAssignmentTailer(this, reader);
+            // read all existing messages
+            this.setInitializePhase(true);
+            while (reader.hasMessageAvailable()) {
+                this.functionAssignmentTailer.processAssignment(reader.readNext());
+            }
+            this.setInitializePhase(false);
+            // realize existing assignments
+            Map<String, Assignment> assignmentMap = workerIdToAssignments.get(this.workerConfig.getWorkerId());
+            if (assignmentMap != null) {
+                for (Assignment assignment : assignmentMap.values()) {
+                    startFunctionInstance(assignment);
+                }
+            }
+            // start assignment tailer
+            this.functionAssignmentTailer.start();
+
+        } catch (Exception e) {
+            log.error("Failed to initialize function runtime manager: ", e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
      * Starts the function runtime manager
      */
     public void start() {
@@ -623,27 +665,29 @@ public class FunctionRuntimeManager implements AutoCloseable{
     }
 
     private void addAssignment(Assignment assignment) {
-        String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
-
         //add new function
         this.setAssignment(assignment);
 
         //Assigned to me
         if (assignment.getWorkerId().equals(workerConfig.getWorkerId())) {
-            if (!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) {
-                this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, new FunctionRuntimeInfo()
-                        .setFunctionInstance(assignment.getInstance()));
+            startFunctionInstance(assignment);
+        }
+    }
 
-            } else {
-                //Somehow this function is already started
-                log.warn("Function {} already running. Going to restart function.",
-                        this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
-                this.insertStopAction(this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
-            }
-            FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
-            this.insertStartAction(functionRuntimeInfo);
+    private void startFunctionInstance(Assignment assignment) {
+        String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+        if (!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) {
+            this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, new FunctionRuntimeInfo()
+                    .setFunctionInstance(assignment.getInstance()));
+
+        } else {
+            //Somehow this function is already started
+            log.warn("Function {} already running. Going to restart function.",
+                    this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
+            this.insertStopAction(this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
         }
-        
+        FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
+        this.insertStartAction(functionRuntimeInfo);
     }
 
     public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
@@ -675,26 +719,29 @@ public class FunctionRuntimeManager implements AutoCloseable{
 
     @VisibleForTesting
     void insertStopAction(FunctionRuntimeInfo functionRuntimeInfo) {
-        FunctionAction functionAction = new FunctionAction();
-        functionAction.setAction(FunctionAction.Action.STOP);
-        functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
-        try {
-            actionQueue.put(functionAction);
-        } catch (InterruptedException ex) {
-            throw new RuntimeException("Interrupted while putting action");
+        if (!this.isInitializePhase) {
+            FunctionAction functionAction = new FunctionAction();
+            functionAction.setAction(FunctionAction.Action.STOP);
+            functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
+            try {
+                actionQueue.put(functionAction);
+            } catch (InterruptedException ex) {
+                throw new RuntimeException("Interrupted while putting action");
+            }
         }
-
     }
 
     @VisibleForTesting
     void insertStartAction(FunctionRuntimeInfo functionRuntimeInfo) {
-        FunctionAction functionAction = new FunctionAction();
-        functionAction.setAction(FunctionAction.Action.START);
-        functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
-        try {
-            actionQueue.put(functionAction);
-        } catch (InterruptedException ex) {
-            throw new RuntimeException("Interrupted while putting action");
+        if (!this.isInitializePhase) {
+            FunctionAction functionAction = new FunctionAction();
+            functionAction.setAction(FunctionAction.Action.START);
+            functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
+            try {
+                actionQueue.put(functionAction);
+            } catch (InterruptedException ex) {
+                throw new RuntimeException("Interrupted while putting action");
+            }
         }
     }
 
@@ -731,11 +778,16 @@ public class FunctionRuntimeManager implements AutoCloseable{
     }
 
     private void deleteFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
-        this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
+        if (!this.isInitializePhase) {
+            this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
+        }
     }
 
     private void setFunctionRuntimeInfo(String fullyQualifiedInstanceId, FunctionRuntimeInfo functionRuntimeInfo) {
-        this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, functionRuntimeInfo);
+        // Don't modify Function Runtime Infos when initializing
+        if (!this.isInitializePhase) {
+            this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, functionRuntimeInfo);
+        }
     }
 
     @Override
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 488bcd7..2b9c632 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -150,6 +150,9 @@ public class WorkerService {
             this.functionRuntimeManager = new FunctionRuntimeManager(
                     this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager);
 
+            // initialize function runtime manager
+            this.functionRuntimeManager.initialize();
+
             // Setting references to managers in scheduler
             this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
             this.schedulerManager.setFunctionRuntimeManager(this.functionRuntimeManager);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 5e1ed02..490be77 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -18,23 +18,31 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import io.netty.buffer.Unpooled;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.functions.metrics.MetricsSink;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.proto.Request;
-import org.apache.pulsar.functions.metrics.MetricsSink;
 import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
@@ -46,7 +54,9 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+@Slf4j
 public class FunctionRuntimeManagerTest {
 
     public static class TestSink implements MetricsSink {
@@ -384,4 +394,122 @@ public class FunctionRuntimeManagerTest {
         Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
                 .get("worker-1").get("test-tenant/test-namespace/func-2:0"), assignment3);
     }
+
+    @Test
+    public void testRuntimeManagerInitialize() throws Exception {
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setWorkerId("worker-1");
+        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+        workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+        workerConfig.setStateStorageServiceUrl("foo");
+        workerConfig.setFunctionAssignmentTopicName("assignments");
+
+        Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
+
+        Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
+
+        Function.Assignment assignment1 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-1")
+                .setInstance(Function.Instance.newBuilder()
+                        .setFunctionMetaData(function1).setInstanceId(0).build())
+                .build();
+        Function.Assignment assignment2 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-1")
+                .setInstance(Function.Instance.newBuilder()
+                        .setFunctionMetaData(function2).setInstanceId(0).build())
+                .build();
+
+        Function.Assignment assignment3 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-1")
+                .setInstance(Function.Instance.newBuilder()
+                        .setFunctionMetaData(function2).setInstanceId(0).build())
+                .build();
+
+        List<Message<byte[]>> messageList = new LinkedList<>();
+        Message message1 = spy(new MessageImpl("foo", MessageId.latest.toString(),
+                        new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null));
+        doReturn(Utils.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
+
+        Message message2 = spy(new MessageImpl("foo", MessageId.latest.toString(),
+                new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null));
+        doReturn(Utils.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
+
+        // delete function2
+        Message message3 = spy(new MessageImpl("foo", MessageId.latest.toString(),
+                new HashMap<>(), Unpooled.copiedBuffer("".getBytes()), null));
+        doReturn(Utils.getFullyQualifiedInstanceId(assignment3.getInstance())).when(message3).getKey();
+
+        messageList.add(message1);
+        messageList.add(message2);
+        messageList.add(message3);
+
+        PulsarClient pulsarClient = mock(PulsarClient.class);
+
+        Reader<byte[]> reader = mock(Reader.class);
+
+        Iterator<Message<byte[]>> it = messageList.iterator();
+
+        when(reader.readNext()).thenAnswer(new Answer<Message<byte[]>>() {
+            @Override
+            public Message<byte[]> answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return it.next();
+            }
+        });
+
+        when(reader.readNextAsync()).thenAnswer(new Answer<CompletableFuture<Message<byte[]>>>() {
+            @Override
+            public CompletableFuture<Message<byte[]>> answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return new CompletableFuture<>();
+            }
+        });
+
+
+        when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() {
+            @Override
+            public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return it.hasNext();
+            }
+        });
+
+
+
+        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+        doReturn(readerBuilder).when(pulsarClient).newReader();
+        doReturn(readerBuilder).when(readerBuilder).topic(anyString());
+        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
+
+        doReturn(reader).when(readerBuilder).create();
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(pulsarClient).when(workerService).getClient();
+        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+
+        // test new assignment add functions
+        FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
+                workerConfig,
+                workerService,
+                mock(Namespace.class),
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class)
+        ));
+
+
+        functionRuntimeManager.initialize();
+
+        Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
+        log.info("actionQueue: {}", functionRuntimeManager.actionQueue);
+        Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 1);
+
+        FunctionAction functionAction = functionRuntimeManager.actionQueue.poll();
+
+        // only actually start function1
+        Assert.assertEquals(functionAction.getAction(), FunctionAction.Action.START);
+        Assert.assertEquals(functionAction.getFunctionRuntimeInfo().getFunctionInstance(), assignment1.getInstance());
+
+    }
 }

Reply via email to