This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 672a167 Race condition function-runtime-manager read old assignments (#2437) 672a167 is described below commit 672a167a31c719dab7623770d39666e8d7e7a0fd Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Thu Aug 23 18:34:44 2018 -0700 Race condition function-runtime-manager read old assignments (#2437) --- .../apache/pulsar/client/impl/ConsumerBase.java | 4 ++++ .../pulsar/functions/worker/MembershipManager.java | 26 ++++++++++++++++++---- .../pulsar/functions/worker/WorkerService.java | 2 +- .../functions/worker/MembershipManagerTest.java | 24 ++++++++++++-------- 4 files changed, 42 insertions(+), 14 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 016324e..a6fda6f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -335,6 +335,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T return subscription; } + public String getConsumerName() { + return this.consumerName; + } + /** * Redelivers the given unacknowledged messages. In Failover mode, the request is ignored if the consumer is not * active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java index 7994ef3..b18fd12 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java @@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.functions.proto.Function; @@ -53,7 +54,7 @@ import org.apache.pulsar.functions.utils.FunctionDetailsUtils; public class MembershipManager implements AutoCloseable, ConsumerEventListener { private final String consumerName; - private final Consumer<byte[]> consumer; + private final ConsumerImpl<byte[]> consumer; private final WorkerConfig workerConfig; private PulsarAdmin pulsarAdminClient; private final CompletableFuture<Void> firstConsumerEventFuture; @@ -68,9 +69,9 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener { @VisibleForTesting Map<Function.Instance, Long> unsignedFunctionDurations = new HashMap<>(); - MembershipManager(WorkerConfig workerConfig, PulsarClient client) + MembershipManager(WorkerService service, PulsarClient client) throws PulsarClientException { - this.workerConfig = workerConfig; + this.workerConfig = service.getWorkerConfig(); consumerName = String.format( "%s:%s:%d", workerConfig.getWorkerId(), @@ -82,13 +83,15 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener { // we don't produce any messages into this topic, we only use the `failover` subscription // to elect an active consumer as the leader worker. The leader worker will be responsible // for scheduling snapshots for FMT and doing task assignment. - consumer = client.newConsumer() + consumer = (ConsumerImpl<byte[]>) client.newConsumer() .topic(workerConfig.getClusterCoordinationTopic()) .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION) .subscriptionType(SubscriptionType.Failover) .consumerEventListener(this) .property(WORKER_IDENTIFIER, consumerName) .subscribe(); + + isLeader.set(checkLeader(service, consumer.getConsumerName())); } @Override @@ -282,4 +285,19 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener { return this.pulsarAdminClient; } + private boolean checkLeader(WorkerService service, String consumerName) { + try { + TopicStats stats = service.getBrokerAdmin().topics() + .getStats(service.getWorkerConfig().getClusterCoordinationTopic()); + String activeConsumerName = stats != null + && stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION) != null + ? stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION).activeConsumerName + : null; + return consumerName != null && consumerName.equalsIgnoreCase(activeConsumerName); + } catch (Exception e) { + log.warn("Failed to check leader {}", e.getMessage()); + } + return false; + } + } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 0850766..7fc0cc9 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -137,7 +137,7 @@ public class WorkerService { this.connectorsManager = new ConnectorsManager(workerConfig); //create membership manager - this.membershipManager = new MembershipManager(this.workerConfig, this.client); + this.membershipManager = new MembershipManager(this, this.client); // create function runtime manager this.functionRuntimeManager = new FunctionRuntimeManager( diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java index 7ed6aca..2753bf1 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java @@ -45,6 +45,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.functions.proto.Function; import org.mockito.ArgumentMatcher; @@ -68,7 +69,7 @@ public class MembershipManagerTest { public void testConsumerEventListener() throws Exception { PulsarClientImpl mockClient = mock(PulsarClientImpl.class); - Consumer<byte[]> mockConsumer = mock(Consumer.class); + ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class); ConsumerBuilder<byte[]> mockConsumerBuilder = mock(ConsumerBuilder.class); when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder); @@ -77,6 +78,8 @@ public class MembershipManagerTest { when(mockConsumerBuilder.property(anyString(), anyString())).thenReturn(mockConsumerBuilder); when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer); + WorkerService workerService = mock(WorkerService.class); + doReturn(workerConfig).when(workerService).getWorkerConfig(); AtomicReference<ConsumerEventListener> listenerHolder = new AtomicReference<>(); when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock -> { @@ -89,7 +92,7 @@ public class MembershipManagerTest { when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder); - MembershipManager membershipManager = spy(new MembershipManager(workerConfig, mockClient)); + MembershipManager membershipManager = spy(new MembershipManager(workerService, mockClient)); assertFalse(membershipManager.isLeader()); verify(mockClient, times(1)) .newConsumer(); @@ -104,7 +107,7 @@ public class MembershipManagerTest { private static PulsarClient mockPulsarClient() throws PulsarClientException { PulsarClientImpl mockClient = mock(PulsarClientImpl.class); - Consumer<byte[]> mockConsumer = mock(Consumer.class); + ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class); ConsumerBuilder<byte[]> mockConsumerBuilder = mock(ConsumerBuilder.class); when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder); @@ -124,7 +127,7 @@ public class MembershipManagerTest { @Test public void testCheckFailuresNoFailures() throws Exception { SchedulerManager schedulerManager = mock(SchedulerManager.class); - PulsarClient pulsarClient = mock(PulsarClient.class); + PulsarClient pulsarClient = mockPulsarClient(); ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class); doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); @@ -132,6 +135,7 @@ public class MembershipManagerTest { doReturn(mock(Reader.class)).when(readerBuilder).create(); WorkerService workerService = mock(WorkerService.class); doReturn(pulsarClient).when(workerService).getClient(); + doReturn(workerConfig).when(workerService).getWorkerConfig(); doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( @@ -142,7 +146,7 @@ public class MembershipManagerTest { mock(ConnectorsManager.class) )); FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); - MembershipManager membershipManager = spy(new MembershipManager(workerConfig, mockPulsarClient())); + MembershipManager membershipManager = spy(new MembershipManager(workerService, pulsarClient)); List<WorkerInfo> workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000)); @@ -193,7 +197,7 @@ public class MembershipManagerTest { public void testCheckFailuresSomeFailures() throws Exception { workerConfig.setRescheduleTimeoutMs(30000); SchedulerManager schedulerManager = mock(SchedulerManager.class); - PulsarClient pulsarClient = mock(PulsarClient.class); + PulsarClient pulsarClient = mockPulsarClient(); ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class); doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); @@ -201,6 +205,7 @@ public class MembershipManagerTest { doReturn(mock(Reader.class)).when(readerBuilder).create(); WorkerService workerService = mock(WorkerService.class); doReturn(pulsarClient).when(workerService).getClient(); + doReturn(workerConfig).when(workerService).getWorkerConfig(); doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( @@ -212,7 +217,7 @@ public class MembershipManagerTest { )); FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); - MembershipManager membershipManager = spy(new MembershipManager(workerConfig, mockPulsarClient())); + MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient())); List<WorkerInfo> workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000)); @@ -287,7 +292,7 @@ public class MembershipManagerTest { workerConfig.setStateStorageServiceUrl("foo"); workerConfig.setRescheduleTimeoutMs(30000); SchedulerManager schedulerManager = mock(SchedulerManager.class); - PulsarClient pulsarClient = mock(PulsarClient.class); + PulsarClient pulsarClient = mockPulsarClient(); ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class); doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); @@ -295,6 +300,7 @@ public class MembershipManagerTest { doReturn(mock(Reader.class)).when(readerBuilder).create(); WorkerService workerService = mock(WorkerService.class); doReturn(pulsarClient).when(workerService).getClient(); + doReturn(workerConfig).when(workerService).getWorkerConfig(); doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( @@ -305,7 +311,7 @@ public class MembershipManagerTest { mock(ConnectorsManager.class) )); FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); - MembershipManager membershipManager = spy(new MembershipManager(workerConfig, mockPulsarClient())); + MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient())); List<WorkerInfo> workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));