This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 73d0aac  Enable Function Workers to use exclusive producer to write to internal topics (#9275)
73d0aac is described below

commit 73d0aac75861da485a1bff2d678b5f94a4ab3a1b
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Wed Feb 17 21:51:52 2021 -0800

    Enable Function Workers to use exclusive producer to write to internal topics (#9275)
    
    Co-authored-by: Jerry Peng <[email protected]>
---
 .../functions/worker/FunctionMetaDataManager.java  |  49 +++++-----
 .../pulsar/functions/worker/LeaderService.java     |  29 +++++-
 .../functions/worker/PulsarWorkerService.java      |   1 +
 .../pulsar/functions/worker/SchedulerManager.java  |  76 +++++-----------
 .../pulsar/functions/worker/WorkerUtils.java       |  43 ++++++++-
 .../worker/FunctionMetaDataManagerTest.java        |  26 ++++--
 .../pulsar/functions/worker/LeaderServiceTest.java |  90 +++++++++++++++++-
 .../functions/worker/SchedulerManagerTest.java     |  15 +--
 .../pulsar/functions/worker/WorkerUtilsTest.java   | 101 +++++++++++++++++++++
 9 files changed, 334 insertions(+), 96 deletions(-)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 7416334..b5ed5eb 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
@@ -252,14 +254,36 @@ public class FunctionMetaDataManager implements 
AutoCloseable {
     }
 
     /**
+     * Acquires an exclusive producer.  This method cannot return null.  It can only return a valid exclusive producer
+     * or throw NotLeaderAnymore exception.
+     * @param isLeader if the worker is still the leader
+     * @return A valid exclusive producer
+     * @throws WorkerUtils.NotLeaderAnymore if the worker is no longer the 
leader.
+     */
+    public Producer<byte[]> acquireExclusiveWrite(Supplier<Boolean> isLeader) 
throws WorkerUtils.NotLeaderAnymore {
+        // creates exclusive producer for metadata topic
+        return WorkerUtils.createExclusiveProducerWithRetry(
+                pulsarClient,
+                workerConfig.getFunctionMetadataTopic(),
+                workerConfig.getWorkerId() + "-leader",
+                isLeader, 1000);
+    }
+
+    /**
      * Called by the leader service when this worker becomes the leader.
      * We first get exclusive producer on the metadata topic. Next we drain 
the tailer
      * to ensure that we have caught up to metadata topic. After which we 
close the tailer.
      * Note that this method cannot be syncrhonized because the tailer might 
still be processing messages
      */
-    public void acquireLeadership() {
+    public void acquireLeadership(Producer<byte[]> exclusiveProducer) {
         log.info("FunctionMetaDataManager becoming leader by creating 
exclusive producer");
-        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        if (exclusiveLeaderProducer != null) {
+            log.error("FunctionMetaData Manager entered invalid state");
+            errorNotifier.triggerError(new IllegalStateException());
+        }
+        this.exclusiveLeaderProducer = exclusiveProducer;
+        FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
+        this.functionMetaDataTopicTailer = null;
         // Now that we have created the exclusive producer, wait for reader to 
get over
         if (tailer != null) {
             try {
@@ -273,27 +297,6 @@ public class FunctionMetaDataManager implements 
AutoCloseable {
         log.info("FunctionMetaDataManager done becoming leader");
     }
 
-    private synchronized FunctionMetaDataTopicTailer 
internalAcquireLeadership() {
-        if (exclusiveLeaderProducer == null) {
-            try {
-                exclusiveLeaderProducer = pulsarClient.newProducer()
-                        .topic(this.workerConfig.getFunctionMetadataTopic())
-                        .producerName(workerConfig.getWorkerId() + "-leader")
-                        // .type(EXCLUSIVE)
-                        .create();
-            } catch (PulsarClientException e) {
-                log.error("Error creating exclusive producer", e);
-                errorNotifier.triggerError(e);
-            }
-        } else {
-            log.error("Logic Error in FunctionMetaData Manager");
-            errorNotifier.triggerError(new IllegalStateException());
-        }
-        FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
-        this.functionMetaDataTopicTailer = null;
-        return tailer;
-    }
-
     /**
      * called by the leader service when we lose leadership. We close the 
exclusive producer
      * and start the tailer.
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
index b0d7585..fb11fab 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
@@ -21,11 +21,14 @@ package org.apache.pulsar.functions.worker;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 
+import java.util.function.Supplier;
+
 @Slf4j
 public class LeaderService implements AutoCloseable, ConsumerEventListener {
 
@@ -35,6 +38,7 @@ public class LeaderService implements AutoCloseable, 
ConsumerEventListener {
     private final SchedulerManager schedulerManager;
     private final FunctionRuntimeManager functionRuntimeManager;
     private final FunctionMetaDataManager functionMetaDataManager;
+    private final MembershipManager membershipManager;
     private ConsumerImpl<byte[]> consumer;
     private final WorkerConfig workerConfig;
     private final PulsarClient pulsarClient;
@@ -50,6 +54,7 @@ public class LeaderService implements AutoCloseable, 
ConsumerEventListener {
                          SchedulerManager schedulerManager,
                          FunctionRuntimeManager functionRuntimeManager,
                          FunctionMetaDataManager functionMetaDataManager,
+                         MembershipManager membershipManager,
                          ErrorNotifier errorNotifier) {
         this.workerConfig = workerService.getWorkerConfig();
         this.pulsarClient = pulsarClient;
@@ -57,6 +62,7 @@ public class LeaderService implements AutoCloseable, 
ConsumerEventListener {
         this.schedulerManager = schedulerManager;
         this.functionRuntimeManager = functionRuntimeManager;
         this.functionMetaDataManager = functionMetaDataManager;
+        this.membershipManager = membershipManager;
         this.errorNotifier = errorNotifier;
         consumerName = String.format(
                 "%s:%s:%d",
@@ -95,10 +101,29 @@ public class LeaderService implements AutoCloseable, 
ConsumerEventListener {
                 functionMetaDataManager.getIsInitialized().get();
                 functionRuntimeManager.getIsInitialized().get();
 
+                // attempt to acquire exclusive publishers to both the 
metadata topic and assignments topic
+                // we should keep trying to acquire exclusive producers as 
long as we are still the leader
+                Supplier<Boolean> checkIsStillLeader = () -> 
membershipManager.getLeader().getWorkerId().equals(workerConfig.getWorkerId());
+                Producer<byte[]> scheduleManagerExclusiveProducer = null;
+                Producer<byte[]> functionMetaDataManagerExclusiveProducer = 
null;
+                try {
+                    scheduleManagerExclusiveProducer = 
schedulerManager.acquireExclusiveWrite(checkIsStillLeader);
+                    functionMetaDataManagerExclusiveProducer = 
functionMetaDataManager.acquireExclusiveWrite(checkIsStillLeader);
+                } catch (WorkerUtils.NotLeaderAnymore e) {
+                    log.info("Worker {} is not leader anymore. Exiting 
becoming leader routine.", consumer);
+                    if (scheduleManagerExclusiveProducer != null) {
+                        scheduleManagerExclusiveProducer.close();
+                    }
+                    if (functionMetaDataManagerExclusiveProducer != null) {
+                        functionMetaDataManagerExclusiveProducer.close();
+                    }
+                    return;
+                }
+
                 // make sure scheduler is initialized because this worker
                 // is the leader and may need to start computing and writing 
assignments
                 // also creates exclusive producer for assignment topic
-                schedulerManager.initialize();
+                schedulerManager.initialize(scheduleManagerExclusiveProducer);
 
                 // trigger read to the end of the topic and exit
                 // Since the leader can just update its in memory assignments 
cache directly
@@ -106,7 +131,7 @@ public class LeaderService implements AutoCloseable, 
ConsumerEventListener {
                 functionAssignmentTailer.close();
 
                 // need to move function meta data manager into leader mode
-                functionMetaDataManager.acquireLeadership();
+                
functionMetaDataManager.acquireLeadership(functionMetaDataManagerExclusiveProducer);
 
                 isLeader = true;
             } catch (Throwable th) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index ff4c72d..bdf71e1 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -497,6 +497,7 @@ public class PulsarWorkerService implements WorkerService {
               schedulerManager,
               functionRuntimeManager,
               functionMetaDataManager,
+              membershipManager,
               errorNotifier);
 
             log.info("/** Start Leader Service **/");
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 5019982..cc3e371 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -39,9 +39,9 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.Builder;
 import lombok.Data;
@@ -52,7 +52,6 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -64,7 +63,6 @@ import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Function.Instance;
-import org.apache.pulsar.functions.utils.Actions;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.scheduler.IScheduler;
 
@@ -98,7 +96,7 @@ public class SchedulerManager implements AutoCloseable {
 
     private final IScheduler scheduler;
 
-    private Producer<byte[]> producer;
+    private Producer<byte[]> exclusiveProducer;
 
     private ScheduledExecutorService scheduledExecutorService;
 
@@ -135,55 +133,26 @@ public class SchedulerManager implements AutoCloseable {
         this.errorNotifier = errorNotifier;
     }
 
-    private static Producer<byte[]> createProducer(PulsarClient client, 
WorkerConfig config) {
-        Actions.Action createProducerAction = Actions.Action.builder()
-                .actionName(String.format("Creating producer for assignment 
topic %s", config.getFunctionAssignmentTopic()))
-                .numRetries(5)
-                .sleepBetweenInvocationsMs(10000)
-                .supplier(() -> {
-                    try {
-                        // TODO set producer to be in exclusive mode
-                        Producer<byte[]> producer = 
client.newProducer().topic(config.getFunctionAssignmentTopic())
-                                .enableBatching(false)
-                                .blockIfQueueFull(true)
-                                .compressionType(CompressionType.LZ4)
-                                .sendTimeout(0, TimeUnit.MILLISECONDS)
-                                .producerName(config.getWorkerId() + 
"-scheduler-manager")
-                                .createAsync().get(10, TimeUnit.SECONDS);
-                        return 
Actions.ActionResult.builder().success(true).result(producer).build();
-                    } catch (Exception e) {
-                        log.error("Exception while at creating producer to 
topic {}", config.getFunctionAssignmentTopic(), e);
-                        return Actions.ActionResult.builder()
-                                .success(false)
-                                .build();
-                    }
-                })
-                .build();
-        AtomicReference<Producer<byte[]>> producer = new AtomicReference<>();
-        try {
-            Actions.newBuilder()
-                    .addAction(createProducerAction.toBuilder()
-                            .onSuccess((actionResult) -> 
producer.set((Producer<byte[]>) actionResult.getResult()))
-                            .build())
-                    .run();
-        } catch (InterruptedException e) {
-            log.error("Interrupted at creating producer to topic {}", 
config.getFunctionAssignmentTopic(), e);
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(e);
-        }
-        if (producer.get() == null) {
-            throw new RuntimeException("Can't create a producer on assignment 
topic "
-                    + config.getFunctionAssignmentTopic());
-        }
-        return producer.get();
+    /**
+     * Acquires an exclusive producer.  This method cannot return null.  It can only return a valid exclusive producer
+     * or throw NotLeaderAnymore exception.
+     * @param isLeader if the worker is still the leader
+     * @return A valid exclusive producer
+     * @throws WorkerUtils.NotLeaderAnymore if the worker is no longer the 
leader.
+     */
+    public Producer<byte[]> acquireExclusiveWrite(Supplier<Boolean> isLeader) 
throws WorkerUtils.NotLeaderAnymore {
+        // creates exclusive producer for assignment topic
+        return WorkerUtils.createExclusiveProducerWithRetry(
+                pulsarClient,
+                workerConfig.getFunctionAssignmentTopic(),
+                workerConfig.getWorkerId() + "-scheduler-manager",
+                isLeader, 10000);
     }
 
-    public synchronized void initialize() {
+    public synchronized void initialize(Producer<byte[]> exclusiveProducer) {
         if (!isRunning) {
             log.info("Initializing scheduler manager");
-            // creates exclusive producer for assignment topic
-            producer = createProducer(pulsarClient, workerConfig);
-
+            this.exclusiveProducer = exclusiveProducer;
             executorService = new ThreadPoolExecutor(1, 5, 0L, 
TimeUnit.MILLISECONDS,
                     new LinkedBlockingQueue<>(5));
             executorService.setThreadFactory(new 
ThreadFactoryBuilder().setNameFormat("worker-scheduler-%d").build());
@@ -194,6 +163,9 @@ public class SchedulerManager implements AutoCloseable {
 
             isRunning = true;
             lastMessageProduced = null;
+        } else {
+            log.error("Scheduler Manager entered invalid state");
+            errorNotifier.triggerError(new IllegalStateException());
         }
     }
 
@@ -461,7 +433,7 @@ public class SchedulerManager implements AutoCloseable {
             String fullyQualifiedInstanceId = 
FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
             // publish empty message with instance-id key so, compactor can 
delete and skip delivery of this instance-id
             // message
-            return producer.newMessage().key(fullyQualifiedInstanceId)
+            return exclusiveProducer.newMessage().key(fullyQualifiedInstanceId)
                     .value(deleted ? "".getBytes() : 
assignment.toByteArray()).send();
         } catch (Exception e) {
             log.error("Failed to {} assignment update {}", assignment, deleted 
? "send" : "deleted", e);
@@ -543,9 +515,9 @@ public class SchedulerManager implements AutoCloseable {
                 executorService.shutdown();
             }
 
-            if (producer != null) {
+            if (exclusiveProducer != null) {
                 try {
-                    producer.close();
+                    exclusiveProducer.close();
                 } catch (PulsarClientException e) {
                     log.warn("Failed to shutdown scheduler manager assignment 
producer", e);
                 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 6bd85b3..3a2a344 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -22,7 +22,6 @@ import static 
java.nio.file.StandardCopyOption.REPLACE_EXISTING;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.AppendOnlyStreamWriter;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.DistributedLogManager;
@@ -33,14 +32,16 @@ import org.apache.distributedlog.metadata.DLMetadata;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
-import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
@@ -59,6 +60,8 @@ import java.net.URI;
 import java.nio.file.Files;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -327,4 +330,40 @@ public final class WorkerUtils {
                 .startMessageId(startMessageId)
                 .create();
     }
+
+    public static Producer<byte[]> 
createExclusiveProducerWithRetry(PulsarClient client,
+                                                                    String 
topic,
+                                                                    String 
producerName,
+                                                                    
Supplier<Boolean> isLeader,
+                                                                    int 
sleepInBetweenMs) throws NotLeaderAnymore {
+        try {
+            int tries = 0;
+            do {
+                try {
+                    return client.newProducer().topic(topic)
+                            .accessMode(ProducerAccessMode.Exclusive)
+                            .enableBatching(false)
+                            .blockIfQueueFull(true)
+                            .compressionType(CompressionType.LZ4)
+                            .producerName(producerName)
+                            .createAsync().get(10, TimeUnit.SECONDS);
+                } catch (Exception e) {
+                    log.info("Encountered exception while at creating 
exclusive producer to topic {}", topic, e);
+                }
+                tries++;
+                if (tries % 6 == 0) {
+                    log.warn("Failed to acquire exclusive producer to topic {} 
after {} attempts.  Will retry if we are still the leader.", topic, tries);
+                }
+                Thread.sleep(sleepInBetweenMs);
+            } while (isLeader.get());
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Failed to create exclusive producer on 
topic " + topic, e);
+        }
+
+        throw new NotLeaderAnymore();
+    }
+
+    public static class NotLeaderAnymore extends Exception {
+
+    }
 }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index 25e62f0..55f4222 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -26,6 +26,8 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.functions.proto.Function;
@@ -41,6 +43,11 @@ public class FunctionMetaDataManagerTest {
         ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
         when(builder.topic(anyString())).thenReturn(builder);
         when(builder.producerName(anyString())).thenReturn(builder);
+        when(builder.enableBatching(anyBoolean())).thenReturn(builder);
+        when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder);
+        
when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);
+        when(builder.sendTimeout(anyInt(), 
any(TimeUnit.class))).thenReturn(builder);
+        when(builder.accessMode(any())).thenReturn(builder);
 
         Producer producer = mock(Producer.class);
         TypedMessageBuilder messageBuilder = mock(TypedMessageBuilder.class);
@@ -54,6 +61,7 @@ public class FunctionMetaDataManagerTest {
         when(producer.newMessage()).thenReturn(messageBuilder);
 
         when(builder.create()).thenReturn(producer);
+        
when(builder.createAsync()).thenReturn(CompletableFuture.completedFuture(producer));
 
         PulsarClient client = mock(PulsarClient.class);
         when(client.newProducer()).thenReturn(builder);
@@ -100,16 +108,16 @@ public class FunctionMetaDataManagerTest {
     }
 
     @Test
-    public void testUpdateIfLeaderFunctionWithoutCompaction() throws 
PulsarClientException {
+    public void testUpdateIfLeaderFunctionWithoutCompaction() throws Exception 
{
         testUpdateIfLeaderFunction(false);
     }
 
     @Test
-    public void testUpdateIfLeaderFunctionWithCompaction() throws 
PulsarClientException {
+    public void testUpdateIfLeaderFunctionWithCompaction() throws Exception {
         testUpdateIfLeaderFunction(true);
     }
 
-    private void testUpdateIfLeaderFunction(boolean compact) throws 
PulsarClientException {
+    private void testUpdateIfLeaderFunction(boolean compact) throws Exception {
 
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
@@ -131,7 +139,8 @@ public class FunctionMetaDataManagerTest {
         }
 
         // become leader
-        functionMetaDataManager.acquireLeadership();
+        Producer<byte[]> exclusiveProducer = 
functionMetaDataManager.acquireExclusiveWrite(() -> true);
+        functionMetaDataManager.acquireLeadership(exclusiveProducer);
         // Now w should be able to really update
         functionMetaDataManager.updateFunctionOnLeader(m1, false);
         if (compact) {
@@ -158,16 +167,16 @@ public class FunctionMetaDataManagerTest {
     }
 
     @Test
-    public void deregisterFunctionWithoutCompaction() throws 
PulsarClientException {
+    public void deregisterFunctionWithoutCompaction() throws Exception {
         deregisterFunction(false);
     }
 
     @Test
-    public void deregisterFunctionWithCompaction() throws 
PulsarClientException {
+    public void deregisterFunctionWithCompaction() throws Exception {
         deregisterFunction(true);
     }
 
-    private void deregisterFunction(boolean compact) throws 
PulsarClientException {
+    private void deregisterFunction(boolean compact) throws Exception {
         SchedulerManager mockedScheduler = mock(SchedulerManager.class);
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
@@ -190,7 +199,8 @@ public class FunctionMetaDataManagerTest {
         }
 
         // become leader
-        functionMetaDataManager.acquireLeadership();
+        Producer<byte[]> exclusiveProducer = 
functionMetaDataManager.acquireExclusiveWrite(() -> true);
+        functionMetaDataManager.acquireLeadership(exclusiveProducer);
         verify(mockedScheduler, times(0)).schedule();
         // Now try deleting
         functionMetaDataManager.updateFunctionOnLeader(m1, true);
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
index 2bfaf81..57f2dca 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
@@ -61,6 +61,7 @@ public class LeaderServiceTest {
     private CompletableFuture metadataManagerInitFuture;
     private CompletableFuture runtimeManagerInitFuture;
     private CompletableFuture readToTheEndAndExitFuture;
+    private MembershipManager membershipManager;
 
     public LeaderServiceTest() {
         this.workerConfig = new WorkerConfig();
@@ -117,8 +118,10 @@ public class LeaderServiceTest {
         
when(functionMetadataManager.getIsInitialized()).thenReturn(metadataManagerInitFuture);
         
when(functionRuntimeManager.getIsInitialized()).thenReturn(runtimeManagerInitFuture);
 
+        membershipManager = mock(MembershipManager.class);
+
         leaderService = spy(new LeaderService(workerService, mockClient, 
functionAssignmentTailer, schedulerManager,
-          functionRuntimeManager, functionMetadataManager,  
ErrorNotifier.getDefaultImpl()));
+          functionRuntimeManager, functionMetadataManager,  membershipManager, 
ErrorNotifier.getDefaultImpl()));
         leaderService.start();
     }
 
@@ -138,15 +141,19 @@ public class LeaderServiceTest {
         verify(functionRuntimeManager, times(1)).getIsInitialized();
         verify(runtimeManagerInitFuture, times(1)).get();
 
+        verify(functionMetadataManager, times(1)).acquireExclusiveWrite(any());
+        verify(schedulerManager, times(1)).acquireExclusiveWrite(any());
+
         verify(functionAssignmentTailer, 
times(1)).triggerReadToTheEndAndExit();
         verify(functionAssignmentTailer, times(1)).close();
-        verify(schedulerManager, times((1))).initialize();
+        verify(schedulerManager, times((1))).initialize(any());
 
         listenerHolder.get().becameInactive(mockConsumer, 0);
         assertFalse(leaderService.isLeader());
 
         verify(functionAssignmentTailer, times(1)).startFromMessage(messageId);
         verify(schedulerManager, times(1)).close();
+        verify(functionMetadataManager, times(1)).giveupLeadership();
     }
 
     @Test
@@ -159,15 +166,92 @@ public class LeaderServiceTest {
         listenerHolder.get().becameActive(mockConsumer, 0);
         assertTrue(leaderService.isLeader());
 
+        verify(functionMetadataManager, times(1)).acquireExclusiveWrite(any());
+        verify(schedulerManager, times(1)).acquireExclusiveWrite(any());
+
         verify(functionAssignmentTailer, 
times(1)).triggerReadToTheEndAndExit();
         verify(readToTheEndAndExitFuture, times(1)).get();
         verify(functionAssignmentTailer, times(1)).close();
-        verify(schedulerManager, times((1))).initialize();
+        verify(schedulerManager, times((1))).initialize(any());
 
         listenerHolder.get().becameInactive(mockConsumer, 0);
         assertFalse(leaderService.isLeader());
 
         verify(functionAssignmentTailer, times(1)).start();
         verify(schedulerManager, times(1)).close();
+        verify(functionMetadataManager, times(1)).giveupLeadership();
+    }
+
+    @Test
+    public void testAcquireScheduleManagerExclusiveProducerNotLeaderAnymore() 
throws Exception {
+        MessageId messageId = new MessageIdImpl(1, 2, -1);
+        when(schedulerManager.getLastMessageProduced()).thenReturn(messageId);
+
+        assertFalse(leaderService.isLeader());
+        verify(mockClient, times(1)).newConsumer();
+
+        // acquire exclusive producer failed because no leader anymore
+        when(schedulerManager.acquireExclusiveWrite(any())).thenThrow(new 
WorkerUtils.NotLeaderAnymore());
+
+        listenerHolder.get().becameActive(mockConsumer, 0);
+        // should have failed to become leader
+        assertFalse(leaderService.isLeader());
+
+        verify(functionMetadataManager, times(1)).getIsInitialized();
+        verify(metadataManagerInitFuture, times(1)).get();
+        verify(functionRuntimeManager, times(1)).getIsInitialized();
+        verify(runtimeManagerInitFuture, times(1)).get();
+
+        verify(schedulerManager, times(1)).acquireExclusiveWrite(any());
+        verify(functionMetadataManager, times(0)).acquireExclusiveWrite(any());
+
+        verify(functionAssignmentTailer, 
times(0)).triggerReadToTheEndAndExit();
+        verify(functionAssignmentTailer, times(0)).close();
+        verify(schedulerManager, times((0))).initialize(any());
+
+        listenerHolder.get().becameInactive(mockConsumer, 0);
+        assertFalse(leaderService.isLeader());
+
+        verify(functionAssignmentTailer, times(0)).startFromMessage(messageId);
+        verify(functionAssignmentTailer, times(0)).start();
+        verify(schedulerManager, times(0)).close();
+        verify(functionMetadataManager, times(0)).giveupLeadership();
     }
+
+    @Test
+    public void 
testAcquireFunctionMetadataManagerExclusiveProducerNotLeaderAnymore() throws 
Exception {
+        MessageId messageId = new MessageIdImpl(1, 2, -1);
+        when(schedulerManager.getLastMessageProduced()).thenReturn(messageId);
+
+        assertFalse(leaderService.isLeader());
+        verify(mockClient, times(1)).newConsumer();
+
+        // acquire exclusive producer failed because no leader anymore
+        
when(functionMetadataManager.acquireExclusiveWrite(any())).thenThrow(new 
WorkerUtils.NotLeaderAnymore());
+
+        listenerHolder.get().becameActive(mockConsumer, 0);
+        // should have failed to become leader
+        assertFalse(leaderService.isLeader());
+
+        verify(functionMetadataManager, times(1)).getIsInitialized();
+        verify(metadataManagerInitFuture, times(1)).get();
+        verify(functionRuntimeManager, times(1)).getIsInitialized();
+        verify(runtimeManagerInitFuture, times(1)).get();
+
+        verify(schedulerManager, times(1)).acquireExclusiveWrite(any());
+        verify(functionMetadataManager, times(1)).acquireExclusiveWrite(any());
+
+        verify(functionAssignmentTailer, 
times(0)).triggerReadToTheEndAndExit();
+        verify(functionAssignmentTailer, times(0)).close();
+        verify(schedulerManager, times((0))).initialize(any());
+
+        listenerHolder.get().becameInactive(mockConsumer, 0);
+        assertFalse(leaderService.isLeader());
+
+        verify(functionAssignmentTailer, times(0)).startFromMessage(messageId);
+        verify(functionAssignmentTailer, times(0)).start();
+        verify(schedulerManager, times(0)).close();
+        verify(functionMetadataManager, times(0)).giveupLeadership();
+    }
+
 }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 2a6278e..19e27e7 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -34,7 +34,6 @@ import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Sets;
 import com.google.protobuf.InvalidProtocolBufferException;
 import io.netty.util.concurrent.DefaultThreadFactory;
-import io.prometheus.client.CollectorRegistry;
 import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -61,7 +60,6 @@ import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
-import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
 import org.mockito.Mockito;
@@ -84,6 +82,7 @@ public class SchedulerManagerTest {
     private ScheduledExecutorService executor;
     private LeaderService leaderService;
     private ErrorNotifier errorNotifier;
+    private PulsarClient pulsarClient;
 
     @BeforeMethod
     public void setup() {
@@ -116,10 +115,11 @@ public class SchedulerManagerTest {
         when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder);
         
when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);
         when(builder.sendTimeout(anyInt(), 
any(TimeUnit.class))).thenReturn(builder);
+        when(builder.accessMode(any())).thenReturn(builder);
 
         
when(builder.createAsync()).thenReturn(CompletableFuture.completedFuture(producer));
 
-        PulsarClient pulsarClient = mock(PulsarClient.class);
+        pulsarClient = mock(PulsarClient.class);
         when(pulsarClient.newProducer()).thenReturn(builder);
 
         this.executor = Executors
@@ -820,7 +820,7 @@ public class SchedulerManagerTest {
     }
 
     @Test
-    public void testAssignmentWorkerDoesNotExist() throws 
InterruptedException, NoSuchMethodException, TimeoutException, 
ExecutionException {
+    public void testAssignmentWorkerDoesNotExist() throws Exception {
         List<Function.FunctionMetaData> functionMetaDataList = new 
LinkedList<>();
         long version = 5;
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder()
@@ -879,9 +879,12 @@ public class SchedulerManagerTest {
     }
 
     private void callSchedule() throws InterruptedException,
-            TimeoutException, ExecutionException {
+            TimeoutException, ExecutionException, WorkerUtils.NotLeaderAnymore 
{
 
-        schedulerManager.initialize();
+        if (leaderService.isLeader()) {
+            Producer<byte[]> exclusiveProducer = 
schedulerManager.acquireExclusiveWrite(() -> true);
+            schedulerManager.initialize(exclusiveProducer);
+        }
         Future<?> complete = schedulerManager.schedule();
 
         complete.get(30, TimeUnit.SECONDS);
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
new file mode 100644
index 0000000..f667059
--- /dev/null
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerAccessMode;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link WorkerUtils}.
+ */
+public class WorkerUtilsTest {
+
+    /**
+     * Exercises {@link WorkerUtils#createExclusiveProducerWithRetry} against a mocked
+     * {@link PulsarClient} whose producer builder returns itself from each stubbed setter:
+     * <ol>
+     *   <li>creation succeeds: the producer is returned, and the builder was given the requested
+     *       topic, producer name and {@link ProducerAccessMode#Exclusive} access mode;</li>
+     *   <li>creation fails with {@code ProducerFencedException} and the leadership supplier
+     *       answers {@code false}: the call must end in {@link WorkerUtils.NotLeaderAnymore};</li>
+     *   <li>creation keeps failing while the supplier answers {@code true} for its first six
+     *       calls: the call is expected to keep retrying until the supplier flips, then end in
+     *       {@link WorkerUtils.NotLeaderAnymore}.</li>
+     * </ol>
+     */
+    @Test
+    @SuppressWarnings("unchecked") // mock(Class) cannot carry the <byte[]> type argument
+    public void testCreateExclusiveProducerWithRetry() throws Exception {
+        Producer<byte[]> producer = mock(Producer.class);
+        ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
+        when(builder.topic(anyString())).thenReturn(builder);
+        when(builder.producerName(anyString())).thenReturn(builder);
+        when(builder.enableBatching(anyBoolean())).thenReturn(builder);
+        when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder);
+        when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);
+        when(builder.sendTimeout(anyInt(), any(TimeUnit.class))).thenReturn(builder);
+        when(builder.accessMode(any())).thenReturn(builder);
+        when(builder.createAsync()).thenReturn(CompletableFuture.completedFuture(producer));
+
+        PulsarClient pulsarClient = mock(PulsarClient.class);
+        when(pulsarClient.newProducer()).thenReturn(builder);
+
+        // 1. Creation succeeds. An unexpected NotLeaderAnymore is not caught here: it propagates
+        //    and fails the test together with its stack trace.
+        Producer<byte[]> p = WorkerUtils.createExclusiveProducerWithRetry(
+                pulsarClient, "test-topic", "test-producer", () -> true, 0);
+        Assert.assertNotNull(p);
+        verify(builder, times(1)).topic(eq("test-topic"));
+        verify(builder, times(1)).producerName(eq("test-producer"));
+        verify(builder, times(1)).accessMode(eq(ProducerAccessMode.Exclusive));
+
+        // From here on every creation attempt fails with ProducerFencedException.
+        CompletableFuture<Producer<byte[]>> fenced = new CompletableFuture<>();
+        fenced.completeExceptionally(new PulsarClientException.ProducerFencedException("test"));
+        when(builder.createAsync()).thenReturn(fenced);
+
+        // 2. Creation fails and this worker is no longer the leader.
+        Assert.assertThrows(WorkerUtils.NotLeaderAnymore.class,
+                () -> WorkerUtils.createExclusiveProducerWithRetry(
+                        pulsarClient, "test-topic", "test-producer", () -> false, 0));
+
+        // 3. Creation fails while the supplier reports leadership for its first six calls only.
+        AtomicInteger leaderChecks = new AtomicInteger();
+        Supplier<Boolean> leaderForSixChecks = () -> leaderChecks.getAndIncrement() < 6;
+        Assert.assertThrows(WorkerUtils.NotLeaderAnymore.class,
+                () -> WorkerUtils.createExclusiveProducerWithRetry(
+                        pulsarClient, "test-topic", "test-producer", leaderForSixChecks, 0));
+        // The supplier answers false for the first time on its seventh call, so giving up because
+        // leadership was lost implies it was consulted more than six times, i.e. the call retried.
+        Assert.assertTrue(leaderChecks.get() > 6,
+                "expected retries until leadership was lost, but the leadership supplier was called "
+                        + leaderChecks.get() + " time(s)");
+    }
+}

Reply via email to