This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 31edf5a Support heartbeat function for worker (#2424) 31edf5a is described below commit 31edf5aa74f3c4005a228ef463df05ecb2cee287 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Thu Aug 23 11:20:15 2018 -0700 Support heartbeat function for worker (#2424) --- .../pulsar/functions/worker/WorkerConfig.java | 2 +- .../worker/scheduler/RoundRobinScheduler.java | 17 ++++++- .../functions/worker/SchedulerManagerTest.java | 57 ++++++++++++++++++++++ 3 files changed, 74 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 38ef5d3..0f695a9 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -143,7 +143,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { public String getWorkerId() { if (StringUtils.isBlank(this.workerId)) { - this.workerId = getWorkerHostname(); + this.workerId = String.format("%s-%s", this.getWorkerHostname(), this.getWorkerPort()); } return this.workerId; } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java index 4f9ad62..58c1a9a 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.worker.scheduler; import org.apache.pulsar.functions.proto.Function.Assignment; +import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.Instance; import java.util.HashMap; @@ -29,6 +30,9 @@ import java.util.stream.Collectors; public class RoundRobinScheduler implements IScheduler { + public static final String HEARTBEAT_TENANT = "pulsar-function"; + public static final String HEARTBEAT_NAMESPACE = "heartbeat"; + @Override public List<Assignment> schedule(List<Instance> unassignedFunctionInstances, List<Assignment> currentAssignments, List<String> workers) { @@ -44,7 +48,8 @@ public class RoundRobinScheduler implements IScheduler { } for (Instance unassignedFunctionInstance : unassignedFunctionInstances) { - String workerId = findNextWorker(workerIdToAssignment); + String heartBeatWorkerId = checkHeartBeatFunction(unassignedFunctionInstance); + String workerId = heartBeatWorkerId != null ? heartBeatWorkerId : findNextWorker(workerIdToAssignment); Assignment newAssignment = Assignment.newBuilder().setInstance(unassignedFunctionInstance) .setWorkerId(workerId).build(); workerIdToAssignment.get(workerId).add(newAssignment); @@ -57,6 +62,16 @@ public class RoundRobinScheduler implements IScheduler { return assignments; } + private static String checkHeartBeatFunction(Instance funInstance) { + if (funInstance.getFunctionMetaData() != null + && funInstance.getFunctionMetaData().getFunctionDetails() != null) { + FunctionDetails funDetails = funInstance.getFunctionMetaData().getFunctionDetails(); + return HEARTBEAT_TENANT.equals(funDetails.getTenant()) + && HEARTBEAT_NAMESPACE.equals(funDetails.getNamespace()) ? funDetails.getName() : null; + } + return null; + } + private String findNextWorker(Map<String, List<Assignment>> workerIdToAssignment) { String targetWorkerId = null; int least = Integer.MAX_VALUE; diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index 97e9c36..19977bd 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.proto.Request; import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler; import org.mockito.Mockito; @@ -555,6 +556,62 @@ public class SchedulerManagerTest { } @Test + public void testHeartbeatFunction() throws Exception { + List<Function.FunctionMetaData> functionMetaDataList = new LinkedList<>(); + final long version = 5; + final String workerId1 = "host-workerId-1"; + final String workerId2 = "host-workerId-2"; + Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() + .setFunctionDetails(Function.FunctionDetails.newBuilder().setName(workerId1) + .setNamespace(RoundRobinScheduler.HEARTBEAT_NAMESPACE) + .setTenant(RoundRobinScheduler.HEARTBEAT_TENANT).setParallelism(1)) + .setVersion(version).build(); + + Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder() + .setFunctionDetails(Function.FunctionDetails.newBuilder().setName(workerId2) + .setNamespace(RoundRobinScheduler.HEARTBEAT_NAMESPACE) + .setTenant(RoundRobinScheduler.HEARTBEAT_TENANT).setParallelism(1)) + .setVersion(version).build(); + functionMetaDataList.add(function1); + functionMetaDataList.add(function2); + doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); + + Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>(); + Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>(); + + currentAssignments.put("worker-1", assignmentEntry1); + doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); + + // set version + doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); + + List<WorkerInfo> workerInfoList = new LinkedList<>(); + workerInfoList.add(WorkerInfo.of(workerId1, "workerHostname-1", 5000)); + workerInfoList.add(WorkerInfo.of(workerId2, "workerHostname-1", 6000)); + doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); + + // i am leader + doReturn(true).when(membershipManager).isLeader(); + + callSchedule(); + + List<Invocation> invocations = getMethodInvocationDetails(producer, + Producer.class.getMethod("sendAsync", Object.class)); + Assert.assertEquals(invocations.size(), 1); + + byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; + Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + + List<Assignment> assignmentList = assignmentsUpdate.getAssignmentsList(); + Assert.assertEquals(assignmentList.size(), 2); + for (Assignment assignment : assignmentList) { + String functionName = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(); + String assignedWorkerId = assignment.getWorkerId(); + Assert.assertEquals(functionName, assignedWorkerId); + } + } + + @Test public void testUpdate() throws Exception { List<Function.FunctionMetaData> functionMetaDataList = new LinkedList<>(); long version = 5;