This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b283ebd  Fix: Function assignment can support large number of topics 
(#2438)
b283ebd is described below

commit b283ebd3e51aa86f6b9f0a6607f430996f0032f7
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Mon Aug 27 23:56:51 2018 -0700

    Fix: Function assignment can support large number of topics (#2438)
---
 conf/functions_worker.yml                          |  1 +
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java    |  2 +
 .../functions/worker/FunctionAssignmentTailer.java | 70 ++++++++++++++++------
 .../functions/worker/FunctionRuntimeManager.java   | 67 ++++++++++++++++++++-
 .../pulsar/functions/worker/MembershipManager.java |  2 +
 .../pulsar/functions/worker/SchedulerManager.java  | 46 ++++++++++----
 .../org/apache/pulsar/functions/worker/Worker.java | 59 ++++++++++--------
 .../pulsar/functions/worker/WorkerConfig.java      |  3 +-
 .../worker/FunctionRuntimeManagerTest.java         | 33 +++++++++-
 .../functions/worker/MembershipManagerTest.java    |  3 +-
 .../functions/worker/SchedulerManagerTest.java     |  8 +++
 11 files changed, 227 insertions(+), 67 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 444b7fb..497c414 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -27,6 +27,7 @@ connectorsDirectory: ./connectors
 functionMetadataTopicName: metadata
 clusterCoordinationTopicName: coordinate
 pulsarFunctionsNamespace: public/functions
+pulsarAssignmentNamespace: public/assignment
 pulsarFunctionsCluster: standalone
 pulsarServiceUrl: pulsar://localhost:6650
 pulsarWebServiceUrl: http://localhost:8080
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index dd39222..54c14c0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -105,6 +105,7 @@ public class PulsarSinkE2ETest {
     WorkerService functionsWorkerService;
     final String tenant = "external-repl-prop";
     String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+    String pulsarAssignmentNamespace = tenant + "/use/pulsar-assignment";
     String primaryHost;
     String workerId;
 
@@ -212,6 +213,7 @@ public class PulsarSinkE2ETest {
     private WorkerService createPulsarFunctionWorker(ServiceConfiguration 
config) {
         workerConfig = new WorkerConfig();
         workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
+        workerConfig.setPulsarAssignmentNamespace(pulsarAssignmentNamespace);
         workerConfig.setSchedulerClassName(
                 
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
         workerConfig.setThreadContainerFactory(new 
WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 366eaba..060120b 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -20,11 +20,15 @@ package org.apache.pulsar.functions.worker;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.functions.proto.Request;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
+import java.util.List;
 import java.util.function.Function;
 
 @Slf4j
@@ -33,13 +37,22 @@ public class FunctionAssignmentTailer
 
         private final FunctionRuntimeManager functionRuntimeManager;
         private final Reader<byte[]> reader;
+        
+        private long currentVersion = 0;
+        private final List<Request.AssignmentsUpdate> 
currentVersionAssignments;
+        private volatile MessageId previousOldAssignmentMsgId = null;
 
     public FunctionAssignmentTailer(FunctionRuntimeManager 
functionRuntimeManager,
                 Reader<byte[]> reader)
             throws PulsarClientException {
-            this.functionRuntimeManager = functionRuntimeManager;
-            this.reader = reader;
+        this.functionRuntimeManager = functionRuntimeManager;
+        this.reader = reader;
+        this.currentVersionAssignments = Lists.newArrayList();
+        // complete init if reader has no message to read, so 
scheduler-manager can schedule assignments
+        if (!hasMessageAvailable()) {
+            this.functionRuntimeManager.initialized = true;
         }
+    }
 
     public void start() {
 
@@ -66,29 +79,40 @@ public class FunctionAssignmentTailer
     @Override
     public void accept(Message<byte[]> msg) {
 
-        // check if latest
-        boolean hasMessageAvailable;
+        Request.AssignmentsUpdate assignmentsUpdate;
         try {
-            hasMessageAvailable = this.reader.hasMessageAvailable();
-        } catch (PulsarClientException e) {
+            assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(msg.getData());
+        } catch (IOException e) {
+            log.error("[{}] Received bad assignment update at message {}", 
reader.getTopic(), msg.getMessageId(),
+                    e);
+            // TODO: find a better way to handle bad request
             throw new RuntimeException(e);
         }
-        if (!hasMessageAvailable) {
-            Request.AssignmentsUpdate assignmentsUpdate;
-            try {
-                assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(msg.getData());
-            } catch (IOException e) {
-                log.error("[{}] Received bad assignment update at message {}", 
reader.getTopic(), msg.getMessageId(),
-                        e);
-                // TODO: find a better way to handle bad request
-                throw new RuntimeException(e);
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("Received assignment update: {}", assignmentsUpdate);
+        if (log.isDebugEnabled()) {
+            log.debug("Received assignment update: {}", assignmentsUpdate);
+        }
+
+        // clear previous version assignments and ack all previous messages
+        if (currentVersion < assignmentsUpdate.getVersion()) {
+            currentVersionAssignments.clear();
+            // ack the outdated version to avoid processing again
+            if (previousOldAssignmentMsgId != null && 
this.functionRuntimeManager.isActiveRuntimeConsumer.get()) {
+                
this.functionRuntimeManager.assignmentConsumer.acknowledgeCumulativeAsync(previousOldAssignmentMsgId);
             }
+        }
 
-            
this.functionRuntimeManager.processAssignmentUpdate(msg.getMessageId(), 
assignmentsUpdate);
+        currentVersionAssignments.add(assignmentsUpdate);
+        
+        // process only if the latest message
+        if (!hasMessageAvailable()) {
+            
this.functionRuntimeManager.processAssignmentUpdate(msg.getMessageId(), 
currentVersionAssignments);
+            // function-runtime manager has processed all assignments in the 
topic at least once, so scheduler-manager
+            // can only publish any new assignment with the latest processed 
version
+            this.functionRuntimeManager.initialized = true;
         }
+
+        currentVersion = assignmentsUpdate.getVersion();
+        previousOldAssignmentMsgId = msg.getMessageId();
         // receive next request
         receiveOne();
     }
@@ -99,4 +123,12 @@ public class FunctionAssignmentTailer
         // TODO: find a better way to handle consumer functions
         throw new RuntimeException(cause);
     }
+    
+    private boolean hasMessageAvailable() {
+        try {
+            return this.reader.hasMessageAvailable();
+        } catch (PulsarClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 93828de..452b51c 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -26,10 +26,16 @@ import java.net.URISyntaxException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import 
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -38,6 +44,7 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
+import org.inferred.freebuilder.shaded.org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 
 import javax.ws.rs.WebApplicationException;
@@ -58,6 +65,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 /**
@@ -94,6 +102,11 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     private final ConnectorsManager connectorsManager;
     
     private final PulsarAdmin functionAdmin;
+    
+    private static final String ASSIGNMENT_TOPIC_SUBSCRIPTION = 
"pulsar.functions";
+    protected AtomicBoolean isActiveRuntimeConsumer = new AtomicBoolean(false);
+    protected Consumer<byte[]> assignmentConsumer;
+    protected volatile boolean initialized = false;
 
     public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService 
workerService, Namespace dlogNamespace,
             MembershipManager membershipManager, ConnectorsManager 
connectorsManager) throws Exception {
@@ -103,7 +116,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
 
         Reader<byte[]> reader = workerService.getClient().newReader()
                 .topic(this.workerConfig.getFunctionAssignmentTopic())
-                .startMessageId(MessageId.earliest)
+                .startMessageId(getLatestAssignmentMsgId(workerConfig, 
workerService))
                 .create();
 
         this.functionAssignmentTailer = new FunctionAssignmentTailer(this, 
reader);
@@ -132,6 +145,20 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         } else {
             throw new RuntimeException("Either Thread or Process Container 
Factory need to be set");
         }
+        
+        this.assignmentConsumer = workerService.getClient().newConsumer()
+                
.topic(this.workerConfig.getFunctionAssignmentTopic()).subscriptionName(ASSIGNMENT_TOPIC_SUBSCRIPTION)
+                
.subscriptionType(SubscriptionType.Failover).consumerEventListener(new 
ConsumerEventListener() {
+                    private static final long serialVersionUID = 1L;
+
+                    public void becameActive(Consumer<?> consumer, int 
partitionId) {
+                        isActiveRuntimeConsumer.set(true);
+                    }
+
+                    public void becameInactive(Consumer<?> consumer, int 
partitionId) {
+                        isActiveRuntimeConsumer.set(false);
+                    }
+                }).subscribe();
 
         this.actionQueue = new LinkedBlockingQueue<>();
 
@@ -141,6 +168,29 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         this.membershipManager = membershipManager;
     }
 
+    private MessageId getLatestAssignmentMsgId(WorkerConfig workerConfig, 
WorkerService workerService) {
+        try {
+            PulsarAdmin admin = workerService.getBrokerAdmin();
+            PersistentTopicInternalStats topicStats = admin.topics()
+                    
.getInternalStats(workerConfig.getFunctionAssignmentTopic());
+            if (topicStats != null && topicStats.cursors != null) {
+                CursorStats cursor = 
topicStats.cursors.get(ASSIGNMENT_TOPIC_SUBSCRIPTION);
+                if (cursor != null && 
StringUtils.isNotBlank(cursor.markDeletePosition)) {
+                    String[] ids = cursor.markDeletePosition.split(":");
+                    if (ids.length == 2) {
+                        MessageIdImpl msgId = new 
MessageIdImpl(Long.parseLong(ids[0]), Long.parseLong(ids[1]), -1);
+                        log.info("Assignment-reader starts reading from {}", 
msgId);
+                        return msgId;
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.warn("Failed to get assignment-msg id for runtime-manager for 
{}-{}",
+                    workerConfig.getFunctionAssignmentTopic(), 
ASSIGNMENT_TOPIC_SUBSCRIPTION, e);
+        }
+        return MessageId.earliest;
+    }
+
     /**
      * Starts the function runtime manager
      */
@@ -450,6 +500,12 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         return functionStatusListBuilder.build();
     }
 
+    public synchronized void processAssignmentUpdate(MessageId messageId, 
List<AssignmentsUpdate> assignmentsUpdates) {
+        assignmentsUpdates.forEach(assignmentsUpdate -> {
+            processAssignmentUpdate(messageId, assignmentsUpdate);
+        });
+    }
+    
     /**
      * Process an assignment update from the assignment topic
      * @param messageId the message id of the update assignment
@@ -548,9 +604,10 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
 
             // set as current assignment
             this.currentAssignmentVersion = assignmentsUpdate.getVersion();
-
         } else {
-            log.debug("Received out of date assignment update: {}", 
assignmentsUpdate);
+            if (log.isDebugEnabled()) {
+                log.debug("Received out of date assignment update: {}", 
assignmentsUpdate);
+            }
         }
     }
 
@@ -689,4 +746,8 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     private FunctionRuntimeInfo getFunctionRuntimeInfo(String 
fullyQualifiedInstanceId) {
         return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
     }
+    
+    public boolean isInitialized() {
+        return initialized;
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index b18fd12..509c925 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.worker;
 import com.google.common.annotations.VisibleForTesting;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -43,6 +44,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index ed00958..40ab132 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -47,9 +47,13 @@ import org.apache.pulsar.functions.proto.Request;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.worker.scheduler.IScheduler;
 
+import com.google.common.collect.Iterables;
+
 @Slf4j
 public class SchedulerManager implements AutoCloseable {
 
+    private static final int MAX_ASSIGNMENTS_IN_MSG = 2000;
+
     private final WorkerConfig workerConfig;
 
     @Setter
@@ -99,6 +103,14 @@ public class SchedulerManager implements AutoCloseable {
     }
 
     private void invokeScheduler() {
+        
+        // publish a new assignment only once function-runtime manager has 
caught up with the previously published version, so
+        // scheduler-manager can publish the new assignment with the updated 
version from FunctionRuntimeManager
+        if (!functionRuntimeManager.isInitialized()) {
+            schedule();
+            return;
+        }
+        
         List<String> currentMembership = 
this.membershipManager.getCurrentMembership()
                 .stream().map(workerInfo -> 
workerInfo.getWorkerId()).collect(Collectors.toList());
 
@@ -143,21 +155,12 @@ public class SchedulerManager implements AutoCloseable {
         List<Assignment> assignments = this.scheduler.schedule(
                 needsAssignment, currentAssignments, currentMembership);
 
-        log.debug("New assignments computed: {}", assignments);
+        if (log.isDebugEnabled()) {
+            log.debug("New assignments computed: {}", assignments);
+        }
 
         long assignmentVersion = 
this.functionRuntimeManager.getCurrentAssignmentVersion() + 1;
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.newBuilder()
-                .setVersion(assignmentVersion)
-                .addAllAssignments(assignments)
-                .build();
-
-        CompletableFuture<MessageId> messageIdCompletableFuture = 
producer.sendAsync(assignmentsUpdate.toByteArray());
-        try {
-            messageIdCompletableFuture.get();
-        } catch (InterruptedException | ExecutionException e) {
-            log.error("Failed to send assignment update", e);
-            throw new RuntimeException(e);
-        }
+        publishAssignmentUpdate(assignmentVersion, assignments);
 
         // wait for assignment update to go throw the pipeline
         int retries = 0;
@@ -176,6 +179,23 @@ public class SchedulerManager implements AutoCloseable {
         }
     }
 
+    private void publishAssignmentUpdate(long assignmentVersion, 
List<Assignment> assignments) {
+
+        Iterable<List<Assignment>> batches = Iterables.partition(assignments, 
MAX_ASSIGNMENTS_IN_MSG);
+        batches.forEach(assignmentBatch -> {
+            Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.newBuilder()
+                    
.setVersion(assignmentVersion).addAllAssignments(assignmentBatch).build();
+            CompletableFuture<MessageId> messageIdCompletableFuture = producer
+                    .sendAsync(assignmentsUpdate.toByteArray());
+            try {
+                messageIdCompletableFuture.get();
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Failed to send assignment update", e);
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
     public static Map<String, Function.Instance> 
computeAllInstances(List<FunctionMetaData> allFunctions) {
         Map<String, Function.Instance> functionInstances = new HashMap<>();
         for (FunctionMetaData functionMetaData : allFunctions) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index cb73eaa..4d6ba5f 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -97,32 +97,10 @@ public class Worker extends AbstractService {
         // getting namespace policy
         log.info("Initializing Pulsar Functions namespace...");
         try {
-            try {
-                
admin.namespaces().getPolicies(workerConfig.getPulsarFunctionsNamespace());
-            } catch (PulsarAdminException e) {
-                if (e.getStatusCode() == 
Response.Status.NOT_FOUND.getStatusCode()) {
-                    // if not found than create
-                    try {
-                        Policies policies = new Policies();
-                        policies.retention_policies = new 
RetentionPolicies(-1, -1);
-                        policies.replication_clusters = new HashSet<>();
-                        
policies.replication_clusters.add(workerConfig.getPulsarFunctionsCluster());
-                        
admin.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace(),
-                                policies);
-                    } catch (PulsarAdminException e1) {
-                        // prevent race condition with other workers starting 
up
-                        if (e1.getStatusCode() != 
Response.Status.CONFLICT.getStatusCode()) {
-                            log.error("Failed to create namespace {} for 
pulsar functions", workerConfig
-                                    .getPulsarFunctionsNamespace(), e1);
-                            throw e1;
-                        }
-                    }
-                } else {
-                    log.error("Failed to get retention policy for pulsar 
function namespace {}",
-                            workerConfig.getPulsarFunctionsNamespace(), e);
-                    throw e;
-                }
-            }
+            createNamespace(admin, workerConfig.getPulsarFunctionsCluster(), 
workerConfig.getPulsarFunctionsNamespace(),
+                    true);
+            createNamespace(admin, workerConfig.getPulsarFunctionsCluster(),
+                    workerConfig.getPulsarAssignmentNamespace(), false);
             try {
                 internalConf = admin.brokers().getInternalConfigurationData();
             } catch (PulsarAdminException e) {
@@ -146,6 +124,35 @@ public class Worker extends AbstractService {
         }
     }
 
+    private static void createNamespace(PulsarAdmin admin, String cluster, 
String namespace, boolean infiniteRetention)
+            throws PulsarAdminException {
+        try {
+            admin.namespaces().getPolicies(namespace);
+        } catch (PulsarAdminException e) {
+            if (e.getStatusCode() == 
Response.Status.NOT_FOUND.getStatusCode()) {
+                // if not found then create
+                try {
+                    Policies policies = new Policies();
+                    if (infiniteRetention) {
+                        policies.retention_policies = new 
RetentionPolicies(-1, -1);
+                    }
+                    policies.replication_clusters = new HashSet<>();
+                    policies.replication_clusters.add(cluster);
+                    admin.namespaces().createNamespace(namespace, policies);
+                } catch (PulsarAdminException e1) {
+                    // prevent race condition with other workers starting up
+                    if (e1.getStatusCode() != 
Response.Status.CONFLICT.getStatusCode()) {
+                        log.error("Failed to create namespace {} for pulsar 
functions", namespace, e1);
+                        throw e1;
+                    }
+                }
+            } else {
+                log.error("Failed to get retention policy for pulsar function 
namespace {}", namespace, e);
+                throw e;
+            }
+        }
+    }
+
     @Override
     protected void doStop() {
         if (null != this.server) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 0f695a9..74ccd7c 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -61,6 +61,7 @@ public class WorkerConfig implements Serializable, 
PulsarConfiguration {
     private String pulsarWebServiceUrl;
     private String clusterCoordinationTopicName;
     private String pulsarFunctionsNamespace;
+    private String pulsarAssignmentNamespace;
     private String pulsarFunctionsCluster;
     private int numFunctionPackageReplicas;
     private String downloadDirectory;
@@ -133,7 +134,7 @@ public class WorkerConfig implements Serializable, 
PulsarConfiguration {
     }
 
     public String getFunctionAssignmentTopic() {
-        return String.format("persistent://%s/%s", pulsarFunctionsNamespace, 
functionAssignmentTopicName);
+        return String.format("persistent://%s/%s", pulsarAssignmentNamespace, 
functionAssignmentTopicName);
     }
 
     public static WorkerConfig load(String yamlFile) throws IOException {
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 85a2122..c19d208 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -20,10 +20,16 @@ package org.apache.pulsar.functions.worker;
 
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.Request;
@@ -45,6 +51,7 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class FunctionRuntimeManagerTest {
 
@@ -81,7 +88,7 @@ public class FunctionRuntimeManagerTest {
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setFunctionAssignmentTopicName("assignments");
 
-        PulsarClient pulsarClient = mock(PulsarClient.class);
+        PulsarClient pulsarClient = mockPulsarClient();
         ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -178,7 +185,7 @@ public class FunctionRuntimeManagerTest {
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
 
-        PulsarClient pulsarClient = mock(PulsarClient.class);
+        PulsarClient pulsarClient = mockPulsarClient();
         ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -279,7 +286,7 @@ public class FunctionRuntimeManagerTest {
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
 
-        PulsarClient pulsarClient = mock(PulsarClient.class);
+        PulsarClient pulsarClient = mockPulsarClient();
         ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -394,4 +401,24 @@ public class FunctionRuntimeManagerTest {
         Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
                 .get("worker-1").get("test-tenant/test-namespace/func-2:0"), 
assignment3);
     }
+    
+    private static PulsarClient mockPulsarClient() throws 
PulsarClientException {
+        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+
+        ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
+        ConsumerBuilder<byte[]> mockConsumerBuilder = 
mock(ConsumerBuilder.class);
+
+        
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
+        
when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
+        
when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
+        when(mockConsumerBuilder.property(anyString(), 
anyString())).thenReturn(mockConsumerBuilder);
+
+        when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
+
+        
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenReturn(mockConsumerBuilder);
+
+        when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
+        
+        return mockClient;
+    }
 }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 2753bf1..26e0540 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -76,7 +76,6 @@ public class MembershipManagerTest {
         
when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
         
when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
         when(mockConsumerBuilder.property(anyString(), 
anyString())).thenReturn(mockConsumerBuilder);
-
         when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
         WorkerService workerService = mock(WorkerService.class);
         doReturn(workerConfig).when(workerService).getWorkerConfig();
@@ -120,7 +119,7 @@ public class MembershipManagerTest {
         when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenReturn(mockConsumerBuilder);
 
         when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
-
+        
         return mockClient;
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 19977bd..6034124 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -111,6 +111,7 @@ public class SchedulerManagerTest {
 
         schedulerManager = spy(new SchedulerManager(workerConfig, pulsarClient));
         functionRuntimeManager = mock(FunctionRuntimeManager.class);
+        when(functionRuntimeManager.isInitialized()).thenReturn(true);
         functionMetaDataManager = mock(FunctionMetaDataManager.class);
         membershipManager = mock(MembershipManager.class);
         schedulerManager.setFunctionMetaDataManager(functionMetaDataManager);
@@ -145,6 +146,7 @@ public class SchedulerManagerTest {
 
         //set version
         doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
+        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();
@@ -186,6 +188,7 @@ public class SchedulerManagerTest {
         assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
         currentAssignments.put("worker-1", assignmentEntry1);
         doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
+        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         //set version
         doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -242,6 +245,7 @@ public class SchedulerManagerTest {
         assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
         currentAssignments.put("worker-1", assignmentEntry1);
         doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
+        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         //set version
         doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -313,6 +317,7 @@ public class SchedulerManagerTest {
 
         currentAssignments.put("worker-1", assignmentEntry1);
         doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
+        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         //set version
         doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -372,6 +377,7 @@ public class SchedulerManagerTest {
 
         currentAssignments.put("worker-1", assignmentEntry1);
         doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
+        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         //set version
         doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -478,6 +484,7 @@ public class SchedulerManagerTest {
 
         currentAssignments.put("worker-1", assignmentEntry1);
         doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
+        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         //set version
         doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -646,6 +653,7 @@ public class SchedulerManagerTest {
 
         //set version
         doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
+        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();

Reply via email to