jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444431425



##########
File path: 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -80,37 +84,40 @@ public FunctionMetaDataManager(WorkerConfig workerConfig,
                 this.pulsarClient, 
this.workerConfig.getFunctionMetadataTopic());
         this.schedulerManager = schedulerManager;
         this.errorNotifier = errorNotifier;
+        exclusiveLeaderProducer = null;
     }
 
     /**
      * Public methods. Please use these methods if references 
FunctionMetaManager from an external class
      */
 
     /**
-     * Initializes the FunctionMetaDataManager.  Does the following:
-     * 1. Consume all existing function meta data upon start to establish 
existing state
+     * Initializes the FunctionMetaDataManager. By default we start in the 
worker mode.
+     * We consume all existing function meta data to establish existing state
      */
     public void initialize() {
         try {
+            initializeTailer();
             this.functionMetaDataTopicTailer = new 
FunctionMetaDataTopicTailer(this,
                     pulsarClient.newReader(), this.workerConfig, 
this.errorNotifier);
             // read all existing messages
-            this.setInitializePhase(true);
             while 
(this.functionMetaDataTopicTailer.getReader().hasMessageAvailable()) {
                 
this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());
             }
-            this.setInitializePhase(false);
             
             this.isInitialized.complete(null);
         } catch (Exception e) {
             log.error("Failed to initialize meta data store", e);
-            throw new RuntimeException(e);
+            errorNotifier.triggerError(e);
         }
     }
-    
+
+    private void initializeTailer() throws PulsarClientException {
+        this.functionMetaDataTopicTailer = new 
FunctionMetaDataTopicTailer(this,
+                pulsarClient.newReader().startMessageId(lastMessageSeen), 
this.workerConfig, this.errorNotifier);
+    }
+
     public void start() {
-        // schedule functions if necessary
-        this.schedulerManager.schedule();

Review comment:
       Why are we removing this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to