This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e5b3dd  Revert dup consumer and related code (#4142)
1e5b3dd is described below

commit 1e5b3dd98eba136a68a99bf8106d3258cb1f283d
Author: Ezequiel Lovelle <[email protected]>
AuthorDate: Mon Apr 29 15:06:14 2019 -0300

    Revert dup consumer and related code (#4142)
    
    * Revert "[cpp client] implement reference count for close() (#3863)"
    
    This reverts commit ee98e8b28ce1397c056757fe73115d21320ed883.
    
    * Revert Prevent dup consumers and refCount for cpp client
    
    Revert "[cpp client] Bugfix prevent dup consumer for same topic subscription"
    Revert "[Issue #3226][cpp client] Prevent dup consumers on same client cnx"
    
    This reverts commit fff02e2aa2064412dbae18b973eb2bb2abab25d8.
    This reverts commit 762e0ab9a52e2665a106766c211d45474adc833b.
    
    * Revert "Feature - implement reference count for ConsumerImpl (#3795)"
    
    This reverts commit ff4db8db12be2eb79d910e2b286306298f71320e.
    
    * Revert Prevent dup consumers and refCount for java client
    
    Revert "Prevent dup consumers on same client cnx with shared subscription"
    Revert "[java client] Bugfix prevent dup consumers for same topic subscribe"
    
    This reverts commit 231db030b9529737237721059b2a5b3044d4cab1.
    This reverts commit fb5dcd9a58524686f2f6208d41a1e82b5bbb8111.
---
 .../client/api/SimpleProducerConsumerTest.java     | 112 ---------------------
 pulsar-client-cpp/lib/ClientImpl.cc                |  13 ---
 pulsar-client-cpp/lib/ConsumerImpl.cc              |  12 ---
 pulsar-client-cpp/lib/ConsumerImpl.h               |   3 -
 pulsar-client-cpp/lib/ConsumerImplBase.h           |   1 -
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       |  82 ---------------
 .../apache/pulsar/client/impl/ConsumerBase.java    |   9 --
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   4 -
 .../pulsar/client/impl/PulsarClientImpl.java       |  19 ----
 .../org/apache/pulsar/storm/PulsarSpoutTest.java   |   7 +-
 10 files changed, 1 insertion(+), 261 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index af9be10..74dd59d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2998,116 +2998,4 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         consumer5.close();
         log.info("-- Exiting {} test --", methodName);
     }
-
-    // Issue 3226: https://github.com/apache/pulsar/issues/3226
-    // Pull 3312: https://github.com/apache/pulsar/pull/3312
-    // Bugfix preventing duplicated consumers on same client cnx with shared 
subscription mode
-    @Test()
-    public void testPreventDupConsumersOnClientCnxForSingleSub() throws 
Exception {
-        final CompletableFuture<Void> future = new CompletableFuture<>();
-        final String topic = "persistent://my-property/my-ns/my-topic";
-        final String subName = "my-subscription";
-
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
-                .subscriptionName(subName)
-                .subscriptionType(SubscriptionType.Shared)
-                .subscribe();
-        Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
-                .subscriptionName(subName)
-                .subscriptionType(SubscriptionType.Shared)
-                .subscribe();
-
-        consumer.unsubscribeAsync().whenComplete((aVoid1, t1) -> {
-            if (t1 != null) {
-                future.completeExceptionally(t1);
-                return;
-            }
-
-            consumer.closeAsync().whenComplete((aVoid2, t2) -> {
-                if (t2 != null) {
-                    future.completeExceptionally(t2);
-                    return;
-                }
-                future.complete(null);
-            });
-        });
-
-        future.get(5, TimeUnit.SECONDS);
-        Assert.assertEquals(consumer, consumerB);
-        Assert.assertTrue(future.isDone());
-        Assert.assertFalse(future.isCompletedExceptionally());
-    }
-
-    @Test()
-    public void 
testPreventDupConsumersOnClientCnxForSingleSub_AllowDifferentTopics() throws 
Exception {
-        final CompletableFuture<Void> future = new CompletableFuture<>();
-        final String topic = "persistent://my-property/my-ns/my-topic";
-        final String subName = "my-subscription";
-
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
-                .subscriptionName(subName)
-                .subscriptionType(SubscriptionType.Shared)
-                .subscribe();
-        Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
-                .subscriptionName(subName)
-                .subscriptionType(SubscriptionType.Shared)
-                .subscribe();
-
-        // This consumer should be a newly subscription since is it from a 
different topic
-        // even though has the same subscription name.
-        Consumer<byte[]> consumerC = pulsarClient.newConsumer().topic(topic + 
"-different-topic")
-                .subscriptionName(subName)
-                .subscriptionType(SubscriptionType.Shared)
-                .subscribe();
-
-        consumer.unsubscribeAsync().whenComplete((aVoid1, t1) -> {
-            if (t1 != null) {
-                future.completeExceptionally(t1);
-                return;
-            }
-
-            consumer.closeAsync().whenComplete((aVoid2, t2) -> {
-                if (t2 != null) {
-                    future.completeExceptionally(t2);
-                    return;
-                }
-                future.complete(null);
-            });
-        });
-
-        future.get(5, TimeUnit.SECONDS);
-        Assert.assertEquals(consumer, consumerB);
-        Assert.assertTrue(future.isDone());
-        Assert.assertFalse(future.isCompletedExceptionally());
-
-        // consumerC is a newly created subscription.
-        Assert.assertNotEquals(consumer, consumerC);
-        Assert.assertTrue(consumerC.isConnected());
-        consumerC.close();
-    }
-
-    @Test
-    public void testRefCount_OnCloseConsumer() throws Exception {
-        final String topic = "persistent://my-property/my-ns/my-topic";
-        final String subName = "my-subscription";
-
-        Consumer<byte[]> consumerA = pulsarClient.newConsumer().topic(topic)
-                .subscriptionName(subName)
-                .subscriptionType(SubscriptionType.Shared)
-                .subscribe();
-        Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
-                .subscriptionName(subName)
-                .subscriptionType(SubscriptionType.Shared)
-                .subscribe();
-
-        Assert.assertEquals(consumerA, consumerB);
-
-        consumerA.close();
-        Assert.assertTrue(consumerA.isConnected());
-        Assert.assertTrue(consumerB.isConnected());
-
-        consumerB.close();
-        Assert.assertFalse(consumerA.isConnected());
-        Assert.assertFalse(consumerB.isConnected());
-    }
 }
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc 
b/pulsar-client-cpp/lib/ClientImpl.cc
index e2c15e0..0e996cc 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -340,19 +340,6 @@ void ClientImpl::subscribeAsync(const std::string& topic, 
const std::string& con
             lock.unlock();
             callback(ResultInvalidConfiguration, Consumer());
             return;
-        } else if (conf.getConsumerType() == ConsumerShared) {
-            ConsumersList consumers(consumers_);
-            for (auto& weakPtr : consumers) {
-                ConsumerImplBasePtr consumer = weakPtr.lock();
-                if (consumer && consumer->getSubscriptionName() == 
consumerName &&
-                    consumer->getTopic() == topic && !consumer->isClosed()) {
-                    consumer->incrRefCount();
-                    lock.unlock();
-                    LOG_INFO("Reusing existing consumer instance for " << 
topic << " -- " << consumerName);
-                    callback(ResultOk, Consumer(consumer));
-                    return;
-                }
-            }
         }
     }
 
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc 
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index d9fedaf..2cc5a3b 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -110,10 +110,6 @@ Future<Result, ConsumerImplBaseWeakPtr> 
ConsumerImpl::getConsumerCreatedFuture()
     return consumerCreatedPromise_.getFuture();
 }
 
