jerrypeng commented on a change in pull request #7255: URL: https://github.com/apache/pulsar/pull/7255#discussion_r444440523
##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -80,37 +84,40 @@ public FunctionMetaDataManager(WorkerConfig workerConfig,
                 this.pulsarClient, this.workerConfig.getFunctionMetadataTopic());
         this.schedulerManager = schedulerManager;
         this.errorNotifier = errorNotifier;
+        exclusiveLeaderProducer = null;
     }

     /**
      * Public methods. Please use these methods if references FunctionMetaManager from an external class
      */

     /**
-     * Initializes the FunctionMetaDataManager. Does the following:
-     * 1. Consume all existing function meta data upon start to establish existing state
+     * Initializes the FunctionMetaDataManager. By default we start in the worker mode.
+     * We consume all existing function meta data to establish existing state
      */
     public void initialize() {
         try {
+            initializeTailer();
             this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this, pulsarClient.newReader(), this.workerConfig, this.errorNotifier);
             // read all existing messages
-            this.setInitializePhase(true);
             while (this.functionMetaDataTopicTailer.getReader().hasMessageAvailable()) {
                 this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());

Review comment:
It is kind of weird that functionMetaDataTopicTailer.processRequest() calls back into FunctionMetaDataManager. That seems like an awkward interaction between the two classes. Perhaps we can refactor it in a subsequent PR.
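
To make the refactoring idea a bit more concrete, here is a rough sketch of one way the two classes could be decoupled. It is not the code in this PR: `MessageSource`, `TopicTailerSketch`, `MetaDataManagerSketch`, `ListSource`, and `processMetaDataUpdate` are all hypothetical names, and `MessageSource` is only a simplified stand-in for the Pulsar reader (the real `hasMessageAvailable()` and `readNext()` can throw `PulsarClientException`, which is ignored here). The tailer is handed a plain `Consumer` instead of a reference to the manager, so the manager drives the tailer rather than the tailer reaching back into the manager.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the Pulsar reader; only the two calls used in the diff.
interface MessageSource<T> {
    boolean hasMessageAvailable();
    T readNext();
}

// Tailer with no compile-time dependency on the manager: it only forwards messages.
final class TopicTailerSketch<T> {
    private final MessageSource<T> source;
    private final Consumer<T> handler;

    TopicTailerSketch(MessageSource<T> source, Consumer<T> handler) {
        this.source = source;
        this.handler = handler;
    }

    // Drains everything currently available and returns how many messages were handled.
    int drain() {
        int handled = 0;
        while (source.hasMessageAvailable()) {
            handler.accept(source.readNext());
            handled++;
        }
        return handled;
    }
}

// Manager owns the state and passes its own method to the tailer as the handler.
final class MetaDataManagerSketch {
    private final List<String> applied = new ArrayList<>();

    // Hypothetical update hook; the real manager would update its function metadata here.
    void processMetaDataUpdate(String update) {
        applied.add(update);
    }

    List<String> applied() {
        return applied;
    }

    // Reads all existing messages to establish state, as initialize() does in the diff.
    int initialize(MessageSource<String> source) {
        TopicTailerSketch<String> tailer =
                new TopicTailerSketch<>(source, this::processMetaDataUpdate);
        return tailer.drain();
    }
}

// In-memory source so the sketch can run without a broker.
final class ListSource<T> implements MessageSource<T> {
    private final Iterator<T> it;

    ListSource(List<T> items) {
        this.it = items.iterator();
    }

    @Override
    public boolean hasMessageAvailable() {
        return it.hasNext();
    }

    @Override
    public T readNext() {
        return it.next();
    }
}
```

With this shape the tailer can be tested with any handler, and the dependency only runs from the manager to the tailer. Whether that fits with the leader/worker changes in this PR would need to be checked in the follow-up.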