jerrypeng commented on a change in pull request #7237:
URL: https://github.com/apache/pulsar/pull/7237#discussion_r442659613
##########
File path:
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -25,95 +25,157 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function.Assignment;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+/**
+ * This class is responsible for reading assignments from the 'assignments'
functions internal topic.
+ * Only functions worker leader writes to the topic while other workers read
from the topic.
+ * When a worker becomes a leader, the worker will read to the end of the
assignments topic and close its reader to the topic.
+ * Then the worker, as the new leader, will be in charge of computing new
assignments when necessary.
+ * The leader does not need to listen to the assignments topic because it can
just update its in memory assignments map directly
+ * after it computes a new scheduling. When a worker loses leadership, the
worker will start reading from the assignments topic again.
+ */
@Slf4j
public class FunctionAssignmentTailer implements AutoCloseable {
private final FunctionRuntimeManager functionRuntimeManager;
- @Getter
- private final Reader<byte[]> reader;
+ private final ReaderBuilder readerBuilder;
+ private final WorkerConfig workerConfig;
+ private final ErrorNotifier errorNotifier;
+ private Reader<byte[]> reader;
private volatile boolean isRunning = false;
+ private volatile boolean exitOnEndOfTopic = false;
+ private CompletableFuture<Void> hasExited;
+ private Thread tailerThread;
- private final Thread tailerThread;
+ @Getter
+ private MessageId lastMessageId = null;
public FunctionAssignmentTailer(
FunctionRuntimeManager functionRuntimeManager,
ReaderBuilder readerBuilder,
WorkerConfig workerConfig,
- ErrorNotifier errorNotifier) throws PulsarClientException {
+ ErrorNotifier errorNotifier) {
this.functionRuntimeManager = functionRuntimeManager;
-
- this.reader = readerBuilder
- .subscriptionRolePrefix(workerConfig.getWorkerId() +
"-function-runtime-manager")
- .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
- .topic(workerConfig.getFunctionAssignmentTopic())
- .readCompacted(true)
- .startMessageId(MessageId.earliest)
- .create();
-
- this.tailerThread = new Thread(() -> {
- while(isRunning) {
- try {
- Message<byte[]> msg = reader.readNext();
- processAssignment(msg);
- } catch (Throwable th) {
- if (isRunning) {
- log.error("Encountered error in assignment tailer",
th);
- // trigger fatal error
- isRunning = false;
- errorNotifier.triggerError(th);
- } else {
- if (!(th instanceof InterruptedException ||
th.getCause() instanceof InterruptedException)) {
- log.warn("Encountered error when assignment tailer
is not running", th);
- }
- }
+ this.hasExited = new CompletableFuture<>();
+ this.readerBuilder = readerBuilder;
+ this.workerConfig = workerConfig;
+ this.errorNotifier = errorNotifier;
+ }
- }
+ public synchronized CompletableFuture<Void> triggerReadToTheEndAndExit() {
+ exitOnEndOfTopic = true;
+ return this.hasExited;
+ }
+
+ public void startFromMessage(MessageId startMessageId) throws
PulsarClientException {
+ log.info("Function assignment tailer start reading from topic {} at
{}",
+ workerConfig.getFunctionAssignmentTopic(), startMessageId);
+ if (!isRunning) {
+ isRunning = true;
+ if (reader == null) {
+ reader = createReader(startMessageId);
}
- });
- this.tailerThread.setName("assignment-tailer-thread");
+ if (tailerThread == null || !tailerThread.isAlive()) {
+ tailerThread = getTailerThread();
+ }
+ hasExited = new CompletableFuture<>();
+ tailerThread.start();
+ }
}
- public void start() {
- isRunning = true;
- tailerThread.start();
+ public synchronized void start() throws PulsarClientException {
Review comment:
> I also think that some logic will be simpler if we create Tailer
object every time we go thru leadership transition
That is not correct. The functionAssignmentTailer is also responsible for
keeping track of a message id. If a worker becomes a leader and then loses
leadership prior to creating any assignments, we shouldn't just start reading
the assignment topic from the beginning. We should resume from the message id
stored in the functionAssignmentTailer.
##########
File path:
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
##########
@@ -25,95 +25,157 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function.Assignment;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+/**
+ * This class is responsible for reading assignments from the 'assignments'
functions internal topic.
+ * Only functions worker leader writes to the topic while other workers read
from the topic.
+ * When a worker becomes a leader, the worker will read to the end of the
assignments topic and close its reader to the topic.
+ * Then the worker, as the new leader, will be in charge of computing new
assignments when necessary.
+ * The leader does not need to listen to the assignments topic because it can
just update its in memory assignments map directly
+ * after it computes a new scheduling. When a worker loses leadership, the
worker will start reading from the assignments topic again.
+ */
@Slf4j
public class FunctionAssignmentTailer implements AutoCloseable {
private final FunctionRuntimeManager functionRuntimeManager;
- @Getter
- private final Reader<byte[]> reader;
+ private final ReaderBuilder readerBuilder;
+ private final WorkerConfig workerConfig;
+ private final ErrorNotifier errorNotifier;
+ private Reader<byte[]> reader;
private volatile boolean isRunning = false;
+ private volatile boolean exitOnEndOfTopic = false;
+ private CompletableFuture<Void> hasExited;
+ private Thread tailerThread;
- private final Thread tailerThread;
+ @Getter
+ private MessageId lastMessageId = null;
Review comment:
No
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org