srkukarni commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r442641790
##########
File path:
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -25,95 +25,157 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function.Assignment;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+/**
+ * This class is responsible for reading assignments from the 'assignments' functions internal topic.
+ * Only functions worker leader writes to the topic while other workers read from the topic.
+ * When a worker become a leader, the worker will read to the end of the assignments topic and close its reader to the topic.
+ * Then the worker and new leader will be in charge of computing new assignments when necessary.
+ * The leader does not need to listen to the assignments topic because it can just update its in memory assignments map directly
+ * after it computes a new scheduling. When a worker loses leadership, the worker is start reading from the assignments topic again.
+ */
@Slf4j
public class FunctionAssignmentTailer implements AutoCloseable {
private final FunctionRuntimeManager functionRuntimeManager;
- @Getter
- private final Reader<byte[]> reader;
+ private final ReaderBuilder readerBuilder;
+ private final WorkerConfig workerConfig;
+ private final ErrorNotifier errorNotifier;
+ private Reader<byte[]> reader;
private volatile boolean isRunning = false;
+ private volatile boolean exitOnEndOfTopic = false;
+ private CompletableFuture<Void> hasExited;
+ private Thread tailerThread;
- private final Thread tailerThread;
+ @Getter
+ private MessageId lastMessageId = null;
Review comment:
Shouldn't we init this to MessageId.earliest?
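
To illustrate the question, here is a minimal sketch of the difference a non-null default makes. It is not the code from this PR: `StubMessageId` is a hypothetical stand-in for `org.apache.pulsar.client.api.MessageId` (so the sketch compiles without Pulsar on the classpath), and `TailerSketch`, `onMessage` and `startPosition` are made-up names that keep only the field under discussion.

```java
import java.util.Objects;

// Hypothetical stand-in for org.apache.pulsar.client.api.MessageId.
// EARLIEST plays the role of MessageId.earliest in this sketch.
final class StubMessageId {
    static final StubMessageId EARLIEST = new StubMessageId(-1L);
    final long entryId;

    StubMessageId(long entryId) {
        this.entryId = entryId;
    }
}

// Hypothetical, trimmed-down tailer that only tracks the last message seen.
class TailerSketch {
    // Defaulting to EARLIEST instead of null means callers never
    // have to null-check before using the value as a start position.
    private StubMessageId lastMessageId = StubMessageId.EARLIEST;

    StubMessageId getLastMessageId() {
        return lastMessageId;
    }

    // Record the id of the most recently processed message.
    void onMessage(StubMessageId id) {
        lastMessageId = Objects.requireNonNull(id, "id");
    }

    // Position a newly created reader would start from.
    StubMessageId startPosition() {
        return lastMessageId;
    }
}
```

With this default, a tailer that has not yet processed any message would start a new reader from the beginning of the topic rather than passing a null start position.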