This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 31edf5a Support heartbeat function for worker (#2424)
31edf5a is described below
commit 31edf5aa74f3c4005a228ef463df05ecb2cee287
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Aug 23 11:20:15 2018 -0700
Support heartbeat function for worker (#2424)
---
.../pulsar/functions/worker/WorkerConfig.java | 2 +-
.../worker/scheduler/RoundRobinScheduler.java | 17 ++++++-
.../functions/worker/SchedulerManagerTest.java | 57 ++++++++++++++++++++++
3 files changed, 74 insertions(+), 2 deletions(-)
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 38ef5d3..0f695a9 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -143,7 +143,7 @@ public class WorkerConfig implements Serializable,
PulsarConfiguration {
public String getWorkerId() {
if (StringUtils.isBlank(this.workerId)) {
- this.workerId = getWorkerHostname();
+ this.workerId = String.format("%s-%s", this.getWorkerHostname(),
this.getWorkerPort());
}
return this.workerId;
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
index 4f9ad62..58c1a9a 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker.scheduler;
import org.apache.pulsar.functions.proto.Function.Assignment;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.Instance;
import java.util.HashMap;
@@ -29,6 +30,9 @@ import java.util.stream.Collectors;
public class RoundRobinScheduler implements IScheduler {
+ public static final String HEARTBEAT_TENANT = "pulsar-function";
+ public static final String HEARTBEAT_NAMESPACE = "heartbeat";
+
@Override
public List<Assignment> schedule(List<Instance>
unassignedFunctionInstances, List<Assignment>
currentAssignments, List<String> workers) {
@@ -44,7 +48,8 @@ public class RoundRobinScheduler implements IScheduler {
}
for (Instance unassignedFunctionInstance :
unassignedFunctionInstances) {
- String workerId = findNextWorker(workerIdToAssignment);
+ String heartBeatWorkerId =
checkHeartBeatFunction(unassignedFunctionInstance);
+ String workerId = heartBeatWorkerId != null ? heartBeatWorkerId :
findNextWorker(workerIdToAssignment);
Assignment newAssignment =
Assignment.newBuilder().setInstance(unassignedFunctionInstance)
.setWorkerId(workerId).build();
workerIdToAssignment.get(workerId).add(newAssignment);
@@ -57,6 +62,16 @@ public class RoundRobinScheduler implements IScheduler {
return assignments;
}
+ private static String checkHeartBeatFunction(Instance funInstance) {
+ if (funInstance.getFunctionMetaData() != null
+ && funInstance.getFunctionMetaData().getFunctionDetails() !=
null) {
+ FunctionDetails funDetails =
funInstance.getFunctionMetaData().getFunctionDetails();
+ return HEARTBEAT_TENANT.equals(funDetails.getTenant())
+ && HEARTBEAT_NAMESPACE.equals(funDetails.getNamespace()) ?
funDetails.getName() : null;
+ }
+ return null;
+ }
+
private String findNextWorker(Map<String, List<Assignment>>
workerIdToAssignment) {
String targetWorkerId = null;
int least = Integer.MAX_VALUE;
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 97e9c36..19977bd 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.proto.Request;
import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
import org.mockito.Mockito;
@@ -555,6 +556,62 @@ public class SchedulerManagerTest {
}
@Test
+ public void testHeartbeatFunction() throws Exception {
+ List<Function.FunctionMetaData> functionMetaDataList = new
LinkedList<>();
+ final long version = 5;
+ final String workerId1 = "host-workerId-1";
+ final String workerId2 = "host-workerId-2";
+ Function.FunctionMetaData function1 =
Function.FunctionMetaData.newBuilder()
+
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName(workerId1)
+ .setNamespace(RoundRobinScheduler.HEARTBEAT_NAMESPACE)
+
.setTenant(RoundRobinScheduler.HEARTBEAT_TENANT).setParallelism(1))
+ .setVersion(version).build();
+
+ Function.FunctionMetaData function2 =
Function.FunctionMetaData.newBuilder()
+
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName(workerId2)
+ .setNamespace(RoundRobinScheduler.HEARTBEAT_NAMESPACE)
+
.setTenant(RoundRobinScheduler.HEARTBEAT_TENANT).setParallelism(1))
+ .setVersion(version).build();
+ functionMetaDataList.add(function1);
+ functionMetaDataList.add(function2);
+
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
+
+ Map<String, Map<String, Function.Assignment>> currentAssignments = new
HashMap<>();
+ Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
+
+ currentAssignments.put("worker-1", assignmentEntry1);
+
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
+
+ // set version
+
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
+
+ List<WorkerInfo> workerInfoList = new LinkedList<>();
+ workerInfoList.add(WorkerInfo.of(workerId1, "workerHostname-1", 5000));
+ workerInfoList.add(WorkerInfo.of(workerId2, "workerHostname-1", 6000));
+
doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
+
+ // i am leader
+ doReturn(true).when(membershipManager).isLeader();
+
+ callSchedule();
+
+ List<Invocation> invocations = getMethodInvocationDetails(producer,
+ Producer.class.getMethod("sendAsync", Object.class));
+ Assert.assertEquals(invocations.size(), 1);
+
+ byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
+ Request.AssignmentsUpdate assignmentsUpdate =
Request.AssignmentsUpdate.parseFrom(send);
+
+ List<Assignment> assignmentList =
assignmentsUpdate.getAssignmentsList();
+ Assert.assertEquals(assignmentList.size(), 2);
+ for (Assignment assignment : assignmentList) {
+ String functionName =
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName();
+ String assignedWorkerId = assignment.getWorkerId();
+ Assert.assertEquals(functionName, assignedWorkerId);
+ }
+ }
+
+ @Test
public void testUpdate() throws Exception {
List<Function.FunctionMetaData> functionMetaDataList = new
LinkedList<>();
long version = 5;