-void ConsumerImpl::incrRefCount() { ++refCount_; }
-
-unsigned int ConsumerImpl::safeDecrRefCount() { return refCount_ > 0 ? 
refCount_-- : refCount_; }
-
 const std::string& ConsumerImpl::getSubscriptionName() const { return 
originalSubscriptionName_; }
 
 const std::string& ConsumerImpl::getTopic() const { return topic_; }
@@ -821,14 +817,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
         return;
     }
 
-    if (safeDecrRefCount() != 0) {
-        lock.unlock();
-        if (callback) {
-            callback(ResultOk);
-        }
-        return;
-    }
-
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
         lock.unlock();
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h 
b/pulsar-client-cpp/lib/ConsumerImpl.h
index 16ce091..b917f790 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -113,7 +113,6 @@ class ConsumerImpl : public ConsumerImplBase,
     virtual bool isReadCompacted();
     virtual void hasMessageAvailableAsync(HasMessageAvailableCallback 
callback);
     virtual void getLastMessageIdAsync(BrokerGetLastMessageIdCallback 
callback);
-    virtual void incrRefCount();
 
    protected:
     void connectionOpened(const ClientConnectionPtr& cnx);
@@ -147,7 +146,6 @@ class ConsumerImpl : public ConsumerImplBase,
     void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);
     void notifyPendingReceivedCallback(Result result, Message& message, const 
ReceiveCallback& callback);
     void failPendingReceiveCallback();
