rdhabalia closed pull request #2437: Race condition function-runtime-manager
read old assignments
URL: https://github.com/apache/incubator-pulsar/pull/2437
This is a PR merged from a forked repository.
Because GitHub no longer shows the original diff of a pull request from a
fork once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 016324e481..a6fda6f8b6 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -335,6 +335,10 @@ public String getSubscription() {
return subscription;
}
+ public String getConsumerName() {
+ return this.consumerName;
+ }
+
/**
* Redelivers the given unacknowledged messages. In Failover mode, the
request is ignored if the consumer is not
* active for the given topic. In Shared mode, the consumers messages to
be redelivered are distributed across all
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index 7994ef33d4..b18fd12881 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -41,6 +41,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.functions.proto.Function;
@@ -53,7 +54,7 @@
public class MembershipManager implements AutoCloseable, ConsumerEventListener
{
private final String consumerName;
- private final Consumer<byte[]> consumer;
+ private final ConsumerImpl<byte[]> consumer;
private final WorkerConfig workerConfig;
private PulsarAdmin pulsarAdminClient;
private final CompletableFuture<Void> firstConsumerEventFuture;
@@ -68,9 +69,9 @@
@VisibleForTesting
Map<Function.Instance, Long> unsignedFunctionDurations = new HashMap<>();
- MembershipManager(WorkerConfig workerConfig, PulsarClient client)
+ MembershipManager(WorkerService service, PulsarClient client)
throws PulsarClientException {
- this.workerConfig = workerConfig;
+ this.workerConfig = service.getWorkerConfig();
consumerName = String.format(
"%s:%s:%d",
workerConfig.getWorkerId(),
@@ -82,13 +83,15 @@
// we don't produce any messages into this topic, we only use the
`failover` subscription
// to elect an active consumer as the leader worker. The leader worker
will be responsible
// for scheduling snapshots for FMT and doing task assignment.
- consumer = client.newConsumer()
+ consumer = (ConsumerImpl<byte[]>) client.newConsumer()
.topic(workerConfig.getClusterCoordinationTopic())
.subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
.subscriptionType(SubscriptionType.Failover)
.consumerEventListener(this)
.property(WORKER_IDENTIFIER, consumerName)
.subscribe();
+
+ isLeader.set(checkLeader(service, consumer.getConsumerName()));
}
@Override
@@ -282,4 +285,19 @@ private PulsarAdmin getPulsarAdminClient() {
return this.pulsarAdminClient;
}
+ private boolean checkLeader(WorkerService service, String consumerName) {
+ try {
+ TopicStats stats = service.getBrokerAdmin().topics()
+
.getStats(service.getWorkerConfig().getClusterCoordinationTopic());
+ String activeConsumerName = stats != null
+ &&
stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION) != null
+ ?
stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION).activeConsumerName
+ : null;
+ return consumerName != null &&
consumerName.equalsIgnoreCase(activeConsumerName);
+ } catch (Exception e) {
+ log.warn("Failed to check leader {}", e.getMessage());
+ }
+ return false;
+ }
+
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 0850766e8d..7fc0cc94d7 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -137,7 +137,7 @@ public void start(URI dlogUri) throws InterruptedException {
this.connectorsManager = new ConnectorsManager(workerConfig);
//create membership manager
- this.membershipManager = new MembershipManager(this.workerConfig,
this.client);
+ this.membershipManager = new MembershipManager(this, this.client);
// create function runtime manager
this.functionRuntimeManager = new FunctionRuntimeManager(
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 7ed6acad86..2753bf196a 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -45,6 +45,7 @@
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.functions.proto.Function;
import org.mockito.ArgumentMatcher;
@@ -68,7 +69,7 @@ public MembershipManagerTest() {
public void testConsumerEventListener() throws Exception {
PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
- Consumer<byte[]> mockConsumer = mock(Consumer.class);
+ ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
ConsumerBuilder<byte[]> mockConsumerBuilder =
mock(ConsumerBuilder.class);
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
@@ -77,6 +78,8 @@ public void testConsumerEventListener() throws Exception {
when(mockConsumerBuilder.property(anyString(),
anyString())).thenReturn(mockConsumerBuilder);
when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
+ WorkerService workerService = mock(WorkerService.class);
+ doReturn(workerConfig).when(workerService).getWorkerConfig();
AtomicReference<ConsumerEventListener> listenerHolder = new
AtomicReference<>();
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock
-> {
@@ -89,7 +92,7 @@ public void testConsumerEventListener() throws Exception {
when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
- MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mockClient));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerService, mockClient));
assertFalse(membershipManager.isLeader());
verify(mockClient, times(1))
.newConsumer();
@@ -104,7 +107,7 @@ public void testConsumerEventListener() throws Exception {
private static PulsarClient mockPulsarClient() throws
PulsarClientException {
PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
- Consumer<byte[]> mockConsumer = mock(Consumer.class);
+ ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
ConsumerBuilder<byte[]> mockConsumerBuilder =
mock(ConsumerBuilder.class);
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
@@ -124,7 +127,7 @@ private static PulsarClient mockPulsarClient() throws
PulsarClientException {
@Test
public void testCheckFailuresNoFailures() throws Exception {
SchedulerManager schedulerManager = mock(SchedulerManager.class);
- PulsarClient pulsarClient = mock(PulsarClient.class);
+ PulsarClient pulsarClient = mockPulsarClient();
ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -132,6 +135,7 @@ public void testCheckFailuresNoFailures() throws Exception {
doReturn(mock(Reader.class)).when(readerBuilder).create();
WorkerService workerService = mock(WorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
+ doReturn(workerConfig).when(workerService).getWorkerConfig();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
@@ -142,7 +146,7 @@ public void testCheckFailuresNoFailures() throws Exception {
mock(ConnectorsManager.class)
));
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
- MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mockPulsarClient()));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerService, pulsarClient));
List<WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
@@ -193,7 +197,7 @@ public void testCheckFailuresNoFailures() throws Exception {
public void testCheckFailuresSomeFailures() throws Exception {
workerConfig.setRescheduleTimeoutMs(30000);
SchedulerManager schedulerManager = mock(SchedulerManager.class);
- PulsarClient pulsarClient = mock(PulsarClient.class);
+ PulsarClient pulsarClient = mockPulsarClient();
ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -201,6 +205,7 @@ public void testCheckFailuresSomeFailures() throws
Exception {
doReturn(mock(Reader.class)).when(readerBuilder).create();
WorkerService workerService = mock(WorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
+ doReturn(workerConfig).when(workerService).getWorkerConfig();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
@@ -212,7 +217,7 @@ public void testCheckFailuresSomeFailures() throws
Exception {
));
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
- MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mockPulsarClient()));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerService, mockPulsarClient()));
List<WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
@@ -287,7 +292,7 @@ public void testCheckFailuresSomeUnassigned() throws
Exception {
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setRescheduleTimeoutMs(30000);
SchedulerManager schedulerManager = mock(SchedulerManager.class);
- PulsarClient pulsarClient = mock(PulsarClient.class);
+ PulsarClient pulsarClient = mockPulsarClient();
ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -295,6 +300,7 @@ public void testCheckFailuresSomeUnassigned() throws
Exception {
doReturn(mock(Reader.class)).when(readerBuilder).create();
WorkerService workerService = mock(WorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
+ doReturn(workerConfig).when(workerService).getWorkerConfig();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
@@ -305,7 +311,7 @@ public void testCheckFailuresSomeUnassigned() throws
Exception {
mock(ConnectorsManager.class)
));
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
- MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mockPulsarClient()));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerService, mockPulsarClient()));
List<WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services