jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r442662116



##########
File path: 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -25,95 +25,157 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function.Assignment;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
+/**
+ * This class is responsible for reading assignments from the 'assignments' 
functions internal topic.
+ * Only the functions worker leader writes to the topic while other workers read 
from the topic.
+ * When a worker becomes a leader, the worker will read to the end of the 
assignments topic and close its reader to the topic.
+ * Then the worker, as the new leader, will be in charge of computing new 
assignments when necessary.
+ * The leader does not need to listen to the assignments topic because it can 
just update its in-memory assignments map directly
+ * after it computes a new scheduling.  When a worker loses leadership, the 
worker will start reading from the assignments topic again.
+ */
 @Slf4j
 public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
-    @Getter
-    private final Reader<byte[]> reader;
+    private final ReaderBuilder readerBuilder;
+    private final WorkerConfig workerConfig;
+    private final ErrorNotifier errorNotifier;
+    private Reader<byte[]> reader;
     private volatile boolean isRunning = false;
+    private volatile boolean exitOnEndOfTopic = false;
+    private CompletableFuture<Void> hasExited;

Review comment:
       sure




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to