jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r444431425
##########
File path:
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -80,37 +84,40 @@ public FunctionMetaDataManager(WorkerConfig workerConfig,
this.pulsarClient,
this.workerConfig.getFunctionMetadataTopic());
this.schedulerManager = schedulerManager;
this.errorNotifier = errorNotifier;
+ exclusiveLeaderProducer = null;
}
/**
* Public methods. Please use these methods if references
FunctionMetaManager from an external class
*/
/**
- * Initializes the FunctionMetaDataManager. Does the following:
- * 1. Consume all existing function meta data upon start to establish
existing state
+ * Initializes the FunctionMetaDataManager. By default we start in the
worker mode.
+ * We consume all existing function meta data to establish existing state
*/
public void initialize() {
try {
+ initializeTailer();
this.functionMetaDataTopicTailer = new
FunctionMetaDataTopicTailer(this,
pulsarClient.newReader(), this.workerConfig,
this.errorNotifier);
// read all existing messages
- this.setInitializePhase(true);
while
(this.functionMetaDataTopicTailer.getReader().hasMessageAvailable()) {
this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());
}
- this.setInitializePhase(false);
this.isInitialized.complete(null);
} catch (Exception e) {
log.error("Failed to initialize meta data store", e);
- throw new RuntimeException(e);
+ errorNotifier.triggerError(e);
}
}
-
+
+ private void initializeTailer() throws PulsarClientException {
+ this.functionMetaDataTopicTailer = new
FunctionMetaDataTopicTailer(this,
+ pulsarClient.newReader().startMessageId(lastMessageSeen),
this.workerConfig, this.errorNotifier);
+ }
+
public void start() {
- // schedule functions if necessary
- this.schedulerManager.schedule();
Review comment:
Why are we removing this call to `this.schedulerManager.schedule()` from `start()`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org