This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1e5b3dd Revert dup consumer and related code (#4142)
1e5b3dd is described below
commit 1e5b3dd98eba136a68a99bf8106d3258cb1f283d
Author: Ezequiel Lovelle <[email protected]>
AuthorDate: Mon Apr 29 15:06:14 2019 -0300
Revert dup consumer and related code (#4142)
* Revert "[cpp client] implement reference count for close() (#3863)"
This reverts commit ee98e8b28ce1397c056757fe73115d21320ed883.
* Revert Prevent dup consumers and refCount for cpp client
Revert "[cpp client] Bugfix prevent dup consumer for same topic
subscription"
Revert "[Issue #3226][cpp client] Prevent dup consumers on same client cnx"
This reverts commit fff02e2aa2064412dbae18b973eb2bb2abab25d8.
This reverts commit 762e0ab9a52e2665a106766c211d45474adc833b.
* Revert "Feature - implement reference count for ConsumerImpl (#3795)"
This reverts commit ff4db8db12be2eb79d910e2b286306298f71320e.
* Revert Prevent dup consumers and refCount for java client
Revert "Prevent dup consumers on same client cnx with shared subscription"
Revert "[java client] Bugfix prevent dup consumers for same topic subscribe"
This reverts commit 231db030b9529737237721059b2a5b3044d4cab1.
This reverts commit fb5dcd9a58524686f2f6208d41a1e82b5bbb8111.
---
.../client/api/SimpleProducerConsumerTest.java | 112 ---------------------
pulsar-client-cpp/lib/ClientImpl.cc | 13 ---
pulsar-client-cpp/lib/ConsumerImpl.cc | 12 ---
pulsar-client-cpp/lib/ConsumerImpl.h | 3 -
pulsar-client-cpp/lib/ConsumerImplBase.h | 1 -
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 82 ---------------
.../apache/pulsar/client/impl/ConsumerBase.java | 9 --
.../apache/pulsar/client/impl/ConsumerImpl.java | 4 -
.../pulsar/client/impl/PulsarClientImpl.java | 19 ----
.../org/apache/pulsar/storm/PulsarSpoutTest.java | 7 +-
10 files changed, 1 insertion(+), 261 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index af9be10..74dd59d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2998,116 +2998,4 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
consumer5.close();
log.info("-- Exiting {} test --", methodName);
}
-
- // Issue 3226: https://github.com/apache/pulsar/issues/3226
- // Pull 3312: https://github.com/apache/pulsar/pull/3312
- // Bugfix preventing duplicated consumers on same client cnx with shared
subscription mode
- @Test()
- public void testPreventDupConsumersOnClientCnxForSingleSub() throws
Exception {
- final CompletableFuture<Void> future = new CompletableFuture<>();
- final String topic = "persistent://my-property/my-ns/my-topic";
- final String subName = "my-subscription";
-
- Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
- .subscriptionName(subName)
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
- Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
- .subscriptionName(subName)
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
-
- consumer.unsubscribeAsync().whenComplete((aVoid1, t1) -> {
- if (t1 != null) {
- future.completeExceptionally(t1);
- return;
- }
-
- consumer.closeAsync().whenComplete((aVoid2, t2) -> {
- if (t2 != null) {
- future.completeExceptionally(t2);
- return;
- }
- future.complete(null);
- });
- });
-
- future.get(5, TimeUnit.SECONDS);
- Assert.assertEquals(consumer, consumerB);
- Assert.assertTrue(future.isDone());
- Assert.assertFalse(future.isCompletedExceptionally());
- }
-
- @Test()
- public void
testPreventDupConsumersOnClientCnxForSingleSub_AllowDifferentTopics() throws
Exception {
- final CompletableFuture<Void> future = new CompletableFuture<>();
- final String topic = "persistent://my-property/my-ns/my-topic";
- final String subName = "my-subscription";
-
- Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
- .subscriptionName(subName)
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
- Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
- .subscriptionName(subName)
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
-
- // This consumer should be a newly subscription since is it from a
different topic
- // even though has the same subscription name.
- Consumer<byte[]> consumerC = pulsarClient.newConsumer().topic(topic +
"-different-topic")
- .subscriptionName(subName)
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
-
- consumer.unsubscribeAsync().whenComplete((aVoid1, t1) -> {
- if (t1 != null) {
- future.completeExceptionally(t1);
- return;
- }
-
- consumer.closeAsync().whenComplete((aVoid2, t2) -> {
- if (t2 != null) {
- future.completeExceptionally(t2);
- return;
- }
- future.complete(null);
- });
- });
-
- future.get(5, TimeUnit.SECONDS);
- Assert.assertEquals(consumer, consumerB);
- Assert.assertTrue(future.isDone());
- Assert.assertFalse(future.isCompletedExceptionally());
-
- // consumerC is a newly created subscription.
- Assert.assertNotEquals(consumer, consumerC);
- Assert.assertTrue(consumerC.isConnected());
- consumerC.close();
- }
-
- @Test
- public void testRefCount_OnCloseConsumer() throws Exception {
- final String topic = "persistent://my-property/my-ns/my-topic";
- final String subName = "my-subscription";
-
- Consumer<byte[]> consumerA = pulsarClient.newConsumer().topic(topic)
- .subscriptionName(subName)
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
- Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
- .subscriptionName(subName)
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
-
- Assert.assertEquals(consumerA, consumerB);
-
- consumerA.close();
- Assert.assertTrue(consumerA.isConnected());
- Assert.assertTrue(consumerB.isConnected());
-
- consumerB.close();
- Assert.assertFalse(consumerA.isConnected());
- Assert.assertFalse(consumerB.isConnected());
- }
}
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc
b/pulsar-client-cpp/lib/ClientImpl.cc
index e2c15e0..0e996cc 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -340,19 +340,6 @@ void ClientImpl::subscribeAsync(const std::string& topic,
const std::string& con
lock.unlock();
callback(ResultInvalidConfiguration, Consumer());
return;
- } else if (conf.getConsumerType() == ConsumerShared) {
- ConsumersList consumers(consumers_);
- for (auto& weakPtr : consumers) {
- ConsumerImplBasePtr consumer = weakPtr.lock();
- if (consumer && consumer->getSubscriptionName() ==
consumerName &&
- consumer->getTopic() == topic && !consumer->isClosed()) {
- consumer->incrRefCount();
- lock.unlock();
- LOG_INFO("Reusing existing consumer instance for " <<
topic << " -- " << consumerName);
- callback(ResultOk, Consumer(consumer));
- return;
- }
- }
}
}
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index d9fedaf..2cc5a3b 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -110,10 +110,6 @@ Future<Result, ConsumerImplBaseWeakPtr>
ConsumerImpl::getConsumerCreatedFuture()
return consumerCreatedPromise_.getFuture();
}
-void ConsumerImpl::incrRefCount() { ++refCount_; }
-
-unsigned int ConsumerImpl::safeDecrRefCount() { return refCount_ > 0 ?
refCount_-- : refCount_; }
-
const std::string& ConsumerImpl::getSubscriptionName() const { return
originalSubscriptionName_; }
const std::string& ConsumerImpl::getTopic() const { return topic_; }
@@ -821,14 +817,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
return;
}
- if (safeDecrRefCount() != 0) {
- lock.unlock();
- if (callback) {
- callback(ResultOk);
- }
- return;
- }
-
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
lock.unlock();
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h
b/pulsar-client-cpp/lib/ConsumerImpl.h
index 16ce091..b917f790 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -113,7 +113,6 @@ class ConsumerImpl : public ConsumerImplBase,
virtual bool isReadCompacted();
virtual void hasMessageAvailableAsync(HasMessageAvailableCallback
callback);
virtual void getLastMessageIdAsync(BrokerGetLastMessageIdCallback
callback);
- virtual void incrRefCount();
protected:
void connectionOpened(const ClientConnectionPtr& cnx);
@@ -147,7 +146,6 @@ class ConsumerImpl : public ConsumerImplBase,
void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);
void notifyPendingReceivedCallback(Result result, Message& message, const
ReceiveCallback& callback);
void failPendingReceiveCallback();
- unsigned int safeDecrRefCount();
Optional<MessageId> clearReceiveQueue();
@@ -178,7 +176,6 @@ class ConsumerImpl : public ConsumerImplBase,
BatchAcknowledgementTracker batchAcknowledgementTracker_;
BrokerConsumerStatsImpl brokerConsumerStats_;
NegativeAcksTracker negativeAcksTracker_;
- unsigned int refCount_ = 0;
MessageCryptoPtr msgCrypto_;
const bool readCompacted_;
diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h
b/pulsar-client-cpp/lib/ConsumerImplBase.h
index 78b5e12..c716270 100644
--- a/pulsar-client-cpp/lib/ConsumerImplBase.h
+++ b/pulsar-client-cpp/lib/ConsumerImplBase.h
@@ -53,7 +53,6 @@ class ConsumerImplBase {
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback
callback) = 0;
virtual void seekAsync(const MessageId& msgId, ResultCallback callback) =
0;
virtual void negativeAcknowledge(const MessageId& msgId) = 0;
- virtual void incrRefCount(){};
};
} // namespace pulsar
#endif // PULSAR_CONSUMER_IMPL_BASE_HEADER
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 4f4f50d..2f82a15 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -2796,56 +2796,6 @@ TEST(BasicEndToEndTest,
testPartitionedReceiveAsyncFailedConsumer) {
client.shutdown();
}
-TEST(BasicEndToEndTest, testPreventDupConsumersOnSharedMode) {
- ClientConfiguration config;
- Client client(lookupUrl);
- std::string subsName = "my-only-sub";
- std::string topicName =
"persistent://public/default/test-prevent-dup-consumers";
- ConsumerConfiguration consumerConf;
- consumerConf.setConsumerType(ConsumerShared);
-
- Consumer consumerA;
- Result resultA = client.subscribe(topicName, subsName, consumerConf,
consumerA);
- ASSERT_EQ(ResultOk, resultA);
- ASSERT_EQ(consumerA.getSubscriptionName(), subsName);
-
- Consumer consumerB;
- Result resultB = client.subscribe(topicName, subsName, consumerConf,
consumerB);
- ASSERT_EQ(ResultOk, resultB);
- ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
-
- // Since this is a shared consumer over same client cnx
- // closing consumerA should result in consumerB also being closed.
- ASSERT_EQ(ResultOk, consumerA.close());
- ASSERT_EQ(ResultOk, consumerB.close());
- ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
- ASSERT_EQ(ResultAlreadyClosed, consumerB.close());
-}
-
-TEST(BasicEndToEndTest, testDupConsumersOnSharedModeNotThrowsExcOnUnsubscribe)
{
- ClientConfiguration config;
- Client client(lookupUrl);
- std::string subsName = "my-only-sub";
- std::string topicName =
-
"persistent://public/default/testDupConsumersOnSharedModeNotThrowsExcOnUnsubscribe";
- ConsumerConfiguration consumerConf;
- consumerConf.setConsumerType(ConsumerShared);
-
- Consumer consumerA;
- Result resultA = client.subscribe(topicName, subsName, consumerConf,
consumerA);
- ASSERT_EQ(ResultOk, resultA);
- ASSERT_EQ(consumerA.getSubscriptionName(), subsName);
-
- Consumer consumerB;
- Result resultB = client.subscribe(topicName, subsName, consumerConf,
consumerB);
- ASSERT_EQ(ResultOk, resultB);
- ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
-
- ASSERT_EQ(ResultOk, consumerA.unsubscribe());
- // If dup consumers are allowed BrokerMetadataError will be the result of
close()
- ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
-}
-
void testNegativeAcks(const std::string &topic, bool batchingEnabled) {
Client client(lookupUrl);
Consumer consumer;
@@ -2913,38 +2863,6 @@ TEST(BasicEndToEndTest, testNegativeAcksWithPartitions) {
testNegativeAcks(topicName, true);
}
-TEST(BasicEndToEndTest, testPreventDupConsumersAllowSameSubForDifferentTopics)
{
- ClientConfiguration config;
- Client client(lookupUrl);
- std::string subsName = "my-only-sub";
- std::string topicName =
-
"persistent://public/default/testPreventDupConsumersAllowSameSubForDifferentTopics";
- ConsumerConfiguration consumerConf;
- consumerConf.setConsumerType(ConsumerShared);
-
- Consumer consumerA;
- Result resultA = client.subscribe(topicName, subsName, consumerConf,
consumerA);
- ASSERT_EQ(ResultOk, resultA);
- ASSERT_EQ(consumerA.getSubscriptionName(), subsName);
-
- Consumer consumerB;
- Result resultB = client.subscribe(topicName, subsName, consumerConf,
consumerB);
- ASSERT_EQ(ResultOk, resultB);
- ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
-
- Consumer consumerC;
- Result resultC = client.subscribe(topicName + "-different-topic",
subsName, consumerConf, consumerC);
- ASSERT_EQ(ResultOk, resultB);
- ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
- ASSERT_EQ(ResultOk, consumerA.close());
- ASSERT_EQ(ResultOk, consumerB.close());
- ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
- ASSERT_EQ(ResultAlreadyClosed, consumerB.close());
-
- // consumer C should be a different instance from A and B and should be
with open state.
- ASSERT_EQ(ResultOk, consumerC.close());
-}
-
static long regexTestMessagesReceived = 0;
static void regexMessageListenerFunction(Consumer consumer, const Message
&msg) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 350ed9c..07cf3cb 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -61,7 +61,6 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
protected int maxReceiverQueueSize;
protected final Schema<T> schema;
protected final ConsumerInterceptors<T> interceptors;
- private int refCount = 0;
protected ConsumerBase(PulsarClientImpl client, String topic,
ConsumerConfigurationData<T> conf,
int receiverQueueSize, ExecutorService
listenerExecutor,
@@ -394,12 +393,4 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
interceptors.onNegativeAcksSend(this, messageIds);
}
}
-
- protected synchronized void incrRefCount() {
- ++refCount;
- }
-
- protected synchronized boolean shouldTearDown() {
- return refCount > 0 ? refCount-- == 0 : refCount == 0;
- }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 41eaa6d..8909989 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -647,10 +647,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
@Override
public CompletableFuture<Void> closeAsync() {
- if (!shouldTearDown()) {
- return CompletableFuture.completedFuture(null);
- }
-
if (getState() == State.Closing || getState() == State.Closed) {
unAckedMessageTracker.close();
if (possibleSendToDeadLetterTopicMessages != null) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 3da7e14..73a4a3c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -315,11 +315,6 @@ public class PulsarClientImpl implements PulsarClient {
}
private <T> CompletableFuture<Consumer<T>>
doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T>
schema, ConsumerInterceptors<T> interceptors) {
- Optional<ConsumerBase<T>> subscriber = subscriptionExist(conf);
- if (subscriber.isPresent()) {
- return CompletableFuture.completedFuture(subscriber.get());
- }
-
CompletableFuture<Consumer<T>> consumerSubscribedFuture = new
CompletableFuture<>();
String topic = conf.getSingleTopic();
@@ -667,20 +662,6 @@ public class PulsarClientImpl implements PulsarClient {
});
}
- @SuppressWarnings("unchecked")
- private <T> Optional<ConsumerBase<T>>
subscriptionExist(ConsumerConfigurationData<?> conf) {
- synchronized (consumers) {
- Optional<ConsumerBase<?>> subscriber = consumers.keySet().stream()
- .filter(c ->
c.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
- .filter(c -> conf.getTopicNames().contains(c.getTopic()))
- .filter(c ->
c.getSubscription().equals(conf.getSubscriptionName()))
- .filter(Consumer::isConnected)
- .findFirst();
- subscriber.ifPresent(ConsumerBase::incrRefCount);
- return subscriber.map(ConsumerBase.class::cast);
- }
- }
-
private static EventLoopGroup getEventLoopGroup(ClientConfigurationData
conf) {
ThreadFactory threadFactory = getThreadFactory("pulsar-client-io");
return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(),
threadFactory);
diff --git
a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
index d8aa539..b13cefd 100644
---
a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -301,17 +301,12 @@ public class PulsarSpoutTest extends ProducerConsumerBase
{
otherSpout.open(Maps.newHashMap(), context, collector);
topicStats = admin.topics().getStats(topic);
-
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(),
1);
+
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(),
2);
otherSpout.close();
topicStats = admin.topics().getStats(topic);
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(),
1);
-
- otherSpout.close();
-
- topicStats = admin.topics().getStats(topic);
-
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(),
0);
}
@Test