This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 31edf5a  Support heartbeat function for worker (#2424)
31edf5a is described below

commit 31edf5aa74f3c4005a228ef463df05ecb2cee287
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Thu Aug 23 11:20:15 2018 -0700

    Support heartbeat function for worker (#2424)
---
 .../pulsar/functions/worker/WorkerConfig.java      |  2 +-
 .../worker/scheduler/RoundRobinScheduler.java      | 17 ++++++-
 .../functions/worker/SchedulerManagerTest.java     | 57 ++++++++++++++++++++++
 3 files changed, 74 insertions(+), 2 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 38ef5d3..0f695a9 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -143,7 +143,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
 
     public String getWorkerId() {
         if (StringUtils.isBlank(this.workerId)) {
-            this.workerId = getWorkerHostname();
+            this.workerId = String.format("%s-%s", this.getWorkerHostname(), this.getWorkerPort());
         }
         return this.workerId;
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
index 4f9ad62..58c1a9a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker.scheduler;
 
 import org.apache.pulsar.functions.proto.Function.Assignment;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.Instance;
 
 import java.util.HashMap;
@@ -29,6 +30,9 @@ import java.util.stream.Collectors;
 
 public class RoundRobinScheduler implements IScheduler {
 
+    public static final String HEARTBEAT_TENANT = "pulsar-function";
+    public static final String HEARTBEAT_NAMESPACE = "heartbeat";
+    
     @Override
     public List<Assignment> schedule(List<Instance> unassignedFunctionInstances, List<Assignment>
             currentAssignments, List<String> workers) {
@@ -44,7 +48,8 @@ public class RoundRobinScheduler implements IScheduler {
         }
 
         for (Instance unassignedFunctionInstance : unassignedFunctionInstances) {
-            String workerId = findNextWorker(workerIdToAssignment);
+            String heartBeatWorkerId = checkHeartBeatFunction(unassignedFunctionInstance);
+            String workerId = heartBeatWorkerId != null ? heartBeatWorkerId : findNextWorker(workerIdToAssignment);
             Assignment newAssignment = Assignment.newBuilder().setInstance(unassignedFunctionInstance)
                     .setWorkerId(workerId).build();
             workerIdToAssignment.get(workerId).add(newAssignment);
@@ -57,6 +62,16 @@ public class RoundRobinScheduler implements IScheduler {
         return assignments;
     }
 
+    private static String checkHeartBeatFunction(Instance funInstance) {
+        if (funInstance.getFunctionMetaData() != null
+                && funInstance.getFunctionMetaData().getFunctionDetails() != null) {
+            FunctionDetails funDetails = funInstance.getFunctionMetaData().getFunctionDetails();
+            return HEARTBEAT_TENANT.equals(funDetails.getTenant())
+                    && HEARTBEAT_NAMESPACE.equals(funDetails.getNamespace()) ? funDetails.getName() : null;
+        }
+        return null;
+    }
+
     private String findNextWorker(Map<String, List<Assignment>> workerIdToAssignment) {
         String targetWorkerId = null;
         int least = Integer.MAX_VALUE;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 97e9c36..19977bd 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.proto.Request;
 import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
 import org.mockito.Mockito;
@@ -555,6 +556,62 @@ public class SchedulerManagerTest {
     }
 
     @Test
+    public void testHeartbeatFunction() throws Exception {
+        List<Function.FunctionMetaData> functionMetaDataList = new LinkedList<>();
+        final long version = 5;
+        final String workerId1 = "host-workerId-1";
+        final String workerId2 = "host-workerId-2";
+        Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
+                .setFunctionDetails(Function.FunctionDetails.newBuilder().setName(workerId1)
+                        .setNamespace(RoundRobinScheduler.HEARTBEAT_NAMESPACE)
+                        .setTenant(RoundRobinScheduler.HEARTBEAT_TENANT).setParallelism(1))
+                .setVersion(version).build();
+
+        Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder()
+                .setFunctionDetails(Function.FunctionDetails.newBuilder().setName(workerId2)
+                        .setNamespace(RoundRobinScheduler.HEARTBEAT_NAMESPACE)
+                        .setTenant(RoundRobinScheduler.HEARTBEAT_TENANT).setParallelism(1))
+                .setVersion(version).build();
+        functionMetaDataList.add(function1);
+        functionMetaDataList.add(function2);
+        doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
+
+        Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
+        Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
+
+        currentAssignments.put("worker-1", assignmentEntry1);
+        doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
+
+        // set version
+        doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
+
+        List<WorkerInfo> workerInfoList = new LinkedList<>();
+        workerInfoList.add(WorkerInfo.of(workerId1, "workerHostname-1", 5000));
+        workerInfoList.add(WorkerInfo.of(workerId2, "workerHostname-1", 6000));
+        doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
+
+        // i am leader
+        doReturn(true).when(membershipManager).isLeader();
+
+        callSchedule();
+
+        List<Invocation> invocations = getMethodInvocationDetails(producer,
+                Producer.class.getMethod("sendAsync", Object.class));
+        Assert.assertEquals(invocations.size(), 1);
+
+        byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
+        Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send);
+
+        List<Assignment> assignmentList = assignmentsUpdate.getAssignmentsList();
+        Assert.assertEquals(assignmentList.size(), 2);
+        for (Assignment assignment : assignmentList) {
+            String functionName = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName();
+            String assignedWorkerId = assignment.getWorkerId();
+            Assert.assertEquals(functionName, assignedWorkerId);
+        }
+    }
+    
+    @Test
     public void testUpdate() throws Exception {
         List<Function.FunctionMetaData> functionMetaDataList = new LinkedList<>();
         long version = 5;

Reply via email to