-    unsigned int safeDecrRefCount();
 
     Optional<MessageId> clearReceiveQueue();
 
@@ -178,7 +176,6 @@ class ConsumerImpl : public ConsumerImplBase,
     BatchAcknowledgementTracker batchAcknowledgementTracker_;
     BrokerConsumerStatsImpl brokerConsumerStats_;
     NegativeAcksTracker negativeAcksTracker_;
-    unsigned int refCount_ = 0;
 
     MessageCryptoPtr msgCrypto_;
     const bool readCompacted_;
diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h 
b/pulsar-client-cpp/lib/ConsumerImplBase.h
index 78b5e12..c716270 100644
--- a/pulsar-client-cpp/lib/ConsumerImplBase.h
+++ b/pulsar-client-cpp/lib/ConsumerImplBase.h
@@ -53,7 +53,6 @@ class ConsumerImplBase {
     virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback 
callback) = 0;
     virtual void seekAsync(const MessageId& msgId, ResultCallback callback) = 
0;
     virtual void negativeAcknowledge(const MessageId& msgId) = 0;
-    virtual void incrRefCount(){};
 };
 }  // namespace pulsar
 #endif  // PULSAR_CONSUMER_IMPL_BASE_HEADER
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 4f4f50d..2f82a15 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -2796,56 +2796,6 @@ TEST(BasicEndToEndTest, 
testPartitionedReceiveAsyncFailedConsumer) {
     client.shutdown();
 }
 
-TEST(BasicEndToEndTest, testPreventDupConsumersOnSharedMode) {
-    ClientConfiguration config;
-    Client client(lookupUrl);
-    std::string subsName = "my-only-sub";
-    std::string topicName = 
"persistent://public/default/test-prevent-dup-consumers";
-    ConsumerConfiguration consumerConf;
-    consumerConf.setConsumerType(ConsumerShared);
-
-    Consumer consumerA;
-    Result resultA = client.subscribe(topicName, subsName, consumerConf, 
consumerA);
-    ASSERT_EQ(ResultOk, resultA);
-    ASSERT_EQ(consumerA.getSubscriptionName(), subsName);
-
-    Consumer consumerB;
-    Result resultB = client.subscribe(topicName, subsName, consumerConf, 
consumerB);
-    ASSERT_EQ(ResultOk, resultB);
-    ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
-
-    // Since this is a shared consumer over same client cnx
-    // closing consumerA should result in consumerB also being closed.
-    ASSERT_EQ(ResultOk, consumerA.close());
-    ASSERT_EQ(ResultOk, consumerB.close());
-    ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
-    ASSERT_EQ(ResultAlreadyClosed, consumerB.close());
-}
-
-TEST(BasicEndToEndTest, testDupConsumersOnSharedModeNotThrowsExcOnUnsubscribe) 
{
-    ClientConfiguration config;
-    Client client(lookupUrl);
-    std::string subsName = "my-only-sub";
-    std::string topicName =
-        
"persistent://public/default/testDupConsumersOnSharedModeNotThrowsExcOnUnsubscribe";
-    ConsumerConfiguration consumerConf;
-    consumerConf.setConsumerType(ConsumerShared);
-
-    Consumer consumerA;
-    Result resultA = client.subscribe(topicName, subsName, consumerConf, 
consumerA);
-    ASSERT_EQ(ResultOk, resultA);
-    ASSERT_EQ(consumerA.getSubscriptionName(), subsName);
-
-    Consumer consumerB;
-    Result resultB = client.subscribe(topicName, subsName, consumerConf, 
consumerB);
-    ASSERT_EQ(ResultOk, resultB);
-    ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
-
-    ASSERT_EQ(ResultOk, consumerA.unsubscribe());
-    // If dup consumers are allowed BrokerMetadataError will be the result of 
close()
-    ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
-}
-
 void testNegativeAcks(const std::string &topic, bool batchingEnabled) {
     Client client(lookupUrl);
     Consumer consumer;
@@ -2913,38 +2863,6 @@ TEST(BasicEndToEndTest, testNegativeAcksWithPartitions) {
     testNegativeAcks(topicName, true);
 }
 
-TEST(BasicEndToEndTest, testPreventDupConsumersAllowSameSubForDifferentTopics) 
{
-    ClientConfiguration config;
-    Client client(lookupUrl);
-    std::string subsName = "my-only-sub";
-    std::string topicName =
-        
"persistent://public/default/testPreventDupConsumersAllowSameSubForDifferentTopics";
-    ConsumerConfiguration consumerConf;
-    consumerConf.setConsumerType(ConsumerShared);
-
-    Consumer consumerA;
-    Result resultA = client.subscribe(topicName, subsName, consumerConf, 
consumerA);
-    ASSERT_EQ(ResultOk, resultA);
-    ASSERT_EQ(consumerA.getSubscriptionName(), subsName);
-
-    Consumer consumerB;
-    Result resultB = client.subscribe(topicName, subsName, consumerConf, 
consumerB);
-    ASSERT_EQ(ResultOk, resultB);
-    ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
-
-    Consumer consumerC;
-    Result resultC = client.subscribe(topicName + "-different-topic", 
subsName, consumerConf, consumerC);
-    ASSERT_EQ(ResultOk, resultB);
-    ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
-    ASSERT_EQ(ResultOk, consumerA.close());
-    ASSERT_EQ(ResultOk, consumerB.close());
-    ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
-    ASSERT_EQ(ResultAlreadyClosed, consumerB.close());
-
-    // consumer C should be a different instance from A and B and should be 
with open state.
-    ASSERT_EQ(ResultOk, consumerC.close());
-}
-
 static long regexTestMessagesReceived = 0;
 
 static void regexMessageListenerFunction(Consumer consumer, const Message 
&msg) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 350ed9c..07cf3cb 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -61,7 +61,6 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     protected int maxReceiverQueueSize;
     protected final Schema<T> schema;
     protected final ConsumerInterceptors<T> interceptors;
-    private int refCount = 0;
 
     protected ConsumerBase(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
                            int receiverQueueSize, ExecutorService 
listenerExecutor,
@@ -394,12 +393,4 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
             interceptors.onNegativeAcksSend(this, messageIds);
         }
     }
