This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 73d0aac Enable Function Workers to use exclusive producer to write to internal topics (#9275)
73d0aac is described below
commit 73d0aac75861da485a1bff2d678b5f94a4ab3a1b
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Wed Feb 17 21:51:52 2021 -0800
Enable Function Workers to use exclusive producer to write to internal topics (#9275)
Co-authored-by: Jerry Peng <[email protected]>
---
.../functions/worker/FunctionMetaDataManager.java | 49 +++++-----
.../pulsar/functions/worker/LeaderService.java | 29 +++++-
.../functions/worker/PulsarWorkerService.java | 1 +
.../pulsar/functions/worker/SchedulerManager.java | 76 +++++-----------
.../pulsar/functions/worker/WorkerUtils.java | 43 ++++++++-
.../worker/FunctionMetaDataManagerTest.java | 26 ++++--
.../pulsar/functions/worker/LeaderServiceTest.java | 90 +++++++++++++++++-
.../functions/worker/SchedulerManagerTest.java | 15 +--
.../pulsar/functions/worker/WorkerUtilsTest.java | 101 +++++++++++++++++++++
9 files changed, 334 insertions(+), 96 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 7416334..b5ed5eb 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -27,6 +27,8 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
@@ -252,14 +254,36 @@ public class FunctionMetaDataManager implements
AutoCloseable {
}
/**
+ * Acquires a exclusive producer. This method cannot return null. It can
only return a valid exclusive producer
+ * or throw NotLeaderAnymore exception.
+ * @param isLeader if the worker is still the leader
+ * @return A valid exclusive producer
+ * @throws WorkerUtils.NotLeaderAnymore if the worker is no longer the
leader.
+ */
+ public Producer<byte[]> acquireExclusiveWrite(Supplier<Boolean> isLeader)
throws WorkerUtils.NotLeaderAnymore {
+ // creates exclusive producer for metadata topic
+ return WorkerUtils.createExclusiveProducerWithRetry(
+ pulsarClient,
+ workerConfig.getFunctionMetadataTopic(),
+ workerConfig.getWorkerId() + "-leader",
+ isLeader, 1000);
+ }
+
+ /**
* Called by the leader service when this worker becomes the leader.
* We first get exclusive producer on the metadata topic. Next we drain
the tailer
* to ensure that we have caught up to metadata topic. After which we
close the tailer.
* Note that this method cannot be syncrhonized because the tailer might
still be processing messages
*/
- public void acquireLeadership() {
+ public void acquireLeadership(Producer<byte[]> exclusiveProducer) {
log.info("FunctionMetaDataManager becoming leader by creating
exclusive producer");
- FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+ if (exclusiveLeaderProducer != null) {
+ log.error("FunctionMetaData Manager entered invalid state");
+ errorNotifier.triggerError(new IllegalStateException());
+ }
+ this.exclusiveLeaderProducer = exclusiveProducer;
+ FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
+ this.functionMetaDataTopicTailer = null;
// Now that we have created the exclusive producer, wait for reader to
get over
if (tailer != null) {
try {
@@ -273,27 +297,6 @@ public class FunctionMetaDataManager implements
AutoCloseable {
log.info("FunctionMetaDataManager done becoming leader");
}
- private synchronized FunctionMetaDataTopicTailer
internalAcquireLeadership() {
- if (exclusiveLeaderProducer == null) {
- try {
- exclusiveLeaderProducer = pulsarClient.newProducer()
- .topic(this.workerConfig.getFunctionMetadataTopic())
- .producerName(workerConfig.getWorkerId() + "-leader")
- // .type(EXCLUSIVE)
- .create();
- } catch (PulsarClientException e) {
- log.error("Error creating exclusive producer", e);
- errorNotifier.triggerError(e);
- }
- } else {
- log.error("Logic Error in FunctionMetaData Manager");
- errorNotifier.triggerError(new IllegalStateException());
- }
- FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
- this.functionMetaDataTopicTailer = null;
- return tailer;
- }
-
/**
* called by the leader service when we lose leadership. We close the
exclusive producer
* and start the tailer.
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
index b0d7585..fb11fab 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
@@ -21,11 +21,14 @@ package org.apache.pulsar.functions.worker;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
+import java.util.function.Supplier;
+
@Slf4j
public class LeaderService implements AutoCloseable, ConsumerEventListener {
@@ -35,6 +38,7 @@ public class LeaderService implements AutoCloseable,
ConsumerEventListener {
private final SchedulerManager schedulerManager;
private final FunctionRuntimeManager functionRuntimeManager;
private final FunctionMetaDataManager functionMetaDataManager;
+ private final MembershipManager membershipManager;
private ConsumerImpl<byte[]> consumer;
private final WorkerConfig workerConfig;
private final PulsarClient pulsarClient;
@@ -50,6 +54,7 @@ public class LeaderService implements AutoCloseable,
ConsumerEventListener {
SchedulerManager schedulerManager,
FunctionRuntimeManager functionRuntimeManager,
FunctionMetaDataManager functionMetaDataManager,
+ MembershipManager membershipManager,
ErrorNotifier errorNotifier) {
this.workerConfig = workerService.getWorkerConfig();
this.pulsarClient = pulsarClient;
@@ -57,6 +62,7 @@ public class LeaderService implements AutoCloseable,
ConsumerEventListener {
this.schedulerManager = schedulerManager;
this.functionRuntimeManager = functionRuntimeManager;
this.functionMetaDataManager = functionMetaDataManager;
+ this.membershipManager = membershipManager;
this.errorNotifier = errorNotifier;
consumerName = String.format(
"%s:%s:%d",
@@ -95,10 +101,29 @@ public class LeaderService implements AutoCloseable,
ConsumerEventListener {
functionMetaDataManager.getIsInitialized().get();
functionRuntimeManager.getIsInitialized().get();
+ // attempt to acquire exclusive publishers to both the
metadata topic and assignments topic
+ // we should keep trying to acquire exclusive producers as
long as we are still the leader
+ Supplier<Boolean> checkIsStillLeader = () ->
membershipManager.getLeader().getWorkerId().equals(workerConfig.getWorkerId());
+ Producer<byte[]> scheduleManagerExclusiveProducer = null;
+ Producer<byte[]> functionMetaDataManagerExclusiveProducer =
null;
+ try {
+ scheduleManagerExclusiveProducer =
schedulerManager.acquireExclusiveWrite(checkIsStillLeader);
+ functionMetaDataManagerExclusiveProducer =
functionMetaDataManager.acquireExclusiveWrite(checkIsStillLeader);
+ } catch (WorkerUtils.NotLeaderAnymore e) {
+ log.info("Worker {} is not leader anymore. Exiting
becoming leader routine.", consumer);
+ if (scheduleManagerExclusiveProducer != null) {
+ scheduleManagerExclusiveProducer.close();
+ }
+ if (functionMetaDataManagerExclusiveProducer != null) {
+ functionMetaDataManagerExclusiveProducer.close();
+ }
+ return;
+ }
+
// make sure scheduler is initialized because this worker
// is the leader and may need to start computing and writing
assignments
// also creates exclusive producer for assignment topic
- schedulerManager.initialize();
+ schedulerManager.initialize(scheduleManagerExclusiveProducer);
// trigger read to the end of the topic and exit
// Since the leader can just update its in memory assignments
cache directly
@@ -106,7 +131,7 @@ public class LeaderService implements AutoCloseable,
ConsumerEventListener {
functionAssignmentTailer.close();
// need to move function meta data manager into leader mode
- functionMetaDataManager.acquireLeadership();
+
functionMetaDataManager.acquireLeadership(functionMetaDataManagerExclusiveProducer);
isLeader = true;
} catch (Throwable th) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index ff4c72d..bdf71e1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -497,6 +497,7 @@ public class PulsarWorkerService implements WorkerService {
schedulerManager,
functionRuntimeManager,
functionMetaDataManager,
+ membershipManager,
errorNotifier);
log.info("/** Start Leader Service **/");
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 5019982..cc3e371 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -39,9 +39,9 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.Data;
@@ -52,7 +52,6 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -64,7 +63,6 @@ import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Function.Instance;
-import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.scheduler.IScheduler;
@@ -98,7 +96,7 @@ public class SchedulerManager implements AutoCloseable {
private final IScheduler scheduler;
- private Producer<byte[]> producer;
+ private Producer<byte[]> exclusiveProducer;
private ScheduledExecutorService scheduledExecutorService;
@@ -135,55 +133,26 @@ public class SchedulerManager implements AutoCloseable {
this.errorNotifier = errorNotifier;
}
- private static Producer<byte[]> createProducer(PulsarClient client,
WorkerConfig config) {
- Actions.Action createProducerAction = Actions.Action.builder()
- .actionName(String.format("Creating producer for assignment
topic %s", config.getFunctionAssignmentTopic()))
- .numRetries(5)
- .sleepBetweenInvocationsMs(10000)
- .supplier(() -> {
- try {
- // TODO set producer to be in exclusive mode
- Producer<byte[]> producer =
client.newProducer().topic(config.getFunctionAssignmentTopic())
- .enableBatching(false)
- .blockIfQueueFull(true)
- .compressionType(CompressionType.LZ4)
- .sendTimeout(0, TimeUnit.MILLISECONDS)
- .producerName(config.getWorkerId() +
"-scheduler-manager")
- .createAsync().get(10, TimeUnit.SECONDS);
- return
Actions.ActionResult.builder().success(true).result(producer).build();
- } catch (Exception e) {
- log.error("Exception while at creating producer to
topic {}", config.getFunctionAssignmentTopic(), e);
- return Actions.ActionResult.builder()
- .success(false)
- .build();
- }
- })
- .build();
- AtomicReference<Producer<byte[]>> producer = new AtomicReference<>();
- try {
- Actions.newBuilder()
- .addAction(createProducerAction.toBuilder()
- .onSuccess((actionResult) ->
producer.set((Producer<byte[]>) actionResult.getResult()))
- .build())
- .run();
- } catch (InterruptedException e) {
- log.error("Interrupted at creating producer to topic {}",
config.getFunctionAssignmentTopic(), e);
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- if (producer.get() == null) {
- throw new RuntimeException("Can't create a producer on assignment
topic "
- + config.getFunctionAssignmentTopic());
- }
- return producer.get();
+ /**
+ * Acquires a exclusive producer. This method cannot return null. It can
only return a valid exclusive producer
+ * or throw NotLeaderAnymore exception.
+ * @param isLeader if the worker is still the leader
+ * @return A valid exclusive producer
+ * @throws WorkerUtils.NotLeaderAnymore if the worker is no longer the
leader.
+ */
+ public Producer<byte[]> acquireExclusiveWrite(Supplier<Boolean> isLeader)
throws WorkerUtils.NotLeaderAnymore {
+ // creates exclusive producer for assignment topic
+ return WorkerUtils.createExclusiveProducerWithRetry(
+ pulsarClient,
+ workerConfig.getFunctionAssignmentTopic(),
+ workerConfig.getWorkerId() + "-scheduler-manager",
+ isLeader, 10000);
}
- public synchronized void initialize() {
+ public synchronized void initialize(Producer<byte[]> exclusiveProducer) {
if (!isRunning) {
log.info("Initializing scheduler manager");
- // creates exclusive producer for assignment topic
- producer = createProducer(pulsarClient, workerConfig);
-
+ this.exclusiveProducer = exclusiveProducer;
executorService = new ThreadPoolExecutor(1, 5, 0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(5));
executorService.setThreadFactory(new
ThreadFactoryBuilder().setNameFormat("worker-scheduler-%d").build());
@@ -194,6 +163,9 @@ public class SchedulerManager implements AutoCloseable {
isRunning = true;
lastMessageProduced = null;
+ } else {
+ log.error("Scheduler Manager entered invalid state");
+ errorNotifier.triggerError(new IllegalStateException());
}
}
@@ -461,7 +433,7 @@ public class SchedulerManager implements AutoCloseable {
String fullyQualifiedInstanceId =
FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
// publish empty message with instance-id key so, compactor can
delete and skip delivery of this instance-id
// message
- return producer.newMessage().key(fullyQualifiedInstanceId)
+ return exclusiveProducer.newMessage().key(fullyQualifiedInstanceId)
.value(deleted ? "".getBytes() :
assignment.toByteArray()).send();
} catch (Exception e) {
log.error("Failed to {} assignment update {}", assignment, deleted
? "send" : "deleted", e);
@@ -543,9 +515,9 @@ public class SchedulerManager implements AutoCloseable {
executorService.shutdown();
}
- if (producer != null) {
+ if (exclusiveProducer != null) {
try {
- producer.close();
+ exclusiveProducer.close();
} catch (PulsarClientException e) {
log.warn("Failed to shutdown scheduler manager assignment
producer", e);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 6bd85b3..3a2a344 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -22,7 +22,6 @@ import static
java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.AppendOnlyStreamWriter;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.DistributedLogManager;
@@ -33,14 +32,16 @@ import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.policies.data.FunctionStats;
-import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
@@ -59,6 +60,8 @@ import java.net.URI;
import java.nio.file.Files;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
@Slf4j
@@ -327,4 +330,40 @@ public final class WorkerUtils {
.startMessageId(startMessageId)
.create();
}
+
+ public static Producer<byte[]>
createExclusiveProducerWithRetry(PulsarClient client,
+ String
topic,
+ String
producerName,
+
Supplier<Boolean> isLeader,
+ int
sleepInBetweenMs) throws NotLeaderAnymore {
+ try {
+ int tries = 0;
+ do {
+ try {
+ return client.newProducer().topic(topic)
+ .accessMode(ProducerAccessMode.Exclusive)
+ .enableBatching(false)
+ .blockIfQueueFull(true)
+ .compressionType(CompressionType.LZ4)
+ .producerName(producerName)
+ .createAsync().get(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ log.info("Encountered exception while at creating
exclusive producer to topic {}", topic, e);
+ }
+ tries++;
+ if (tries % 6 == 0) {
+ log.warn("Failed to acquire exclusive producer to topic {}
after {} attempts. Will retry if we are still the leader.", topic, tries);
+ }
+ Thread.sleep(sleepInBetweenMs);
+ } while (isLeader.get());
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Failed to create exclusive producer on
topic " + topic, e);
+ }
+
+ throw new NotLeaderAnymore();
+ }
+
+ public static class NotLeaderAnymore extends Exception {
+
+ }
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index 25e62f0..55f4222 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -26,6 +26,8 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.functions.proto.Function;
@@ -41,6 +43,11 @@ public class FunctionMetaDataManagerTest {
ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
when(builder.topic(anyString())).thenReturn(builder);
when(builder.producerName(anyString())).thenReturn(builder);
+ when(builder.enableBatching(anyBoolean())).thenReturn(builder);
+ when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder);
+
when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);
+ when(builder.sendTimeout(anyInt(),
any(TimeUnit.class))).thenReturn(builder);
+ when(builder.accessMode(any())).thenReturn(builder);
Producer producer = mock(Producer.class);
TypedMessageBuilder messageBuilder = mock(TypedMessageBuilder.class);
@@ -54,6 +61,7 @@ public class FunctionMetaDataManagerTest {
when(producer.newMessage()).thenReturn(messageBuilder);
when(builder.create()).thenReturn(producer);
+
when(builder.createAsync()).thenReturn(CompletableFuture.completedFuture(producer));
PulsarClient client = mock(PulsarClient.class);
when(client.newProducer()).thenReturn(builder);
@@ -100,16 +108,16 @@ public class FunctionMetaDataManagerTest {
}
@Test
- public void testUpdateIfLeaderFunctionWithoutCompaction() throws
PulsarClientException {
+ public void testUpdateIfLeaderFunctionWithoutCompaction() throws Exception
{
testUpdateIfLeaderFunction(false);
}
@Test
- public void testUpdateIfLeaderFunctionWithCompaction() throws
PulsarClientException {
+ public void testUpdateIfLeaderFunctionWithCompaction() throws Exception {
testUpdateIfLeaderFunction(true);
}
- private void testUpdateIfLeaderFunction(boolean compact) throws
PulsarClientException {
+ private void testUpdateIfLeaderFunction(boolean compact) throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
@@ -131,7 +139,8 @@ public class FunctionMetaDataManagerTest {
}
// become leader
- functionMetaDataManager.acquireLeadership();
+ Producer<byte[]> exclusiveProducer =
functionMetaDataManager.acquireExclusiveWrite(() -> true);
+ functionMetaDataManager.acquireLeadership(exclusiveProducer);
// Now w should be able to really update
functionMetaDataManager.updateFunctionOnLeader(m1, false);
if (compact) {
@@ -158,16 +167,16 @@ public class FunctionMetaDataManagerTest {
}
@Test
- public void deregisterFunctionWithoutCompaction() throws
PulsarClientException {
+ public void deregisterFunctionWithoutCompaction() throws Exception {
deregisterFunction(false);
}
@Test
- public void deregisterFunctionWithCompaction() throws
PulsarClientException {
+ public void deregisterFunctionWithCompaction() throws Exception {
deregisterFunction(true);
}
- private void deregisterFunction(boolean compact) throws
PulsarClientException {
+ private void deregisterFunction(boolean compact) throws Exception {
SchedulerManager mockedScheduler = mock(SchedulerManager.class);
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
@@ -190,7 +199,8 @@ public class FunctionMetaDataManagerTest {
}
// become leader
- functionMetaDataManager.acquireLeadership();
+ Producer<byte[]> exclusiveProducer =
functionMetaDataManager.acquireExclusiveWrite(() -> true);
+ functionMetaDataManager.acquireLeadership(exclusiveProducer);
verify(mockedScheduler, times(0)).schedule();
// Now try deleting
functionMetaDataManager.updateFunctionOnLeader(m1, true);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
index 2bfaf81..57f2dca 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
@@ -61,6 +61,7 @@ public class LeaderServiceTest {
private CompletableFuture metadataManagerInitFuture;
private CompletableFuture runtimeManagerInitFuture;
private CompletableFuture readToTheEndAndExitFuture;
+ private MembershipManager membershipManager;
public LeaderServiceTest() {
this.workerConfig = new WorkerConfig();
@@ -117,8 +118,10 @@ public class LeaderServiceTest {
when(functionMetadataManager.getIsInitialized()).thenReturn(metadataManagerInitFuture);
when(functionRuntimeManager.getIsInitialized()).thenReturn(runtimeManagerInitFuture);
+ membershipManager = mock(MembershipManager.class);
+
leaderService = spy(new LeaderService(workerService, mockClient,
functionAssignmentTailer, schedulerManager,
- functionRuntimeManager, functionMetadataManager,
ErrorNotifier.getDefaultImpl()));
+ functionRuntimeManager, functionMetadataManager, membershipManager,
ErrorNotifier.getDefaultImpl()));
leaderService.start();
}
@@ -138,15 +141,19 @@ public class LeaderServiceTest {
verify(functionRuntimeManager, times(1)).getIsInitialized();
verify(runtimeManagerInitFuture, times(1)).get();
+ verify(functionMetadataManager, times(1)).acquireExclusiveWrite(any());
+ verify(schedulerManager, times(1)).acquireExclusiveWrite(any());
+
verify(functionAssignmentTailer,
times(1)).triggerReadToTheEndAndExit();
verify(functionAssignmentTailer, times(1)).close();
- verify(schedulerManager, times((1))).initialize();
+ verify(schedulerManager, times((1))).initialize(any());
listenerHolder.get().becameInactive(mockConsumer, 0);
assertFalse(leaderService.isLeader());
verify(functionAssignmentTailer, times(1)).startFromMessage(messageId);
verify(schedulerManager, times(1)).close();
+ verify(functionMetadataManager, times(1)).giveupLeadership();
}
@Test
@@ -159,15 +166,92 @@ public class LeaderServiceTest {
listenerHolder.get().becameActive(mockConsumer, 0);
assertTrue(leaderService.isLeader());
+ verify(functionMetadataManager, times(1)).acquireExclusiveWrite(any());
+ verify(schedulerManager, times(1)).acquireExclusiveWrite(any());
+
verify(functionAssignmentTailer,
times(1)).triggerReadToTheEndAndExit();
verify(readToTheEndAndExitFuture, times(1)).get();
verify(functionAssignmentTailer, times(1)).close();
- verify(schedulerManager, times((1))).initialize();
+ verify(schedulerManager, times((1))).initialize(any());
listenerHolder.get().becameInactive(mockConsumer, 0);
assertFalse(leaderService.isLeader());
verify(functionAssignmentTailer, times(1)).start();
verify(schedulerManager, times(1)).close();
+ verify(functionMetadataManager, times(1)).giveupLeadership();
+ }
+
+ @Test
+ public void testAcquireScheduleManagerExclusiveProducerNotLeaderAnymore()
throws Exception {
+ MessageId messageId = new MessageIdImpl(1, 2, -1);
+ when(schedulerManager.getLastMessageProduced()).thenReturn(messageId);
+
+ assertFalse(leaderService.isLeader());
+ verify(mockClient, times(1)).newConsumer();
+
+ // acquire exclusive producer failed because no leader anymore
+ when(schedulerManager.acquireExclusiveWrite(any())).thenThrow(new
WorkerUtils.NotLeaderAnymore());
+
+ listenerHolder.get().becameActive(mockConsumer, 0);
+ // should have failed to become leader
+ assertFalse(leaderService.isLeader());
+
+ verify(functionMetadataManager, times(1)).getIsInitialized();
+ verify(metadataManagerInitFuture, times(1)).get();
+ verify(functionRuntimeManager, times(1)).getIsInitialized();
+ verify(runtimeManagerInitFuture, times(1)).get();
+
+ verify(schedulerManager, times(1)).acquireExclusiveWrite(any());
+ verify(functionMetadataManager, times(0)).acquireExclusiveWrite(any());
+
+ verify(functionAssignmentTailer,
times(0)).triggerReadToTheEndAndExit();
+ verify(functionAssignmentTailer, times(0)).close();
+ verify(schedulerManager, times((0))).initialize(any());
+
+ listenerHolder.get().becameInactive(mockConsumer, 0);
+ assertFalse(leaderService.isLeader());
+
+ verify(functionAssignmentTailer, times(0)).startFromMessage(messageId);
+ verify(functionAssignmentTailer, times(0)).start();
+ verify(schedulerManager, times(0)).close();
+ verify(functionMetadataManager, times(0)).giveupLeadership();
}
+
+ @Test
+ public void
testAcquireFunctionMetadataManagerExclusiveProducerNotLeaderAnymore() throws
Exception {
+ MessageId messageId = new MessageIdImpl(1, 2, -1);
+ when(schedulerManager.getLastMessageProduced()).thenReturn(messageId);
+
+ assertFalse(leaderService.isLeader());
+ verify(mockClient, times(1)).newConsumer();
+
+ // acquire exclusive producer failed because no leader anymore
+
when(functionMetadataManager.acquireExclusiveWrite(any())).thenThrow(new
WorkerUtils.NotLeaderAnymore());
+
+ listenerHolder.get().becameActive(mockConsumer, 0);
+ // should have failed to become leader
+ assertFalse(leaderService.isLeader());
+
+ verify(functionMetadataManager, times(1)).getIsInitialized();
+ verify(metadataManagerInitFuture, times(1)).get();
+ verify(functionRuntimeManager, times(1)).getIsInitialized();
+ verify(runtimeManagerInitFuture, times(1)).get();
+
+ verify(schedulerManager, times(1)).acquireExclusiveWrite(any());
+ verify(functionMetadataManager, times(1)).acquireExclusiveWrite(any());
+
+ verify(functionAssignmentTailer,
times(0)).triggerReadToTheEndAndExit();
+ verify(functionAssignmentTailer, times(0)).close();
+ verify(schedulerManager, times((0))).initialize(any());
+
+ listenerHolder.get().becameInactive(mockConsumer, 0);
+ assertFalse(leaderService.isLeader());
+
+ verify(functionAssignmentTailer, times(0)).startFromMessage(messageId);
+ verify(functionAssignmentTailer, times(0)).start();
+ verify(schedulerManager, times(0)).close();
+ verify(functionMetadataManager, times(0)).giveupLeadership();
+ }
+
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 2a6278e..19e27e7 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -34,7 +34,6 @@ import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.util.concurrent.DefaultThreadFactory;
-import io.prometheus.client.CollectorRegistry;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.LinkedList;
@@ -61,7 +60,6 @@ import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
-import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
import org.mockito.Mockito;
@@ -84,6 +82,7 @@ public class SchedulerManagerTest {
private ScheduledExecutorService executor;
private LeaderService leaderService;
private ErrorNotifier errorNotifier;
+ private PulsarClient pulsarClient;
@BeforeMethod
public void setup() {
@@ -116,10 +115,11 @@ public class SchedulerManagerTest {
when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder);
when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);
when(builder.sendTimeout(anyInt(),
any(TimeUnit.class))).thenReturn(builder);
+ when(builder.accessMode(any())).thenReturn(builder);
when(builder.createAsync()).thenReturn(CompletableFuture.completedFuture(producer));
- PulsarClient pulsarClient = mock(PulsarClient.class);
+ pulsarClient = mock(PulsarClient.class);
when(pulsarClient.newProducer()).thenReturn(builder);
this.executor = Executors
@@ -820,7 +820,7 @@ public class SchedulerManagerTest {
}
@Test
- public void testAssignmentWorkerDoesNotExist() throws
InterruptedException, NoSuchMethodException, TimeoutException,
ExecutionException {
+ public void testAssignmentWorkerDoesNotExist() throws Exception {
List<Function.FunctionMetaData> functionMetaDataList = new
LinkedList<>();
long version = 5;
Function.FunctionMetaData function1 =
Function.FunctionMetaData.newBuilder()
@@ -879,9 +879,12 @@ public class SchedulerManagerTest {
}
private void callSchedule() throws InterruptedException,
- TimeoutException, ExecutionException {
+ TimeoutException, ExecutionException, WorkerUtils.NotLeaderAnymore
{
- schedulerManager.initialize();
+ if (leaderService.isLeader()) {
+ Producer<byte[]> exclusiveProducer =
schedulerManager.acquireExclusiveWrite(() -> true);
+ schedulerManager.initialize(exclusiveProducer);
+ }
Future<?> complete = schedulerManager.schedule();
complete.get(30, TimeUnit.SECONDS);
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
new file mode 100644
index 0000000..f667059
--- /dev/null
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerAccessMode;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class WorkerUtilsTest {
+
+ @Test
+ public void testCreateExclusiveProducerWithRetry() {
+ Producer<byte[]> producer = mock(Producer.class);
+ ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
+ when(builder.topic(anyString())).thenReturn(builder);
+ when(builder.producerName(anyString())).thenReturn(builder);
+ when(builder.enableBatching(anyBoolean())).thenReturn(builder);
+ when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder);
+
when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);
+ when(builder.sendTimeout(anyInt(),
any(TimeUnit.class))).thenReturn(builder);
+ when(builder.accessMode(any())).thenReturn(builder);
+
when(builder.createAsync()).thenReturn(CompletableFuture.completedFuture(producer));
+
+ PulsarClient pulsarClient = mock(PulsarClient.class);
+ when(pulsarClient.newProducer()).thenReturn(builder);
+
+ Producer<byte[]> p = null;
+ try {
+ p = WorkerUtils.createExclusiveProducerWithRetry(pulsarClient,
"test-topic", "test-producer", () -> true, 0);
+ } catch (WorkerUtils.NotLeaderAnymore notLeaderAnymore) {
+ Assert.fail();
+ }
+ Assert.assertNotNull(p);
+ verify(builder, times(1)).topic(eq("test-topic"));
+ verify(builder, times(1)).producerName(eq("test-producer"));
+ verify(builder, times(1)).accessMode(eq(ProducerAccessMode.Exclusive));
+
+ CompletableFuture completableFuture = new CompletableFuture();
+ completableFuture.completeExceptionally(new
PulsarClientException.ProducerFencedException("test"));
+ when(builder.createAsync()).thenReturn(completableFuture);
+ try {
+ WorkerUtils.createExclusiveProducerWithRetry(pulsarClient,
"test-topic", "test-producer", () -> false, 0);
+ Assert.fail();
+ } catch (WorkerUtils.NotLeaderAnymore notLeaderAnymore) {
+
+ }
+
+ AtomicInteger i = new AtomicInteger();
+ try {
+ WorkerUtils.createExclusiveProducerWithRetry(pulsarClient,
"test-topic", "test-producer", new Supplier<Boolean>() {
+
+ @Override
+ public Boolean get() {
+ if (i.getAndIncrement() < 6) {
+ return true;
+ }
+ return false;
+ }
+ }, 0);
+ Assert.fail();
+ } catch (WorkerUtils.NotLeaderAnymore notLeaderAnymore) {
+
+ }
+ }
+}
\ No newline at end of file