srkukarni commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444493849



##########
File path: 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -174,89 +181,124 @@ public synchronized boolean containsFunction(String 
tenant, String namespace, St
     }
 
     /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
+     * Called by the worker when we are in the leader mode.  In this state, we 
update our in-memory
+     * data structures and then write to the metadata topic.
+     * @param functionMetaData The function metadata in question
+     * @param delete Is this a delete operation
+     * @throws IllegalStateException if we are not the leader
+     * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized CompletableFuture<RequestResult> 
updateFunction(FunctionMetaData functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = 
getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData 
functionMetaData, boolean delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = 
Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? 
Request.ServiceRequest.ServiceRequestType.DELETE : 
Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            lastMessageSeen = 
exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {
+            log.error("Could not write into Function Metadata topic", e);
+            errorNotifier.triggerError(e);
+        }
+        if (needsScheduling) {
+            this.schedulerManager.schedule();
         }
-
-        FunctionMetaData newFunctionMetaData = 
FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, 
functionMetaData);
-
-        Request.ServiceRequest updateRequest = 
ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
     }
 
-
     /**
-     * Sends a deregister request to the FMT (Function Metadata Topic) for a 
function
-     * @param tenant the tenant the function that needs to be deregistered 
belongs to
-     * @param namespace the namespace the function that needs to be 
deregistered belongs to
-     * @param functionName the name of the function
-     * @return a completable future of when the deregister has been applied
+     * Called by the leader service when this worker becomes the leader.
+     * We first get exclusive producer on the metadata topic. Next we drain 
the tailer
+     * to ensure that we have caught up to metadata topic. After which we 
close the tailer.
+     * Note that this method cannot be syncrhonized because the tailer might 
still be processing messages
      */
-    public synchronized CompletableFuture<RequestResult> 
deregisterFunction(String tenant, String namespace, String functionName) {
-        FunctionMetaData functionMetaData = 
this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = 
FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, 
functionMetaData);
-
-        Request.ServiceRequest deregisterRequest = 
ServiceRequestUtils.getDeregisterRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
+    public void acquireLeadership() {
+        log.info("FunctionMetaDataManager becoming leader by creating 
exclusive producer");
+        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        // Now that we have created the exclusive producer, wait for reader to 
get over
+        if (tailer != null) {
+            try {
+                tailer.stopWhenNoMoreMessages().get();
+            } catch (Exception e) {
+                log.error("Error while waiting for metadata tailer thread to 
finish", e);
+                errorNotifier.triggerError(e);
+            }
+            tailer.close();
+        }
+        this.schedulerManager.schedule();
+        log.info("FunctionMetaDataManager done becoming leader by doing its 
first schedule");
+    }
 
-        return submit(deregisterRequest);
+    private synchronized FunctionMetaDataTopicTailer 
internalAcquireLeadership() {
+        if (exclusiveLeaderProducer == null) {
+            try {
+                exclusiveLeaderProducer = pulsarClient.newProducer()
+                        .topic(this.workerConfig.getFunctionMetadataTopic())
+                        .producerName(workerConfig.getWorkerId() + "-leader")
+                        // .type(EXCLUSIVE)
+                        .create();
+            } catch (PulsarClientException e) {
+                log.error("Error creating exclusive producer", e);
+                errorNotifier.triggerError(e);
+            }
+        } else {
+            log.error("Logic Error in FunctionMetaData Manager");
+            errorNotifier.triggerError(new IllegalStateException());
+        }
+        FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
+        this.functionMetaDataTopicTailer = null;
+        return tailer;
     }
 
     /**
-     * Sends a start/stop function request to the FMT (Function Metadata 
Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered 
belongs to
-     * @param namespace the namespace the function that needs to be 
deregistered belongs to
-     * @param functionName the name of the function
-     * @param instanceId the instanceId of the function, -1 if for all 
instances
-     * @param start do we need to start or stop
-     * @return a completable future of when the start/stop has been applied
+     * called by the leader service when we lose leadership. We close the 
exclusive producer
+     * and start the tailer.
      */
-    public synchronized CompletableFuture<RequestResult> 
changeFunctionInstanceStatus(String tenant, String namespace, String 
functionName,
-                                                                               
       Integer instanceId, boolean start) {
-        FunctionMetaData functionMetaData = 
this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = 
FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, 
instanceId, start);
-
-        Request.ServiceRequest updateRequest = 
ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
+    public synchronized void giveupLeadership() {
+        log.info("FunctionMetaDataManager giving up leadership by closing 
exclusive producer");
+        try {
+            exclusiveLeaderProducer.close();
+            exclusiveLeaderProducer = null;
+            initializeTailer();

Review comment:
       That happens at start




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to