This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b283ebd Fix: Function assignment can support large number of topics (#2438)
b283ebd is described below
commit b283ebd3e51aa86f6b9f0a6607f430996f0032f7
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Aug 27 23:56:51 2018 -0700
Fix: Function assignment can support large number of topics (#2438)
---
conf/functions_worker.yml | 1 +
.../org/apache/pulsar/io/PulsarSinkE2ETest.java | 2 +
.../functions/worker/FunctionAssignmentTailer.java | 70 ++++++++++++++++------
.../functions/worker/FunctionRuntimeManager.java | 67 ++++++++++++++++++++-
.../pulsar/functions/worker/MembershipManager.java | 2 +
.../pulsar/functions/worker/SchedulerManager.java | 46 ++++++++++----
.../org/apache/pulsar/functions/worker/Worker.java | 59 ++++++++++--------
.../pulsar/functions/worker/WorkerConfig.java | 3 +-
.../worker/FunctionRuntimeManagerTest.java | 33 +++++++++-
.../functions/worker/MembershipManagerTest.java | 3 +-
.../functions/worker/SchedulerManagerTest.java | 8 +++
11 files changed, 227 insertions(+), 67 deletions(-)
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 444b7fb..497c414 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -27,6 +27,7 @@ connectorsDirectory: ./connectors
functionMetadataTopicName: metadata
clusterCoordinationTopicName: coordinate
pulsarFunctionsNamespace: public/functions
+pulsarAssignmentNamespace: public/assignment
pulsarFunctionsCluster: standalone
pulsarServiceUrl: pulsar://localhost:6650
pulsarWebServiceUrl: http://localhost:8080
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index dd39222..54c14c0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -105,6 +105,7 @@ public class PulsarSinkE2ETest {
WorkerService functionsWorkerService;
final String tenant = "external-repl-prop";
String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+ String pulsarAssignmentNamespace = tenant + "/use/pulsar-assignment";
String primaryHost;
String workerId;
@@ -212,6 +213,7 @@ public class PulsarSinkE2ETest {
private WorkerService createPulsarFunctionWorker(ServiceConfiguration
config) {
workerConfig = new WorkerConfig();
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
+ workerConfig.setPulsarAssignmentNamespace(pulsarAssignmentNamespace);
workerConfig.setSchedulerClassName(
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
workerConfig.setThreadContainerFactory(new
WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 366eaba..060120b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -20,11 +20,15 @@ package org.apache.pulsar.functions.worker;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.functions.proto.Request;
+import com.google.common.collect.Lists;
+
import java.io.IOException;
+import java.util.List;
import java.util.function.Function;
@Slf4j
@@ -33,13 +37,22 @@ public class FunctionAssignmentTailer
private final FunctionRuntimeManager functionRuntimeManager;
private final Reader<byte[]> reader;
+
+ private long currentVersion = 0;
+ private final List<Request.AssignmentsUpdate>
currentVersionAssignments;
+ private volatile MessageId previousOldAssignmentMsgId = null;
public FunctionAssignmentTailer(FunctionRuntimeManager
functionRuntimeManager,
Reader<byte[]> reader)
throws PulsarClientException {
- this.functionRuntimeManager = functionRuntimeManager;
- this.reader = reader;
+ this.functionRuntimeManager = functionRuntimeManager;
+ this.reader = reader;
+ this.currentVersionAssignments = Lists.newArrayList();
+ // complete init if reader has no message to read so,
scheduled-manager can schedule assignments
+ if (!hasMessageAvailable()) {
+ this.functionRuntimeManager.initialized = true;
}
+ }
public void start() {
@@ -66,29 +79,40 @@ public class FunctionAssignmentTailer
@Override
public void accept(Message<byte[]> msg) {
- // check if latest
- boolean hasMessageAvailable;
+ Request.AssignmentsUpdate assignmentsUpdate;
try {
- hasMessageAvailable = this.reader.hasMessageAvailable();
- } catch (PulsarClientException e) {
+ assignmentsUpdate =
Request.AssignmentsUpdate.parseFrom(msg.getData());
+ } catch (IOException e) {
+ log.error("[{}] Received bad assignment update at message {}",
reader.getTopic(), msg.getMessageId(),
+ e);
+ // TODO: find a better way to handle bad request
throw new RuntimeException(e);
}
- if (!hasMessageAvailable) {
- Request.AssignmentsUpdate assignmentsUpdate;
- try {
- assignmentsUpdate =
Request.AssignmentsUpdate.parseFrom(msg.getData());
- } catch (IOException e) {
- log.error("[{}] Received bad assignment update at message {}",
reader.getTopic(), msg.getMessageId(),
- e);
- // TODO: find a better way to handle bad request
- throw new RuntimeException(e);
- }
- if (log.isDebugEnabled()) {
- log.debug("Received assignment update: {}", assignmentsUpdate);
+ if (log.isDebugEnabled()) {
+ log.debug("Received assignment update: {}", assignmentsUpdate);
+ }
+
+ // clear previous version assignments and ack all previous messages
+ if (currentVersion < assignmentsUpdate.getVersion()) {
+ currentVersionAssignments.clear();
+ // ack the outdated version to avoid processing again
+ if (previousOldAssignmentMsgId != null &&
this.functionRuntimeManager.isActiveRuntimeConsumer.get()) {
+
this.functionRuntimeManager.assignmentConsumer.acknowledgeCumulativeAsync(previousOldAssignmentMsgId);
}
+ }
-
this.functionRuntimeManager.processAssignmentUpdate(msg.getMessageId(),
assignmentsUpdate);
+ currentVersionAssignments.add(assignmentsUpdate);
+
+ // process only if the latest message
+ if (!hasMessageAvailable()) {
+
this.functionRuntimeManager.processAssignmentUpdate(msg.getMessageId(),
currentVersionAssignments);
+ // function-runtime manager has processed all assignments in the
topic at least once.. so scheduled-manager
+ // can only publish any new assignment with latest processed
version
+ this.functionRuntimeManager.initialized = true;
}
+
+ currentVersion = assignmentsUpdate.getVersion();
+ previousOldAssignmentMsgId = msg.getMessageId();
// receive next request
receiveOne();
}
@@ -99,4 +123,12 @@ public class FunctionAssignmentTailer
// TODO: find a better way to handle consumer functions
throw new RuntimeException(cause);
}
+
+ private boolean hasMessageAvailable() {
+ try {
+ return this.reader.hasMessageAvailable();
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 93828de..452b51c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -26,10 +26,16 @@ import java.net.URISyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -38,6 +44,7 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
+import org.inferred.freebuilder.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import javax.ws.rs.WebApplicationException;
@@ -58,6 +65,7 @@ import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
@@ -94,6 +102,11 @@ public class FunctionRuntimeManager implements
AutoCloseable{
private final ConnectorsManager connectorsManager;
private final PulsarAdmin functionAdmin;
+
+ private static final String ASSIGNMENT_TOPIC_SUBSCRIPTION =
"pulsar.functions";
+ protected AtomicBoolean isActiveRuntimeConsumer = new AtomicBoolean(false);
+ protected Consumer<byte[]> assignmentConsumer;
+ protected volatile boolean initialized = false;
public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService
workerService, Namespace dlogNamespace,
MembershipManager membershipManager, ConnectorsManager
connectorsManager) throws Exception {
@@ -103,7 +116,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
Reader<byte[]> reader = workerService.getClient().newReader()
.topic(this.workerConfig.getFunctionAssignmentTopic())
- .startMessageId(MessageId.earliest)
+ .startMessageId(getLatestAssignmentMsgId(workerConfig,
workerService))
.create();
this.functionAssignmentTailer = new FunctionAssignmentTailer(this,
reader);
@@ -132,6 +145,20 @@ public class FunctionRuntimeManager implements
AutoCloseable{
} else {
throw new RuntimeException("Either Thread or Process Container
Factory need to be set");
}
+
+ this.assignmentConsumer = workerService.getClient().newConsumer()
+
.topic(this.workerConfig.getFunctionAssignmentTopic()).subscriptionName(ASSIGNMENT_TOPIC_SUBSCRIPTION)
+
.subscriptionType(SubscriptionType.Failover).consumerEventListener(new
ConsumerEventListener() {
+ private static final long serialVersionUID = 1L;
+
+ public void becameActive(Consumer<?> consumer, int
partitionId) {
+ isActiveRuntimeConsumer.set(true);
+ }
+
+ public void becameInactive(Consumer<?> consumer, int
partitionId) {
+ isActiveRuntimeConsumer.set(false);
+ }
+ }).subscribe();
this.actionQueue = new LinkedBlockingQueue<>();
@@ -141,6 +168,29 @@ public class FunctionRuntimeManager implements
AutoCloseable{
this.membershipManager = membershipManager;
}
+ private MessageId getLatestAssignmentMsgId(WorkerConfig workerConfig,
WorkerService workerService) {
+ try {
+ PulsarAdmin admin = workerService.getBrokerAdmin();
+ PersistentTopicInternalStats topicStats = admin.topics()
+
.getInternalStats(workerConfig.getFunctionAssignmentTopic());
+ if (topicStats != null && topicStats.cursors != null) {
+ CursorStats cursor =
topicStats.cursors.get(ASSIGNMENT_TOPIC_SUBSCRIPTION);
+ if (cursor != null &&
StringUtils.isNotBlank(cursor.markDeletePosition)) {
+ String[] ids = cursor.markDeletePosition.split(":");
+ if (ids.length == 2) {
+ MessageIdImpl msgId = new
MessageIdImpl(Long.parseLong(ids[0]), Long.parseLong(ids[1]), -1);
+ log.info("Assignment-reader starts reading from {}",
msgId);
+ return msgId;
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.warn("Failed to get assignment-msg id for runtime-manager for
{}-{}",
+ workerConfig.getFunctionAssignmentTopic(),
ASSIGNMENT_TOPIC_SUBSCRIPTION, e);
+ }
+ return MessageId.earliest;
+ }
+
/**
* Starts the function runtime manager
*/
@@ -450,6 +500,12 @@ public class FunctionRuntimeManager implements
AutoCloseable{
return functionStatusListBuilder.build();
}
+ public synchronized void processAssignmentUpdate(MessageId messageId,
List<AssignmentsUpdate> assignmentsUpdates) {
+ assignmentsUpdates.forEach(assignmentsUpdate -> {
+ processAssignmentUpdate(messageId, assignmentsUpdate);
+ });
+ }
+
/**
* Process an assignment update from the assignment topic
* @param messageId the message id of the update assignment
@@ -548,9 +604,10 @@ public class FunctionRuntimeManager implements
AutoCloseable{
// set as current assignment
this.currentAssignmentVersion = assignmentsUpdate.getVersion();
-
} else {
- log.debug("Received out of date assignment update: {}",
assignmentsUpdate);
+ if (log.isDebugEnabled()) {
+ log.debug("Received out of date assignment update: {}",
assignmentsUpdate);
+ }
}
}
@@ -689,4 +746,8 @@ public class FunctionRuntimeManager implements
AutoCloseable{
private FunctionRuntimeInfo getFunctionRuntimeInfo(String
fullyQualifiedInstanceId) {
return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
}
+
+ public boolean isInitialized() {
+ return initialized;
+ }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index b18fd12..509c925 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.worker;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -43,6 +44,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index ed00958..40ab132 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -47,9 +47,13 @@ import org.apache.pulsar.functions.proto.Request;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.worker.scheduler.IScheduler;
+import com.google.common.collect.Iterables;
+
@Slf4j
public class SchedulerManager implements AutoCloseable {
+ private static final int MAX_ASSIGNMENTS_IN_MSG = 2000;
+
private final WorkerConfig workerConfig;
@Setter
@@ -99,6 +103,14 @@ public class SchedulerManager implements AutoCloseable {
}
private void invokeScheduler() {
+
+ // publish new assignment only once function-runtime manager updates
previously updated version so,
+ // scheduled-manager can publish new assignment with updated version
at FunctionRuntimeManager
+ if (!functionRuntimeManager.isInitialized()) {
+ schedule();
+ return;
+ }
+
List<String> currentMembership =
this.membershipManager.getCurrentMembership()
.stream().map(workerInfo ->
workerInfo.getWorkerId()).collect(Collectors.toList());
@@ -143,21 +155,12 @@ public class SchedulerManager implements AutoCloseable {
List<Assignment> assignments = this.scheduler.schedule(
needsAssignment, currentAssignments, currentMembership);
- log.debug("New assignments computed: {}", assignments);
+ if (log.isDebugEnabled()) {
+ log.debug("New assignments computed: {}", assignments);
+ }
long assignmentVersion =
this.functionRuntimeManager.getCurrentAssignmentVersion() + 1;
- Request.AssignmentsUpdate assignmentsUpdate =
Request.AssignmentsUpdate.newBuilder()
- .setVersion(assignmentVersion)
- .addAllAssignments(assignments)
- .build();
-
- CompletableFuture<MessageId> messageIdCompletableFuture =
producer.sendAsync(assignmentsUpdate.toByteArray());
- try {
- messageIdCompletableFuture.get();
- } catch (InterruptedException | ExecutionException e) {
- log.error("Failed to send assignment update", e);
- throw new RuntimeException(e);
- }
+ publishAssignmentUpdate(assignmentVersion, assignments);
// wait for assignment update to go throw the pipeline
int retries = 0;
@@ -176,6 +179,23 @@ public class SchedulerManager implements AutoCloseable {
}
}
+ private void publishAssignmentUpdate(long assignmentVersion,
List<Assignment> assignments) {
+
+ Iterable<List<Assignment>> batches = Iterables.partition(assignments,
MAX_ASSIGNMENTS_IN_MSG);
+ batches.forEach(assignmentBatch -> {
+ Request.AssignmentsUpdate assignmentsUpdate =
Request.AssignmentsUpdate.newBuilder()
+
.setVersion(assignmentVersion).addAllAssignments(assignmentBatch).build();
+ CompletableFuture<MessageId> messageIdCompletableFuture = producer
+ .sendAsync(assignmentsUpdate.toByteArray());
+ try {
+ messageIdCompletableFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Failed to send assignment update", e);
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
public static Map<String, Function.Instance>
computeAllInstances(List<FunctionMetaData> allFunctions) {
Map<String, Function.Instance> functionInstances = new HashMap<>();
for (FunctionMetaData functionMetaData : allFunctions) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index cb73eaa..4d6ba5f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -97,32 +97,10 @@ public class Worker extends AbstractService {
// getting namespace policy
log.info("Initializing Pulsar Functions namespace...");
try {
- try {
-
admin.namespaces().getPolicies(workerConfig.getPulsarFunctionsNamespace());
- } catch (PulsarAdminException e) {
- if (e.getStatusCode() ==
Response.Status.NOT_FOUND.getStatusCode()) {
- // if not found than create
- try {
- Policies policies = new Policies();
- policies.retention_policies = new
RetentionPolicies(-1, -1);
- policies.replication_clusters = new HashSet<>();
-
policies.replication_clusters.add(workerConfig.getPulsarFunctionsCluster());
-
admin.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace(),
- policies);
- } catch (PulsarAdminException e1) {
- // prevent race condition with other workers starting
up
- if (e1.getStatusCode() !=
Response.Status.CONFLICT.getStatusCode()) {
- log.error("Failed to create namespace {} for
pulsar functions", workerConfig
- .getPulsarFunctionsNamespace(), e1);
- throw e1;
- }
- }
- } else {
- log.error("Failed to get retention policy for pulsar
function namespace {}",
- workerConfig.getPulsarFunctionsNamespace(), e);
- throw e;
- }
- }
+ createNamespace(admin, workerConfig.getPulsarFunctionsCluster(),
workerConfig.getPulsarFunctionsNamespace(),
+ true);
+ createNamespace(admin, workerConfig.getPulsarFunctionsCluster(),
+ workerConfig.getPulsarAssignmentNamespace(), false);
try {
internalConf = admin.brokers().getInternalConfigurationData();
} catch (PulsarAdminException e) {
@@ -146,6 +124,35 @@ public class Worker extends AbstractService {
}
}
+ private static void createNamespace(PulsarAdmin admin, String cluster,
String namespace, boolean infiniteRetention)
+ throws PulsarAdminException {
+ try {
+ admin.namespaces().getPolicies(namespace);
+ } catch (PulsarAdminException e) {
+ if (e.getStatusCode() ==
Response.Status.NOT_FOUND.getStatusCode()) {
+ // if not found than create
+ try {
+ Policies policies = new Policies();
+ if (infiniteRetention) {
+ policies.retention_policies = new
RetentionPolicies(-1, -1);
+ }
+ policies.replication_clusters = new HashSet<>();
+ policies.replication_clusters.add(cluster);
+ admin.namespaces().createNamespace(namespace, policies);
+ } catch (PulsarAdminException e1) {
+ // prevent race condition with other workers starting up
+ if (e1.getStatusCode() !=
Response.Status.CONFLICT.getStatusCode()) {
+ log.error("Failed to create namespace {} for pulsar
functions", namespace, e1);
+ throw e1;
+ }
+ }
+ } else {
+ log.error("Failed to get retention policy for pulsar function
namespace {}", namespace, e);
+ throw e;
+ }
+ }
+ }
+
@Override
protected void doStop() {
if (null != this.server) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 0f695a9..74ccd7c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -61,6 +61,7 @@ public class WorkerConfig implements Serializable,
PulsarConfiguration {
private String pulsarWebServiceUrl;
private String clusterCoordinationTopicName;
private String pulsarFunctionsNamespace;
+ private String pulsarAssignmentNamespace;
private String pulsarFunctionsCluster;
private int numFunctionPackageReplicas;
private String downloadDirectory;
@@ -133,7 +134,7 @@ public class WorkerConfig implements Serializable,
PulsarConfiguration {
}
public String getFunctionAssignmentTopic() {
- return String.format("persistent://%s/%s", pulsarFunctionsNamespace,
functionAssignmentTopicName);
+ return String.format("persistent://%s/%s", pulsarAssignmentNamespace,
functionAssignmentTopicName);
}
public static WorkerConfig load(String yamlFile) throws IOException {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 85a2122..c19d208 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -20,10 +20,16 @@ package org.apache.pulsar.functions.worker;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.Request;
@@ -45,6 +51,7 @@ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class FunctionRuntimeManagerTest {
@@ -81,7 +88,7 @@ public class FunctionRuntimeManagerTest {
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
- PulsarClient pulsarClient = mock(PulsarClient.class);
+ PulsarClient pulsarClient = mockPulsarClient();
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -178,7 +185,7 @@ public class FunctionRuntimeManagerTest {
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
- PulsarClient pulsarClient = mock(PulsarClient.class);
+ PulsarClient pulsarClient = mockPulsarClient();
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -279,7 +286,7 @@ public class FunctionRuntimeManagerTest {
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
- PulsarClient pulsarClient = mock(PulsarClient.class);
+ PulsarClient pulsarClient = mockPulsarClient();
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -394,4 +401,24 @@ public class FunctionRuntimeManagerTest {
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-1").get("test-tenant/test-namespace/func-2:0"),
assignment3);
}
+
+ private static PulsarClient mockPulsarClient() throws
PulsarClientException {
+ PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+
+ ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
+ ConsumerBuilder<byte[]> mockConsumerBuilder =
mock(ConsumerBuilder.class);
+
+
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
+
when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
+
when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
+ when(mockConsumerBuilder.property(anyString(),
anyString())).thenReturn(mockConsumerBuilder);
+
+ when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
+
+
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenReturn(mockConsumerBuilder);
+
+ when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
+
+ return mockClient;
+ }
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 2753bf1..26e0540 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -76,7 +76,6 @@ public class MembershipManagerTest {
when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
when(mockConsumerBuilder.property(anyString(),
anyString())).thenReturn(mockConsumerBuilder);
-
when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
WorkerService workerService = mock(WorkerService.class);
doReturn(workerConfig).when(workerService).getWorkerConfig();
@@ -120,7 +119,7 @@ public class MembershipManagerTest {
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenReturn(mockConsumerBuilder);
when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
-
+
return mockClient;
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 19977bd..6034124 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -111,6 +111,7 @@ public class SchedulerManagerTest {
schedulerManager = spy(new SchedulerManager(workerConfig,
pulsarClient));
functionRuntimeManager = mock(FunctionRuntimeManager.class);
+ when(functionRuntimeManager.isInitialized()).thenReturn(true);
functionMetaDataManager = mock(FunctionMetaDataManager.class);
membershipManager = mock(MembershipManager.class);
schedulerManager.setFunctionMetaDataManager(functionMetaDataManager);
@@ -145,6 +146,7 @@ public class SchedulerManagerTest {
//set version
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
+ doReturn(true).when(functionRuntimeManager).isInitialized();
// single node
List<WorkerInfo> workerInfoList = new LinkedList<>();
@@ -186,6 +188,7 @@ public class SchedulerManagerTest {
assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
assignment1);
currentAssignments.put("worker-1", assignmentEntry1);
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
+ doReturn(true).when(functionRuntimeManager).isInitialized();
//set version
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -242,6 +245,7 @@ public class SchedulerManagerTest {
assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
assignment1);
currentAssignments.put("worker-1", assignmentEntry1);
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
+ doReturn(true).when(functionRuntimeManager).isInitialized();
//set version
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -313,6 +317,7 @@ public class SchedulerManagerTest {
currentAssignments.put("worker-1", assignmentEntry1);
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
+ doReturn(true).when(functionRuntimeManager).isInitialized();
//set version
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -372,6 +377,7 @@ public class SchedulerManagerTest {
currentAssignments.put("worker-1", assignmentEntry1);
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
+ doReturn(true).when(functionRuntimeManager).isInitialized();
//set version
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -478,6 +484,7 @@ public class SchedulerManagerTest {
currentAssignments.put("worker-1", assignmentEntry1);
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
+ doReturn(true).when(functionRuntimeManager).isInitialized();
//set version
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -646,6 +653,7 @@ public class SchedulerManagerTest {
//set version
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
+ doReturn(true).when(functionRuntimeManager).isInitialized();
// single node
List<WorkerInfo> workerInfoList = new LinkedList<>();