-
-    protected synchronized void incrRefCount() {
-        ++refCount;
-    }
-
-    protected synchronized boolean shouldTearDown() {
-        return refCount > 0 ? refCount-- == 0 : refCount == 0;
-    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 41eaa6d..8909989 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -647,10 +647,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-        if (!shouldTearDown()) {
-            return CompletableFuture.completedFuture(null);
-        }
-
         if (getState() == State.Closing || getState() == State.Closed) {
             unAckedMessageTracker.close();
             if (possibleSendToDeadLetterTopicMessages != null) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 3da7e14..73a4a3c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -315,11 +315,6 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     private <T> CompletableFuture<Consumer<T>> 
doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> 
schema, ConsumerInterceptors<T> interceptors) {
-        Optional<ConsumerBase<T>> subscriber = subscriptionExist(conf);
-        if (subscriber.isPresent()) {
-            return CompletableFuture.completedFuture(subscriber.get());
-        }
-
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new 
CompletableFuture<>();
 
         String topic = conf.getSingleTopic();
@@ -667,20 +662,6 @@ public class PulsarClientImpl implements PulsarClient {
         });
     }
 
-    @SuppressWarnings("unchecked")
-    private <T> Optional<ConsumerBase<T>> 
subscriptionExist(ConsumerConfigurationData<?> conf) {
-        synchronized (consumers) {
-            Optional<ConsumerBase<?>> subscriber = consumers.keySet().stream()
-                    .filter(c -> 
c.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
-                    .filter(c -> conf.getTopicNames().contains(c.getTopic()))
-                    .filter(c -> 
c.getSubscription().equals(conf.getSubscriptionName()))
-                    .filter(Consumer::isConnected)
-                    .findFirst();
-            subscriber.ifPresent(ConsumerBase::incrRefCount);
-            return subscriber.map(ConsumerBase.class::cast);
-        }
-    }
-
     private static EventLoopGroup getEventLoopGroup(ClientConfigurationData 
conf) {
         ThreadFactory threadFactory = getThreadFactory("pulsar-client-io");
         return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), 
threadFactory);
diff --git 
a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
 
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
index d8aa539..b13cefd 100644
--- 
a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ 
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -301,17 +301,12 @@ public class PulsarSpoutTest extends ProducerConsumerBase 
{
         otherSpout.open(Maps.newHashMap(), context, collector);
 
         topicStats = admin.topics().getStats(topic);
-        
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(),
 1);
+        
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(),
 2);
 
         otherSpout.close();
 
         topicStats = admin.topics().getStats(topic);
         
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(),
 1);
-
-        otherSpout.close();
-
-        topicStats = admin.topics().getStats(topic);
-        
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(),
 0);
     }
 
     @Test

Reply via email to