jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r442663082



##########
File path: 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -25,95 +25,157 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function.Assignment;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
+/**
+ * This class is responsible for reading assignments from the 'assignments' 
functions internal topic.
+ * Only functions worker leader writes to the topic while other workers read 
from the topic.
+ * When a worker becomes the leader, the worker will read to the end of the 
assignments topic and close its reader to the topic.
+ * Then the worker, as the new leader, will be in charge of computing new 
assignments when necessary.
+ * The leader does not need to listen to the assignments topic because it can 
just update its in memory assignments map directly
+ * after it computes a new scheduling.  When a worker loses leadership, the 
worker will start reading from the assignments topic again.
+ */
 @Slf4j
 public class FunctionAssignmentTailer implements AutoCloseable {
 
     private final FunctionRuntimeManager functionRuntimeManager;
-    @Getter
-    private final Reader<byte[]> reader;
+    private final ReaderBuilder readerBuilder;
+    private final WorkerConfig workerConfig;
+    private final ErrorNotifier errorNotifier;
+    private Reader<byte[]> reader;
     private volatile boolean isRunning = false;
+    private volatile boolean exitOnEndOfTopic = false;
+    private CompletableFuture<Void> hasExited;
+    private Thread tailerThread;
 
-    private final Thread tailerThread;
+    @Getter
+    private MessageId lastMessageId = null;
     
     public FunctionAssignmentTailer(
             FunctionRuntimeManager functionRuntimeManager,
             ReaderBuilder readerBuilder,
             WorkerConfig workerConfig,
-            ErrorNotifier errorNotifier) throws PulsarClientException {
+            ErrorNotifier errorNotifier) {
         this.functionRuntimeManager = functionRuntimeManager;
-        
-        this.reader = readerBuilder
-          .subscriptionRolePrefix(workerConfig.getWorkerId() + 
"-function-runtime-manager")
-          .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
-          .topic(workerConfig.getFunctionAssignmentTopic())
-          .readCompacted(true)
-          .startMessageId(MessageId.earliest)
-          .create();
-        
-        this.tailerThread = new Thread(() -> {
-            while(isRunning) {
-                try {
-                    Message<byte[]> msg = reader.readNext();
-                    processAssignment(msg);
-                } catch (Throwable th) {
-                    if (isRunning) {
-                        log.error("Encountered error in assignment tailer", 
th);
-                        // trigger fatal error
-                        isRunning = false;
-                        errorNotifier.triggerError(th);
-                    } else {
-                        if (!(th instanceof InterruptedException || 
th.getCause() instanceof InterruptedException)) {
-                            log.warn("Encountered error when assignment tailer 
is not running", th);
-                        }
-                    }
+        this.hasExited = new CompletableFuture<>();
+        this.readerBuilder = readerBuilder;
+        this.workerConfig = workerConfig;
+        this.errorNotifier = errorNotifier;
+    }
 
-                }
+    public synchronized CompletableFuture<Void> triggerReadToTheEndAndExit() {
+        exitOnEndOfTopic = true;
+        return this.hasExited;
+    }
+
+    public void startFromMessage(MessageId startMessageId) throws 
PulsarClientException {
+        log.info("Function assignment tailer start reading from topic {} at 
{}",
+                workerConfig.getFunctionAssignmentTopic(), startMessageId);
+        if (!isRunning) {
+            isRunning = true;
+            if (reader == null) {
+                reader = createReader(startMessageId);
             }
-        });
-        this.tailerThread.setName("assignment-tailer-thread");
+            if (tailerThread == null || !tailerThread.isAlive()) {
+                tailerThread = getTailerThread();
+            }
+            hasExited = new CompletableFuture<>();
+            tailerThread.start();
+        }
     }
 
-    public void start() {
-        isRunning = true;
-        tailerThread.start();
+    public synchronized void start() throws PulsarClientException {
+        MessageId startMessageId = lastMessageId == null ? MessageId.earliest 
: lastMessageId;
+        startFromMessage(startMessageId);
     }
 
     @Override
-    public void close() {
-        log.info("Stopping function assignment tailer");
+    public synchronized void close() {
+        log.info("Closing function assignment tailer");
         try {
             isRunning = false;
-            if (tailerThread != null && tailerThread.isAlive()) {
-                tailerThread.interrupt();
+
+            if (tailerThread != null) {
+                while (true) {
+                    tailerThread.interrupt();
+
+                    try {
+                        tailerThread.join(5000, 0);
+                    } catch (InterruptedException e) {
+                        log.warn("Waiting for assignment tailer thread to stop 
is interrupted", e);
+                    }
+
+                    if (tailerThread.isAlive()) {
+                        log.warn("Assignment tailer thread is still alive.  
Will attempt to interrupt again.");
+                    } else {
+                        break;
+                    }
+                }
+                tailerThread = null;
             }
             if (reader != null) {
                 reader.close();
+                reader = null;
             }
+
+            hasExited = null;
+            exitOnEndOfTopic = false;
+            
         } catch (IOException e) {
             log.error("Failed to stop function assignment tailer", e);
         }
-        log.info("Stopped function assignment tailer");
+    }
+    
+    private Reader<byte[]> createReader(MessageId startMessageId) throws 
PulsarClientException {
+        log.info("Assignment tailer will start reading from message id {}", 
startMessageId);
+
+        return readerBuilder
+                .subscriptionRolePrefix(workerConfig.getWorkerId() + 
"-function-assignment-tailer")
+                .readerName(workerConfig.getWorkerId() + 
"-function-assignment-tailer")
+                .topic(workerConfig.getFunctionAssignmentTopic())
+                .readCompacted(true)
+                .startMessageId(startMessageId)
+                .create();
     }
 
-    public void processAssignment(Message<byte[]> msg) {
-
-        if(msg.getData()==null || (msg.getData().length==0)) {
-            log.info("Received assignment delete: {}", msg.getKey());
-            this.functionRuntimeManager.deleteAssignment(msg.getKey());
-        } else {
-            Assignment assignment;
-            try {
-                assignment = Assignment.parseFrom(msg.getData());
-            } catch (IOException e) {
-                log.error("[{}] Received bad assignment update at message {}", 
reader.getTopic(), msg.getMessageId(), e);
-                throw new RuntimeException(e);
+    private Thread getTailerThread() {
+        Thread t = new Thread(() -> {
+            while (isRunning) {
+                try {
+                    Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS);
+                    if (msg == null) {
+                        if (exitOnEndOfTopic && !reader.hasMessageAvailable()) 
{
+                            break;
+                        }

Review comment:
       It would be safer to wait for a timeout period here, to make sure no 
messages are simply arriving late.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to