rdhabalia closed pull request #2437: Race condition function-runtime-manager 
read old assignments
URL: https://github.com/apache/incubator-pulsar/pull/2437
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 016324e481..a6fda6f8b6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -335,6 +335,10 @@ public String getSubscription() {
         return subscription;
     }
 
+    public String getConsumerName() {
+        return this.consumerName;
+    }
+
     /**
      * Redelivers the given unacknowledged messages. In Failover mode, the 
request is ignored if the consumer is not
      * active for the given topic. In Shared mode, the consumers messages to 
be redelivered are distributed across all
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index 7994ef33d4..b18fd12881 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -41,6 +41,7 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.functions.proto.Function;
@@ -53,7 +54,7 @@
 public class MembershipManager implements AutoCloseable, ConsumerEventListener 
{
 
     private final String consumerName;
-    private final Consumer<byte[]> consumer;
+    private final ConsumerImpl<byte[]> consumer;
     private final WorkerConfig workerConfig;
     private PulsarAdmin pulsarAdminClient;
     private final CompletableFuture<Void> firstConsumerEventFuture;
@@ -68,9 +69,9 @@
     @VisibleForTesting
     Map<Function.Instance, Long> unsignedFunctionDurations = new HashMap<>();
 
-    MembershipManager(WorkerConfig workerConfig, PulsarClient client)
+    MembershipManager(WorkerService service, PulsarClient client)
             throws PulsarClientException {
-        this.workerConfig = workerConfig;
+        this.workerConfig = service.getWorkerConfig();
         consumerName = String.format(
             "%s:%s:%d",
             workerConfig.getWorkerId(),
@@ -82,13 +83,15 @@
         // we don't produce any messages into this topic, we only use the 
`failover` subscription
         // to elect an active consumer as the leader worker. The leader worker 
will be responsible
         // for scheduling snapshots for FMT and doing task assignment.
-        consumer = client.newConsumer()
+        consumer = (ConsumerImpl<byte[]>) client.newConsumer()
                 .topic(workerConfig.getClusterCoordinationTopic())
                 .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
                 .subscriptionType(SubscriptionType.Failover)
                 .consumerEventListener(this)
                 .property(WORKER_IDENTIFIER, consumerName)
                 .subscribe();
+        
+        isLeader.set(checkLeader(service, consumer.getConsumerName()));
     }
 
     @Override
@@ -282,4 +285,19 @@ private PulsarAdmin getPulsarAdminClient() {
         return this.pulsarAdminClient;
     }
 
+    private boolean checkLeader(WorkerService service, String consumerName) {
+        try {
+            TopicStats stats = service.getBrokerAdmin().topics()
+                    
.getStats(service.getWorkerConfig().getClusterCoordinationTopic());
+            String activeConsumerName = stats != null
+                    && 
stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION) != null
+                            ? 
stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION).activeConsumerName
+                            : null;
+            return consumerName != null && 
consumerName.equalsIgnoreCase(activeConsumerName);
+        } catch (Exception e) {
+            log.warn("Failed to check leader {}", e.getMessage());
+        }
+        return false;
+    }
+    
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 0850766e8d..7fc0cc94d7 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -137,7 +137,7 @@ public void start(URI dlogUri) throws InterruptedException {
             this.connectorsManager = new ConnectorsManager(workerConfig);
 
             //create membership manager
-            this.membershipManager = new MembershipManager(this.workerConfig, 
this.client);
+            this.membershipManager = new MembershipManager(this, this.client);
 
             // create function runtime manager
             this.functionRuntimeManager = new FunctionRuntimeManager(
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 7ed6acad86..2753bf196a 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -45,6 +45,7 @@
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.functions.proto.Function;
 import org.mockito.ArgumentMatcher;
@@ -68,7 +69,7 @@ public MembershipManagerTest() {
     public void testConsumerEventListener() throws Exception {
         PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
 
-        Consumer<byte[]> mockConsumer = mock(Consumer.class);
+        ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
         ConsumerBuilder<byte[]> mockConsumerBuilder = 
mock(ConsumerBuilder.class);
 
         
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
@@ -77,6 +78,8 @@ public void testConsumerEventListener() throws Exception {
         when(mockConsumerBuilder.property(anyString(), 
anyString())).thenReturn(mockConsumerBuilder);
 
         when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(workerConfig).when(workerService).getWorkerConfig();
 
         AtomicReference<ConsumerEventListener> listenerHolder = new 
AtomicReference<>();
         
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock
 -> {
@@ -89,7 +92,7 @@ public void testConsumerEventListener() throws Exception {
 
         when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
 
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockClient));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockClient));
         assertFalse(membershipManager.isLeader());
         verify(mockClient, times(1))
             .newConsumer();
@@ -104,7 +107,7 @@ public void testConsumerEventListener() throws Exception {
     private static PulsarClient mockPulsarClient() throws 
PulsarClientException {
         PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
 
-        Consumer<byte[]> mockConsumer = mock(Consumer.class);
+        ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
         ConsumerBuilder<byte[]> mockConsumerBuilder = 
mock(ConsumerBuilder.class);
 
         
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
@@ -124,7 +127,7 @@ private static PulsarClient mockPulsarClient() throws 
PulsarClientException {
     @Test
     public void testCheckFailuresNoFailures() throws Exception {
         SchedulerManager schedulerManager = mock(SchedulerManager.class);
-        PulsarClient pulsarClient = mock(PulsarClient.class);
+        PulsarClient pulsarClient = mockPulsarClient();
         ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -132,6 +135,7 @@ public void testCheckFailuresNoFailures() throws Exception {
         doReturn(mock(Reader.class)).when(readerBuilder).create();
         WorkerService workerService = mock(WorkerService.class);
         doReturn(pulsarClient).when(workerService).getClient();
+        doReturn(workerConfig).when(workerService).getWorkerConfig();
         
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
         
         FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
@@ -142,7 +146,7 @@ public void testCheckFailuresNoFailures() throws Exception {
                 mock(ConnectorsManager.class)
         ));
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockPulsarClient()));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerService, pulsarClient));
 
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
@@ -193,7 +197,7 @@ public void testCheckFailuresNoFailures() throws Exception {
     public void testCheckFailuresSomeFailures() throws Exception {
         workerConfig.setRescheduleTimeoutMs(30000);
         SchedulerManager schedulerManager = mock(SchedulerManager.class);
-        PulsarClient pulsarClient = mock(PulsarClient.class);
+        PulsarClient pulsarClient = mockPulsarClient();
         ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -201,6 +205,7 @@ public void testCheckFailuresSomeFailures() throws 
Exception {
         doReturn(mock(Reader.class)).when(readerBuilder).create();
         WorkerService workerService = mock(WorkerService.class);
         doReturn(pulsarClient).when(workerService).getClient();
+        doReturn(workerConfig).when(workerService).getWorkerConfig();
         
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
         
         FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
@@ -212,7 +217,7 @@ public void testCheckFailuresSomeFailures() throws 
Exception {
         ));
 
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockPulsarClient()));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockPulsarClient()));
 
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
@@ -287,7 +292,7 @@ public void testCheckFailuresSomeUnassigned() throws 
Exception {
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setRescheduleTimeoutMs(30000);
         SchedulerManager schedulerManager = mock(SchedulerManager.class);
-        PulsarClient pulsarClient = mock(PulsarClient.class);
+        PulsarClient pulsarClient = mockPulsarClient();
         ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -295,6 +300,7 @@ public void testCheckFailuresSomeUnassigned() throws 
Exception {
         doReturn(mock(Reader.class)).when(readerBuilder).create();
         WorkerService workerService = mock(WorkerService.class);
         doReturn(pulsarClient).when(workerService).getClient();
+        doReturn(workerConfig).when(workerService).getWorkerConfig();
         
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
         
         FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
@@ -305,7 +311,7 @@ public void testCheckFailuresSomeUnassigned() throws 
Exception {
                 mock(ConnectorsManager.class)
         ));
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockPulsarClient()));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockPulsarClient()));
 
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to