This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 672a167 Race condition function-runtime-manager read old assignments
(#2437)
672a167 is described below
commit 672a167a31c719dab7623770d39666e8d7e7a0fd
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Aug 23 18:34:44 2018 -0700
Race condition function-runtime-manager read old assignments (#2437)
---
.../apache/pulsar/client/impl/ConsumerBase.java | 4 ++++
.../pulsar/functions/worker/MembershipManager.java | 26 ++++++++++++++++++----
.../pulsar/functions/worker/WorkerService.java | 2 +-
.../functions/worker/MembershipManagerTest.java | 24 ++++++++++++--------
4 files changed, 42 insertions(+), 14 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 016324e..a6fda6f 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -335,6 +335,10 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
return subscription;
}
+ public String getConsumerName() {
+ return this.consumerName;
+ }
+
/**
* Redelivers the given unacknowledged messages. In Failover mode, the
request is ignored if the consumer is not
* active for the given topic. In Shared mode, the consumers messages to
be redelivered are distributed across all
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index 7994ef3..b18fd12 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.functions.proto.Function;
@@ -53,7 +54,7 @@ import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
public class MembershipManager implements AutoCloseable, ConsumerEventListener
{
private final String consumerName;
- private final Consumer<byte[]> consumer;
+ private final ConsumerImpl<byte[]> consumer;
private final WorkerConfig workerConfig;
private PulsarAdmin pulsarAdminClient;
private final CompletableFuture<Void> firstConsumerEventFuture;
@@ -68,9 +69,9 @@ public class MembershipManager implements AutoCloseable,
ConsumerEventListener {
@VisibleForTesting
Map<Function.Instance, Long> unsignedFunctionDurations = new HashMap<>();
- MembershipManager(WorkerConfig workerConfig, PulsarClient client)
+ MembershipManager(WorkerService service, PulsarClient client)
throws PulsarClientException {
- this.workerConfig = workerConfig;
+ this.workerConfig = service.getWorkerConfig();
consumerName = String.format(
"%s:%s:%d",
workerConfig.getWorkerId(),
@@ -82,13 +83,15 @@ public class MembershipManager implements AutoCloseable,
ConsumerEventListener {
// we don't produce any messages into this topic, we only use the
`failover` subscription
// to elect an active consumer as the leader worker. The leader worker
will be responsible
// for scheduling snapshots for FMT and doing task assignment.
- consumer = client.newConsumer()
+ consumer = (ConsumerImpl<byte[]>) client.newConsumer()
.topic(workerConfig.getClusterCoordinationTopic())
.subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
.subscriptionType(SubscriptionType.Failover)
.consumerEventListener(this)
.property(WORKER_IDENTIFIER, consumerName)
.subscribe();
+
+ isLeader.set(checkLeader(service, consumer.getConsumerName()));
}
@Override
@@ -282,4 +285,19 @@ public class MembershipManager implements AutoCloseable,
ConsumerEventListener {
return this.pulsarAdminClient;
}
+ private boolean checkLeader(WorkerService service, String consumerName) {
+ try {
+ TopicStats stats = service.getBrokerAdmin().topics()
+
.getStats(service.getWorkerConfig().getClusterCoordinationTopic());
+ String activeConsumerName = stats != null
+ &&
stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION) != null
+ ?
stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION).activeConsumerName
+ : null;
+ return consumerName != null &&
consumerName.equalsIgnoreCase(activeConsumerName);
+ } catch (Exception e) {
+ log.warn("Failed to check leader {}", e.getMessage());
+ }
+ return false;
+ }
+
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 0850766..7fc0cc9 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -137,7 +137,7 @@ public class WorkerService {
this.connectorsManager = new ConnectorsManager(workerConfig);
//create membership manager
- this.membershipManager = new MembershipManager(this.workerConfig,
this.client);
+ this.membershipManager = new MembershipManager(this, this.client);
// create function runtime manager
this.functionRuntimeManager = new FunctionRuntimeManager(
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 7ed6aca..2753bf1 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.functions.proto.Function;
import org.mockito.ArgumentMatcher;
@@ -68,7 +69,7 @@ public class MembershipManagerTest {
public void testConsumerEventListener() throws Exception {
PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
- Consumer<byte[]> mockConsumer = mock(Consumer.class);
+ ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
ConsumerBuilder<byte[]> mockConsumerBuilder =
mock(ConsumerBuilder.class);
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
@@ -77,6 +78,8 @@ public class MembershipManagerTest {
when(mockConsumerBuilder.property(anyString(),
anyString())).thenReturn(mockConsumerBuilder);
when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
+ WorkerService workerService = mock(WorkerService.class);
+ doReturn(workerConfig).when(workerService).getWorkerConfig();
AtomicReference<ConsumerEventListener> listenerHolder = new
AtomicReference<>();
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock
-> {
@@ -89,7 +92,7 @@ public class MembershipManagerTest {
when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
- MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mockClient));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerService, mockClient));
assertFalse(membershipManager.isLeader());
verify(mockClient, times(1))
.newConsumer();
@@ -104,7 +107,7 @@ public class MembershipManagerTest {
private static PulsarClient mockPulsarClient() throws
PulsarClientException {
PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
- Consumer<byte[]> mockConsumer = mock(Consumer.class);
+ ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
ConsumerBuilder<byte[]> mockConsumerBuilder =
mock(ConsumerBuilder.class);
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
@@ -124,7 +127,7 @@ public class MembershipManagerTest {
@Test
public void testCheckFailuresNoFailures() throws Exception {
SchedulerManager schedulerManager = mock(SchedulerManager.class);
- PulsarClient pulsarClient = mock(PulsarClient.class);
+ PulsarClient pulsarClient = mockPulsarClient();
ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -132,6 +135,7 @@ public class MembershipManagerTest {
doReturn(mock(Reader.class)).when(readerBuilder).create();
WorkerService workerService = mock(WorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
+ doReturn(workerConfig).when(workerService).getWorkerConfig();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
@@ -142,7 +146,7 @@ public class MembershipManagerTest {
mock(ConnectorsManager.class)
));
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
- MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mockPulsarClient()));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerService, pulsarClient));
List<WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
@@ -193,7 +197,7 @@ public class MembershipManagerTest {
public void testCheckFailuresSomeFailures() throws Exception {
workerConfig.setRescheduleTimeoutMs(30000);
SchedulerManager schedulerManager = mock(SchedulerManager.class);
- PulsarClient pulsarClient = mock(PulsarClient.class);
+ PulsarClient pulsarClient = mockPulsarClient();
ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -201,6 +205,7 @@ public class MembershipManagerTest {
doReturn(mock(Reader.class)).when(readerBuilder).create();
WorkerService workerService = mock(WorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
+ doReturn(workerConfig).when(workerService).getWorkerConfig();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
@@ -212,7 +217,7 @@ public class MembershipManagerTest {
));
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
- MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mockPulsarClient()));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerService, mockPulsarClient()));
List<WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
@@ -287,7 +292,7 @@ public class MembershipManagerTest {
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setRescheduleTimeoutMs(30000);
SchedulerManager schedulerManager = mock(SchedulerManager.class);
- PulsarClient pulsarClient = mock(PulsarClient.class);
+ PulsarClient pulsarClient = mockPulsarClient();
ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -295,6 +300,7 @@ public class MembershipManagerTest {
doReturn(mock(Reader.class)).when(readerBuilder).create();
WorkerService workerService = mock(WorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
+ doReturn(workerConfig).when(workerService).getWorkerConfig();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
@@ -305,7 +311,7 @@ public class MembershipManagerTest {
mock(ConnectorsManager.class)
));
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
- MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mockPulsarClient()));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerService, mockPulsarClient()));
List<WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));