srkukarni commented on a change in pull request #7211:
URL: https://github.com/apache/pulsar/pull/7211#discussion_r437601460
##########
File path:
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
##########
@@ -19,47 +19,84 @@
package org.apache.pulsar.functions.worker;
import java.io.IOException;
-import java.util.function.Function;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Request.ServiceRequest;
@Slf4j
public class FunctionMetaDataTopicTailer
- implements java.util.function.Consumer<Message<byte[]>>,
Function<Throwable, Void>, AutoCloseable {
+ implements Runnable, AutoCloseable {
private final FunctionMetaDataManager functionMetaDataManager;
+ @Getter
private final Reader<byte[]> reader;
+ private final Thread readerThread;
+ private volatile boolean running;
+ private ErrorNotifier errorNotifier;
public FunctionMetaDataTopicTailer(FunctionMetaDataManager
functionMetaDataManager,
- Reader<byte[]> reader)
+ ReaderBuilder readerBuilder,
WorkerConfig workerConfig,
+ ErrorNotifier errorNotifier)
throws PulsarClientException {
this.functionMetaDataManager = functionMetaDataManager;
- this.reader = reader;
+ this.reader = readerBuilder
+ .topic(workerConfig.getFunctionMetadataTopic())
Review comment:
Added
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org