This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ce74b11  Revert "Fix: Function assignment can support large number of 
topics (#2438)" (#2474)
ce74b11 is described below

commit ce74b11b563462d18ae0068cf2d9d9ef0271165a
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Thu Aug 30 22:44:21 2018 -0700

    Revert "Fix: Function assignment can support large number of topics 
(#2438)" (#2474)
    
    This reverts commit b283ebd3e51aa86f6b9f0a6607f430996f0032f7.
---
 conf/functions_worker.yml                          |  1 -
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java    |  2 -
 .../functions/worker/FunctionAssignmentTailer.java | 70 ++++++----------------
 .../functions/worker/FunctionRuntimeManager.java   | 67 +--------------------
 .../pulsar/functions/worker/MembershipManager.java |  2 -
 .../pulsar/functions/worker/SchedulerManager.java  | 46 ++++----------
 .../org/apache/pulsar/functions/worker/Worker.java | 59 ++++++++----------
 .../pulsar/functions/worker/WorkerConfig.java      |  3 +-
 .../worker/FunctionRuntimeManagerTest.java         | 33 +---------
 .../functions/worker/MembershipManagerTest.java    |  3 +-
 .../functions/worker/SchedulerManagerTest.java     |  8 ---
 11 files changed, 67 insertions(+), 227 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 497c414..444b7fb 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -27,7 +27,6 @@ connectorsDirectory: ./connectors
 functionMetadataTopicName: metadata
 clusterCoordinationTopicName: coordinate
 pulsarFunctionsNamespace: public/functions
-pulsarAssignmentNamespace: public/assignment
 pulsarFunctionsCluster: standalone
 pulsarServiceUrl: pulsar://localhost:6650
 pulsarWebServiceUrl: http://localhost:8080
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 54c14c0..dd39222 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -105,7 +105,6 @@ public class PulsarSinkE2ETest {
     WorkerService functionsWorkerService;
     final String tenant = "external-repl-prop";
     String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
-    String pulsarAssignmentNamespace = tenant + "/use/pulsar-assignment";
     String primaryHost;
     String workerId;
 
@@ -213,7 +212,6 @@ public class PulsarSinkE2ETest {
     private WorkerService createPulsarFunctionWorker(ServiceConfiguration 
config) {
         workerConfig = new WorkerConfig();
         workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
-        workerConfig.setPulsarAssignmentNamespace(pulsarAssignmentNamespace);
         workerConfig.setSchedulerClassName(
                 
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
         workerConfig.setThreadContainerFactory(new 
WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 060120b..366eaba 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -20,15 +20,11 @@ package org.apache.pulsar.functions.worker;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.functions.proto.Request;
 
-import com.google.common.collect.Lists;
-
 import java.io.IOException;
-import java.util.List;
 import java.util.function.Function;
 
 @Slf4j
@@ -37,22 +33,13 @@ public class FunctionAssignmentTailer
 
         private final FunctionRuntimeManager functionRuntimeManager;
         private final Reader<byte[]> reader;
-        
-        private long currentVersion = 0;
-        private final List<Request.AssignmentsUpdate> 
currentVersionAssignments;
-        private volatile MessageId previousOldAssignmentMsgId = null;
 
     public FunctionAssignmentTailer(FunctionRuntimeManager 
functionRuntimeManager,
                 Reader<byte[]> reader)
             throws PulsarClientException {
-        this.functionRuntimeManager = functionRuntimeManager;
-        this.reader = reader;
-        this.currentVersionAssignments = Lists.newArrayList();
-        // complete init if reader has no message to read so, 
scheduled-manager can schedule assignments
-        if (!hasMessageAvailable()) {
-            this.functionRuntimeManager.initialized = true;
+            this.functionRuntimeManager = functionRuntimeManager;
+            this.reader = reader;
         }
-    }
 
     public void start() {
 
@@ -79,40 +66,29 @@ public class FunctionAssignmentTailer
     @Override
     public void accept(Message<byte[]> msg) {
 
-        Request.AssignmentsUpdate assignmentsUpdate;
+        // check if latest
+        boolean hasMessageAvailable;
         try {
-            assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(msg.getData());
-        } catch (IOException e) {
-            log.error("[{}] Received bad assignment update at message {}", 
reader.getTopic(), msg.getMessageId(),
-                    e);
-            // TODO: find a better way to handle bad request
+            hasMessageAvailable = this.reader.hasMessageAvailable();
+        } catch (PulsarClientException e) {
             throw new RuntimeException(e);
         }
-        if (log.isDebugEnabled()) {
-            log.debug("Received assignment update: {}", assignmentsUpdate);
-        }
-
-        // clear previous version assignments and ack all previous messages
-        if (currentVersion < assignmentsUpdate.getVersion()) {
-            currentVersionAssignments.clear();
-            // ack the outdated version to avoid processing again
-            if (previousOldAssignmentMsgId != null && 
this.functionRuntimeManager.isActiveRuntimeConsumer.get()) {
-                
this.functionRuntimeManager.assignmentConsumer.acknowledgeCumulativeAsync(previousOldAssignmentMsgId);
+        if (!hasMessageAvailable) {
+            Request.AssignmentsUpdate assignmentsUpdate;
+            try {
+                assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(msg.getData());
+            } catch (IOException e) {
+                log.error("[{}] Received bad assignment update at message {}", 
reader.getTopic(), msg.getMessageId(),
+                        e);
+                // TODO: find a better way to handle bad request
+                throw new RuntimeException(e);
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Received assignment update: {}", assignmentsUpdate);
             }
-        }
 
-        currentVersionAssignments.add(assignmentsUpdate);
-        
-        // process only if the latest message
-        if (!hasMessageAvailable()) {
-            
this.functionRuntimeManager.processAssignmentUpdate(msg.getMessageId(), 
currentVersionAssignments);
-            // function-runtime manager has processed all assignments in the 
topic at least once.. so scheduled-manager
-            // can only publish any new assignment with latest processed 
version
-            this.functionRuntimeManager.initialized = true;
+            
this.functionRuntimeManager.processAssignmentUpdate(msg.getMessageId(), 
assignmentsUpdate);
         }
-
-        currentVersion = assignmentsUpdate.getVersion();
-        previousOldAssignmentMsgId = msg.getMessageId();
         // receive next request
         receiveOne();
     }
@@ -123,12 +99,4 @@ public class FunctionAssignmentTailer
         // TODO: find a better way to handle consumer functions
         throw new RuntimeException(cause);
     }
-    
-    private boolean hasMessageAvailable() {
-        try {
-            return this.reader.hasMessageAvailable();
-        } catch (PulsarClientException e) {
-            throw new RuntimeException(e);
-        }
-    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 452b51c..93828de 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -26,16 +26,10 @@ import java.net.URISyntaxException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.policies.data.ErrorData;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
-import 
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -44,7 +38,6 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
-import org.inferred.freebuilder.shaded.org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 
 import javax.ws.rs.WebApplicationException;
@@ -65,7 +58,6 @@ import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 /**
@@ -102,11 +94,6 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     private final ConnectorsManager connectorsManager;
     
     private final PulsarAdmin functionAdmin;
-    
-    private static final String ASSIGNMENT_TOPIC_SUBSCRIPTION = 
"pulsar.functions";
-    protected AtomicBoolean isActiveRuntimeConsumer = new AtomicBoolean(false);
-    protected Consumer<byte[]> assignmentConsumer;
-    protected volatile boolean initialized = false;
 
     public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService 
workerService, Namespace dlogNamespace,
             MembershipManager membershipManager, ConnectorsManager 
connectorsManager) throws Exception {
@@ -116,7 +103,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
 
         Reader<byte[]> reader = workerService.getClient().newReader()
                 .topic(this.workerConfig.getFunctionAssignmentTopic())
-                .startMessageId(getLatestAssignmentMsgId(workerConfig, 
workerService))
+                .startMessageId(MessageId.earliest)
                 .create();
 
         this.functionAssignmentTailer = new FunctionAssignmentTailer(this, 
reader);
@@ -145,20 +132,6 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         } else {
             throw new RuntimeException("Either Thread or Process Container 
Factory need to be set");
         }
-        
-        this.assignmentConsumer = workerService.getClient().newConsumer()
-                
.topic(this.workerConfig.getFunctionAssignmentTopic()).subscriptionName(ASSIGNMENT_TOPIC_SUBSCRIPTION)
-                
.subscriptionType(SubscriptionType.Failover).consumerEventListener(new 
ConsumerEventListener() {
-                    private static final long serialVersionUID = 1L;
-
-                    public void becameActive(Consumer<?> consumer, int 
partitionId) {
-                        isActiveRuntimeConsumer.set(true);
-                    }
-
-                    public void becameInactive(Consumer<?> consumer, int 
partitionId) {
-                        isActiveRuntimeConsumer.set(false);
-                    }
-                }).subscribe();
 
         this.actionQueue = new LinkedBlockingQueue<>();
 
@@ -168,29 +141,6 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         this.membershipManager = membershipManager;
     }
 
-    private MessageId getLatestAssignmentMsgId(WorkerConfig workerConfig, 
WorkerService workerService) {
-        try {
-            PulsarAdmin admin = workerService.getBrokerAdmin();
-            PersistentTopicInternalStats topicStats = admin.topics()
-                    
.getInternalStats(workerConfig.getFunctionAssignmentTopic());
-            if (topicStats != null && topicStats.cursors != null) {
-                CursorStats cursor = 
topicStats.cursors.get(ASSIGNMENT_TOPIC_SUBSCRIPTION);
-                if (cursor != null && 
StringUtils.isNotBlank(cursor.markDeletePosition)) {
-                    String[] ids = cursor.markDeletePosition.split(":");
-                    if (ids.length == 2) {
-                        MessageIdImpl msgId = new 
MessageIdImpl(Long.parseLong(ids[0]), Long.parseLong(ids[1]), -1);
-                        log.info("Assignment-reader starts reading from {}", 
msgId);
-                        return msgId;
-                    }
-                }
-            }
-        } catch (Exception e) {
-            log.warn("Failed to get assignment-msg id for runtime-manager for 
{}-{}",
-                    workerConfig.getFunctionAssignmentTopic(), 
ASSIGNMENT_TOPIC_SUBSCRIPTION, e);
-        }
-        return MessageId.earliest;
-    }
-
     /**
      * Starts the function runtime manager
      */
@@ -500,12 +450,6 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         return functionStatusListBuilder.build();
     }
 
-    public synchronized void processAssignmentUpdate(MessageId messageId, 
List<AssignmentsUpdate> assignmentsUpdates) {
-        assignmentsUpdates.forEach(assignmentsUpdate -> {
-            processAssignmentUpdate(messageId, assignmentsUpdate);
-        });
-    }
-    
     /**
      * Process an assignment update from the assignment topic
      * @param messageId the message id of the update assignment
@@ -604,10 +548,9 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
 
             // set as current assignment
             this.currentAssignmentVersion = assignmentsUpdate.getVersion();
+
         } else {
-            if (log.isDebugEnabled()) {
-                log.debug("Received out of date assignment update: {}", 
assignmentsUpdate);
-            }
+            log.debug("Received out of date assignment update: {}", 
assignmentsUpdate);
         }
     }
 
@@ -746,8 +689,4 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     private FunctionRuntimeInfo getFunctionRuntimeInfo(String 
fullyQualifiedInstanceId) {
         return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
     }
-    
-    public boolean isInitialized() {
-        return initialized;
-    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index 509c925..b18fd12 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.functions.worker;
 import com.google.common.annotations.VisibleForTesting;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -44,7 +43,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
-import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 40ab132..ed00958 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -47,13 +47,9 @@ import org.apache.pulsar.functions.proto.Request;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.worker.scheduler.IScheduler;
 
-import com.google.common.collect.Iterables;
-
 @Slf4j
 public class SchedulerManager implements AutoCloseable {
 
-    private static final int MAX_ASSIGNMENTS_IN_MSG = 2000;
-
     private final WorkerConfig workerConfig;
 
     @Setter
@@ -103,14 +99,6 @@ public class SchedulerManager implements AutoCloseable {
     }
 
     private void invokeScheduler() {
-        
-        // publish new assignment only once function-runtime manager updates 
previously updated version so,
-        // scheduled-manager can publish new assignment with updated version 
at FunctionRuntimeManager
-        if (!functionRuntimeManager.isInitialized()) {
-            schedule();
-            return;
-        }
-        
         List<String> currentMembership = 
this.membershipManager.getCurrentMembership()
                 .stream().map(workerInfo -> 
workerInfo.getWorkerId()).collect(Collectors.toList());
 
@@ -155,12 +143,21 @@ public class SchedulerManager implements AutoCloseable {
         List<Assignment> assignments = this.scheduler.schedule(
                 needsAssignment, currentAssignments, currentMembership);
 
-        if (log.isDebugEnabled()) {
-            log.debug("New assignments computed: {}", assignments);
-        }
+        log.debug("New assignments computed: {}", assignments);
 
         long assignmentVersion = 
this.functionRuntimeManager.getCurrentAssignmentVersion() + 1;
-        publishAssignmentUpdate(assignmentVersion, assignments);
+        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.newBuilder()
+                .setVersion(assignmentVersion)
+                .addAllAssignments(assignments)
+                .build();
+
+        CompletableFuture<MessageId> messageIdCompletableFuture = 
producer.sendAsync(assignmentsUpdate.toByteArray());
+        try {
+            messageIdCompletableFuture.get();
+        } catch (InterruptedException | ExecutionException e) {
+            log.error("Failed to send assignment update", e);
+            throw new RuntimeException(e);
+        }
 
         // wait for assignment update to go throw the pipeline
         int retries = 0;
@@ -179,23 +176,6 @@ public class SchedulerManager implements AutoCloseable {
         }
     }
 
-    private void publishAssignmentUpdate(long assignmentVersion, 
List<Assignment> assignments) {
-
-        Iterable<List<Assignment>> batches = Iterables.partition(assignments, 
MAX_ASSIGNMENTS_IN_MSG);
-        batches.forEach(assignmentBatch -> {
-            Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.newBuilder()
-                    
.setVersion(assignmentVersion).addAllAssignments(assignmentBatch).build();
-            CompletableFuture<MessageId> messageIdCompletableFuture = producer
-                    .sendAsync(assignmentsUpdate.toByteArray());
-            try {
-                messageIdCompletableFuture.get();
-            } catch (InterruptedException | ExecutionException e) {
-                log.error("Failed to send assignment update", e);
-                throw new RuntimeException(e);
-            }
-        });
-    }
-
     public static Map<String, Function.Instance> 
computeAllInstances(List<FunctionMetaData> allFunctions) {
         Map<String, Function.Instance> functionInstances = new HashMap<>();
         for (FunctionMetaData functionMetaData : allFunctions) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 4d6ba5f..cb73eaa 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -97,10 +97,32 @@ public class Worker extends AbstractService {
         // getting namespace policy
         log.info("Initializing Pulsar Functions namespace...");
         try {
-            createNamespace(admin, workerConfig.getPulsarFunctionsCluster(), 
workerConfig.getPulsarFunctionsNamespace(),
-                    true);
-            createNamespace(admin, workerConfig.getPulsarFunctionsCluster(),
-                    workerConfig.getPulsarAssignmentNamespace(), false);
+            try {
+                
admin.namespaces().getPolicies(workerConfig.getPulsarFunctionsNamespace());
+            } catch (PulsarAdminException e) {
+                if (e.getStatusCode() == 
Response.Status.NOT_FOUND.getStatusCode()) {
+                    // if not found than create
+                    try {
+                        Policies policies = new Policies();
+                        policies.retention_policies = new 
RetentionPolicies(-1, -1);
+                        policies.replication_clusters = new HashSet<>();
+                        
policies.replication_clusters.add(workerConfig.getPulsarFunctionsCluster());
+                        
admin.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace(),
+                                policies);
+                    } catch (PulsarAdminException e1) {
+                        // prevent race condition with other workers starting 
up
+                        if (e1.getStatusCode() != 
Response.Status.CONFLICT.getStatusCode()) {
+                            log.error("Failed to create namespace {} for 
pulsar functions", workerConfig
+                                    .getPulsarFunctionsNamespace(), e1);
+                            throw e1;
+                        }
+                    }
+                } else {
+                    log.error("Failed to get retention policy for pulsar 
function namespace {}",
+                            workerConfig.getPulsarFunctionsNamespace(), e);
+                    throw e;
+                }
+            }
             try {
                 internalConf = admin.brokers().getInternalConfigurationData();
             } catch (PulsarAdminException e) {
@@ -124,35 +146,6 @@ public class Worker extends AbstractService {
         }
     }
 
-    private static void createNamespace(PulsarAdmin admin, String cluster, 
String namespace, boolean infiniteRetention)
-            throws PulsarAdminException {
-        try {
-            admin.namespaces().getPolicies(namespace);
-        } catch (PulsarAdminException e) {
-            if (e.getStatusCode() == 
Response.Status.NOT_FOUND.getStatusCode()) {
-                // if not found than create
-                try {
-                    Policies policies = new Policies();
-                    if (infiniteRetention) {
-                        policies.retention_policies = new 
RetentionPolicies(-1, -1);
-                    }
-                    policies.replication_clusters = new HashSet<>();
-                    policies.replication_clusters.add(cluster);
-                    admin.namespaces().createNamespace(namespace, policies);
-                } catch (PulsarAdminException e1) {
-                    // prevent race condition with other workers starting up
-                    if (e1.getStatusCode() != 
Response.Status.CONFLICT.getStatusCode()) {
-                        log.error("Failed to create namespace {} for pulsar 
functions", namespace, e1);
-                        throw e1;
-                    }
-                }
-            } else {
-                log.error("Failed to get retention policy for pulsar function 
namespace {}", namespace, e);
-                throw e;
-            }
-        }
-    }
-
     @Override
     protected void doStop() {
         if (null != this.server) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 74ccd7c..0f695a9 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -61,7 +61,6 @@ public class WorkerConfig implements Serializable, 
PulsarConfiguration {
     private String pulsarWebServiceUrl;
     private String clusterCoordinationTopicName;
     private String pulsarFunctionsNamespace;
-    private String pulsarAssignmentNamespace;
     private String pulsarFunctionsCluster;
     private int numFunctionPackageReplicas;
     private String downloadDirectory;
@@ -134,7 +133,7 @@ public class WorkerConfig implements Serializable, 
PulsarConfiguration {
     }
 
     public String getFunctionAssignmentTopic() {
-        return String.format("persistent://%s/%s", pulsarAssignmentNamespace, 
functionAssignmentTopicName);
+        return String.format("persistent://%s/%s", pulsarFunctionsNamespace, 
functionAssignmentTopicName);
     }
 
     public static WorkerConfig load(String yamlFile) throws IOException {
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index c19d208..85a2122 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -20,16 +20,10 @@ package org.apache.pulsar.functions.worker;
 
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.Request;
@@ -51,7 +45,6 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 public class FunctionRuntimeManagerTest {
 
@@ -88,7 +81,7 @@ public class FunctionRuntimeManagerTest {
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setFunctionAssignmentTopicName("assignments");
 
-        PulsarClient pulsarClient = mockPulsarClient();
+        PulsarClient pulsarClient = mock(PulsarClient.class);
         ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -185,7 +178,7 @@ public class FunctionRuntimeManagerTest {
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
 
-        PulsarClient pulsarClient = mockPulsarClient();
+        PulsarClient pulsarClient = mock(PulsarClient.class);
         ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -286,7 +279,7 @@ public class FunctionRuntimeManagerTest {
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
 
-        PulsarClient pulsarClient = mockPulsarClient();
+        PulsarClient pulsarClient = mock(PulsarClient.class);
         ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -401,24 +394,4 @@ public class FunctionRuntimeManagerTest {
         Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
                 .get("worker-1").get("test-tenant/test-namespace/func-2:0"), 
assignment3);
     }
-    
-    private static PulsarClient mockPulsarClient() throws 
PulsarClientException {
-        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
-
-        ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
-        ConsumerBuilder<byte[]> mockConsumerBuilder = 
mock(ConsumerBuilder.class);
-
-        
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
-        
when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
-        
when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
-        when(mockConsumerBuilder.property(anyString(), 
anyString())).thenReturn(mockConsumerBuilder);
-
-        when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
-
-        
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenReturn(mockConsumerBuilder);
-
-        when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
-        
-        return mockClient;
-    }
 }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 26e0540..2753bf1 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -76,6 +76,7 @@ public class MembershipManagerTest {
         
when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
         
when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
         when(mockConsumerBuilder.property(anyString(), 
anyString())).thenReturn(mockConsumerBuilder);
+
         when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
         WorkerService workerService = mock(WorkerService.class);
         doReturn(workerConfig).when(workerService).getWorkerConfig();
@@ -119,7 +120,7 @@ public class MembershipManagerTest {
         
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenReturn(mockConsumerBuilder);
 
         when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
-        
+
         return mockClient;
     }
 
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 6034124..19977bd 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -111,7 +111,6 @@ public class SchedulerManagerTest {
 
         schedulerManager = spy(new SchedulerManager(workerConfig, 
pulsarClient));
         functionRuntimeManager = mock(FunctionRuntimeManager.class);
-        when(functionRuntimeManager.isInitialized()).thenReturn(true);
         functionMetaDataManager = mock(FunctionMetaDataManager.class);
         membershipManager = mock(MembershipManager.class);
         schedulerManager.setFunctionMetaDataManager(functionMetaDataManager);
@@ -146,7 +145,6 @@ public class SchedulerManagerTest {
 
         //set version
         
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
-        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();
@@ -188,7 +186,6 @@ public class SchedulerManagerTest {
         
assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
-        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         //set version
         
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -245,7 +242,6 @@ public class SchedulerManagerTest {
         
assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
-        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         //set version
         
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -317,7 +313,6 @@ public class SchedulerManagerTest {
 
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
-        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         //set version
         
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -377,7 +372,6 @@ public class SchedulerManagerTest {
 
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
-        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         //set version
         
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -484,7 +478,6 @@ public class SchedulerManagerTest {
 
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
-        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         //set version
         
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -653,7 +646,6 @@ public class SchedulerManagerTest {
 
         //set version
         
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
-        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();

Reply via email to