srkukarni commented on a change in pull request #7255: URL: https://github.com/apache/pulsar/pull/7255#discussion_r444493849
########## File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java ########## @@ -174,89 +181,124 @@ public synchronized boolean containsFunction(String tenant, String namespace, St } /** - * Sends an update request to the FMT (Function Metadata Topic) - * @param functionMetaData The function metadata that needs to be updated - * @return a completable future of when the update has been applied + * Called by the worker when we are in the leader mode. In this state, we update our in-memory + * data structures and then write to the metadata topic. + * @param functionMetaData The function metadata in question + * @param delete Is this a delete operation + * @throws IllegalStateException if we are not the leader + * @throws IllegalArgumentException if the request is out of date. */ - public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) { - - FunctionMetaData existingFunctionMetadata = null; - if (containsFunction(functionMetaData.getFunctionDetails().getTenant(), - functionMetaData.getFunctionDetails().getNamespace(), - functionMetaData.getFunctionDetails().getName())) { - existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(), - functionMetaData.getFunctionDetails().getNamespace(), - functionMetaData.getFunctionDetails().getName()); + public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete) + throws IllegalStateException, IllegalArgumentException { + if (exclusiveLeaderProducer == null) { + throw new IllegalStateException("Not the leader"); + } + boolean needsScheduling; + if (delete) { + needsScheduling = proccessDeregister(functionMetaData); + } else { + needsScheduling = processUpdate(functionMetaData); + } + Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder() + .setServiceRequestType(delete ? 
Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE) + .setFunctionMetaData(functionMetaData) + .setWorkerId(workerConfig.getWorkerId()) + .setRequestId(UUID.randomUUID().toString()) + .build(); + try { + lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray()); + } catch (Exception e) { + log.error("Could not write into Function Metadata topic", e); + errorNotifier.triggerError(e); + } + if (needsScheduling) { + this.schedulerManager.schedule(); } - - FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData); - - Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest( - this.workerConfig.getWorkerId(), newFunctionMetaData); - - return submit(updateRequest); } - /** - * Sends a deregister request to the FMT (Function Metadata Topic) for a function - * @param tenant the tenant the function that needs to be deregistered belongs to - * @param namespace the namespace the function that needs to be deregistered belongs to - * @param functionName the name of the function - * @return a completable future of when the deregister has been applied + * Called by the leader service when this worker becomes the leader. + * We first get exclusive producer on the metadata topic. Next we drain the tailer + * to ensure that we have caught up to metadata topic. After which we close the tailer. 
+ * Note that this method cannot be synchronized because the tailer might still be processing messages */ - public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) { - FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName); - - FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData); - - Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest( - this.workerConfig.getWorkerId(), newFunctionMetaData); + public void acquireLeadership() { + log.info("FunctionMetaDataManager becoming leader by creating exclusive producer"); + FunctionMetaDataTopicTailer tailer = internalAcquireLeadership(); + // Now that we have created the exclusive producer, wait for reader to get over + if (tailer != null) { + try { + tailer.stopWhenNoMoreMessages().get(); + } catch (Exception e) { + log.error("Error while waiting for metadata tailer thread to finish", e); + errorNotifier.triggerError(e); + } + tailer.close(); + } + this.schedulerManager.schedule(); + log.info("FunctionMetaDataManager done becoming leader by doing its first schedule"); + } - return submit(deregisterRequest); + private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() { + if (exclusiveLeaderProducer == null) { + try { + exclusiveLeaderProducer = pulsarClient.newProducer() + .topic(this.workerConfig.getFunctionMetadataTopic()) + .producerName(workerConfig.getWorkerId() + "-leader") + // .type(EXCLUSIVE) + .create(); + } catch (PulsarClientException e) { + log.error("Error creating exclusive producer", e); + errorNotifier.triggerError(e); + } + } else { + log.error("Logic Error in FunctionMetaData Manager"); + errorNotifier.triggerError(new IllegalStateException()); + } + FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer; + this.functionMetaDataTopicTailer = null; +
return tailer; } /** - * Sends a start/stop function request to the FMT (Function Metadata Topic) for a function - * @param tenant the tenant the function that needs to be deregistered belongs to - * @param namespace the namespace the function that needs to be deregistered belongs to - * @param functionName the name of the function - * @param instanceId the instanceId of the function, -1 if for all instances - * @param start do we need to start or stop - * @return a completable future of when the start/stop has been applied + * called by the leader service when we lose leadership. We close the exclusive producer + * and start the tailer. */ - public synchronized CompletableFuture<RequestResult> changeFunctionInstanceStatus(String tenant, String namespace, String functionName, - Integer instanceId, boolean start) { - FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName); - - FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, instanceId, start); - - Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest( - this.workerConfig.getWorkerId(), newFunctionMetaData); - - return submit(updateRequest); + public synchronized void giveupLeadership() { + log.info("FunctionMetaDataManager giving up leadership by closing exclusive producer"); + try { + exclusiveLeaderProducer.close(); + exclusiveLeaderProducer = null; + initializeTailer(); Review comment: That happens at start ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org