RobertIndie opened a new pull request, #187:
URL: https://github.com/apache/pulsar-client-cpp/pull/187

   
   Fixes #186
   
   
   ### Motivation
   
   This PR fixes two problems with closing a partitioned producer: the deadlock reported in https://github.com/apache/pulsar-client-cpp/issues/186, and a bug where the partitioned producer miscounts the number of closed internal producers, which makes the close hang. Issue https://github.com/apache/pulsar-client-cpp/issues/186 is only resolved once both problems are fixed, so I put both fixes into one PR.
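   
   The description does not detail the counting fix. As a hypothetical illustration of the invariant involved (not the code in this PR), the sketch below folds the close results of N internal producers into a single user callback. It assumes that every internal producer, including one that never finished starting, must report back exactly once; if any report is skipped, the count never reaches zero and the close hangs. `CloseAggregator`, `AggregatorResult`, and `runCloseAggregatorScenario` are illustrative names, not pulsar-client-cpp APIs.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>

// Hypothetical helper, not part of pulsar-client-cpp: turns N asynchronous
// close results into a single user-facing callback.
class CloseAggregator {
   public:
    using Callback = std::function<void(bool allOk)>;

    CloseAggregator(std::size_t expected, Callback done)
        : remaining_(expected), done_(std::move(done)) {
        if (expected == 0) done_(true);  // nothing to wait for
    }

    // Must be called exactly once per internal producer, on success AND on
    // failure. Skipping a producer leaves remaining_ above zero forever, so
    // done_ never runs and the caller of close hangs.
    void onProducerClosed(bool ok) {
        assert(remaining_.load() > 0 && "more reports than producers");
        if (!ok) allOk_ = false;
        // fetch_sub returns the previous value, so only the report that takes
        // the count from 1 to 0 fires done_, and it fires exactly once.
        if (remaining_.fetch_sub(1) == 1) done_(allOk_.load());
    }

   private:
    std::atomic<std::size_t> remaining_;
    std::atomic<bool> allOk_{true};
    Callback done_;
};

struct AggregatorResult {
    int callbackCalls;
    bool allOk;
};

// Three partitions; partition 2 never finished starting but still reports
// (as a failure) instead of being skipped, so the count reaches zero.
AggregatorResult runCloseAggregatorScenario() {
    AggregatorResult result{0, true};
    CloseAggregator aggregator(3, [&result](bool allOk) {
        ++result.callbackCalls;
        result.allOk = allOk;
    });
    aggregator.onProducerClosed(true);   // partition 0
    aggregator.onProducerClosed(true);   // partition 1
    aggregator.onProducerClosed(false);  // partition 2
    return result;
}
```

   Decrementing with `fetch_sub` and firing only on the 1 to 0 transition means concurrent reports cannot fire the user callback twice, while a missing report shows up as a hang rather than a premature callback.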
   
   #### Deadlock issue
   
   Suppose we create a partitioned producer (PP) with 2 partitions and then expand the topic to 3 partitions. The PP will then create a new internal producer (let's call it P3).
   
   If we close the PP before P3 has finished starting, `P3.closeAsync` is called, and it fails P3's own pending creation here: https://github.com/apache/pulsar-client-cpp/blob/63c424555cdb1209c82e2f844d7414c1fc399ef8/lib/ProducerImpl.cc#L938
   
   The PP is then notified that P3 failed to be created and calls `PP.closeAsync` again:
   
https://github.com/apache/pulsar-client-cpp/blob/63c424555cdb1209c82e2f844d7414c1fc399ef8/lib/PartitionedProducerImpl.cc#L164
   
   As a result, the internal producers are closed a second time, which can cause a deadlock here:
   
https://github.com/apache/pulsar-client-cpp/blob/63c424555cdb1209c82e2f844d7414c1fc399ef8/lib/ProducerImpl.cc#L718
 
   
   
   Here is the stack trace from https://github.com/apache/pulsar-client-cpp/issues/186:
   
   ```
       frame #6: 0x000000010c5d7672 pulsar-tests`pulsar::ProducerImpl::closeAsync(this=0x00007fb19e012c20, originalCallback=<unavailable>)>) at ProducerImpl.cc:725:10
       frame #7: 0x000000010c5768a1 pulsar-tests`pulsar::PartitionedProducerImpl::closeAsync(this=0x00007fb19ef04098, originalCallback=<unavailable>)>) at PartitionedProducerImpl.cc:287:23
       frame #8: 0x000000010c57518f pulsar-tests`pulsar::PartitionedProducerImpl::handleSinglePartitionProducerCreated(this=0x00007fb19ef04098, result=ResultAlreadyClosed, producerWeakPtr=<unavailable>, partitionIndex=2) at PartitionedProducerImpl.cc:166:13
       frame #9: 0x000000010c582c9c 
pulsar-tests`decltype(__f=0x0000600002699868, 
__a0=std::__1::shared_ptr<pulsar::PartitionedProducerImpl>::element_type @ 
0x00007fb19ef04098 strong=8 weak=4, __args=0x00007ff7b4127fa4, __args=nullptr, 
__args=0x0000600002699888).*fp(static_cast<pulsar::Result>(fp1), 
static_cast<std::__1::weak_ptr<pulsar::ProducerImplBase> const&>(fp1), 
static_cast<unsigned int&>(fp1))) std::__1::__invoke<void 
(pulsar::PartitionedProducerImpl::*&)(pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), 
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>&, pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase> const&, unsigned int&, void>(void 
(pulsar::PartitionedProducerImpl::*&)(pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), 
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>&, pulsar::Result&&, 
std::__1::weak_ptr<pulsar::ProducerImplBase> const&, unsigned int&) at 
type_traits:3859:1
       frame #10: 0x000000010c582bb4 pulsar-tests`std::__1::__bind_return<void 
(pulsar::PartitionedProducerImpl::*)(pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), 
std::__1::tuple<std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, 
std::__1::placeholders::__ph<1>, std::__1::placeholders::__ph<2>, unsigned 
int>, std::__1::tuple<pulsar::Result&&, 
std::__1::weak_ptr<pulsar::ProducerImplBase> const&>, 
__is_valid_bind_return<void 
(pulsar::PartitionedProducerImpl::*)(pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), 
std::__1::tuple<std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, 
std::__1::placeholders::__ph<1>, std::__1::placeholders::__ph<2>, unsigned 
int>, std::__1::tuple<pulsar::Result&&, 
std::__1::weak_ptr<pulsar::ProducerImplBase> const&>>::value>::type 
std::__1::__apply_functor<void (__f=0x0000600002699868, __bound_args=size=4, 
(null)=__tuple_indices<0, 1, 2, 3> @ 0x00007ff7b4127dd8, 
__args=size=2)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), 
std::__1::tuple<std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, 
std::__1::placeholders::__ph<1>, std::__1::placeholders::__ph<2>, unsigned 
int>, 0ul, 1ul, 2ul, 3ul, std::__1::tuple<pulsar::Result&&, 
std::__1::weak_ptr<pulsar::ProducerImplBase> const&>>(void 
(pulsar::PartitionedProducerImpl::*&)(pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), 
std::__1::tuple<std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, 
std::__1::placeholders::__ph<1>, std::__1::placeholders::__ph<2>, unsigned 
int>&, std::__1::__tuple_indices<0ul, 1ul, 2ul, 3ul>, 
std::__1::tuple<pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase> 
const&>&&) at bind.h:257:12
       frame #11: 0x000000010c582b0b pulsar-tests`std::__1::__bind_return<void 
(pulsar::PartitionedProducerImpl::*)(pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), 
std::__1::tuple<std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, 
std::__1::placeholders::__ph<1>, std::__1::placeholders::__ph<2>, unsigned 
int>, std::__1::tuple<pulsar::Result&&, 
std::__1::weak_ptr<pulsar::ProducerImplBase> const&>, 
__is_valid_bind_return<void 
(pulsar::PartitionedProducerImpl::*)(pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), 
std::__1::tuple<std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, 
std::__1::placeholders::__ph<1>, std::__1::placeholders::__ph<2>, unsigned 
int>, std::__1::tuple<pulsar::Result&&, 
std::__1::weak_ptr<pulsar::ProducerImplBase> const&>>::value>::type 
std::__1::__bind<void (this=0x0000600002699868, __args=0x00007ff7b4127fa4, 
__args=nullptr)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, 
unsigned int), std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, 
std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, 
unsigned int&>::operator()<pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase> const&>(pulsar::Result&&, 
std::__1::weak_ptr<pulsar::ProducerImplBase> const&) at bind.h:292:20
       frame #12: 0x000000010c582a95 
pulsar-tests`decltype(__f=0x0000600002699868, __args=0x00007ff7b4127fa4, 
__args=nullptr)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, 
unsigned int), std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, 
std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, 
unsigned int&>&>(fp)(static_cast<pulsar::Result>(fp0), 
static_cast<std::__1::weak_ptr<pulsar::ProducerImplBase> const&>(fp0))) 
std::__1::__invoke<std::__1::__bind<void 
(pulsar::PartitionedProducerImpl::*)(pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), 
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, 
std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, 
unsigned int&>&, pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase> 
const&>(std::__1::__bind<void 
(pulsar::PartitionedProducerImpl::*)(pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), 
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, std::__1::placeholders::__ph<1> const&, 
std::__1::placeholders::__ph<2> const&, unsigned int&>&, pulsar::Result&&, 
std::__1::weak_ptr<pulsar::ProducerImplBase> const&) at type_traits:3918:1
       frame #13: 0x000000010c582a47 pulsar-tests`void 
std::__1::__invoke_void_return_wrapper<void, 
true>::__call<std::__1::__bind<void (__args=0x0000600002699868, 
__args=0x00007ff7b4127fa4, __args=nullptr)(pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), 
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, 
std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, 
unsigned int&>&, pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase> 
const&>(std::__1::__bind<void 
(pulsar::PartitionedProducerImpl::*)(pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), 
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, 
std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, 
unsigned int&>&, pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase> 
const&) at invoke.h:61:9
       frame #14: 0x000000010c5829f7 
pulsar-tests`std::__1::__function::__alloc_func<std::__1::__bind<void 
(pulsar::PartitionedProducerImpl::*)(pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), 
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, 
std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, 
unsigned int&>, std::__1::allocator<std::__1::__bind<void 
(pulsar::PartitionedProducerImpl::*)(pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), 
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, 
std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, 
unsigned int&>>, void (pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase> 
const&)>::operator(this=0x0000600002699868, __arg=0x00007ff7b4127fa4, 
__arg=nullptr)(pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase> 
const&) at function.h:178:16
       frame #15: 0x000000010c5815d6 
pulsar-tests`std::__1::__function::__func<std::__1::__bind<void 
(pulsar::PartitionedProducerImpl::*)(pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), 
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, 
std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, 
unsigned int&>, std::__1::allocator<std::__1::__bind<void 
(pulsar::PartitionedProducerImpl::*)(pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), 
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, 
std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, 
unsigned int&>>, void (pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase> 
const&)>::operator(this=0x0000600002699860, __arg=0x00007ff7b4127fa4, 
__arg=nullptr)(pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase> 
const&) at function.h:352:12
       frame #16: 0x000000010c5edf2f 
pulsar-tests`std::__1::__function::__value_func<void (pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase> 
const&)>::operator(this=0x0000600003d999d0, __args=0x00007ff7b4127fa4, 
__args=nullptr)(pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase> 
const&) const at function.h:505:16
       frame #17: 0x000000010c5edbf1 pulsar-tests`std::__1::function<void 
(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase> 
const&)>::operator(this= Function = 
pulsar::PartitionedProducerImpl::handleSinglePartitionProducerCreated(pulsar::Result,
 std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int) , 
__arg=ResultAlreadyClosed, __arg=nullptr)(pulsar::Result, 
std::__1::weak_ptr<pulsar::ProducerImplBase> const&) const at function.h:1182:12
       frame #18: 0x000000010c5d26b0 pulsar-tests`pulsar::Promise<pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>>::setFailed(this=0x00007fb19e013988, result=ResultAlreadyClosed) const at Future.h:156:13
       frame #19: 0x000000010c5daece pulsar-tests`pulsar::ProducerImpl::shutdown(this=0x00007fb19e012c20) at ProducerImpl.cc:945:29
       frame #20: 0x000000010c5d7ed3 pulsar-tests`pulsar::ProducerImpl::closeAsync(this=0x00007ff7b4128670, result=ResultOk)>)::$_6::operator()(pulsar::Result) const at ProducerImpl.cc:716:13
       
   ```
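   
   The trace shows the re-entrancy directly: frame #19 (`ProducerImpl::shutdown`) and frame #6 (`ProducerImpl::closeAsync`) run on the same producer (`this=0x00007fb19e012c20`), so P3's `closeAsync` is entered a second time from inside its own close path. The sketch below is a hypothetical illustration of one common guard against this pattern, not the change made in this PR. It assumes the deadlock comes from a non-recursive lock still being held when the callback re-enters; `SketchProducer`, `setCreationFailureListener`, `ReentrantCloseResult`, and `runReentrantCloseScenario` are illustrative names, not pulsar-client-cpp APIs.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical stand-in for an internal producer such as P3. It only models
// the re-entrancy pattern from the trace; it is not pulsar::ProducerImpl.
class SketchProducer {
   public:
    enum class State { Ready, Closing, Closed };
    using CloseCallback = std::function<void(bool alreadyClosed)>;
    using FailureListener = std::function<void()>;

    // Hypothetical hook standing in for "fail the pending creation promise".
    void setCreationFailureListener(FailureListener listener) {
        std::lock_guard<std::mutex> lock(mutex_);
        onCreationFailed_ = std::move(listener);
    }

    void closeAsync(const CloseCallback& callback) {
        // Idempotent close: only the first caller moves Ready -> Closing.
        State expected = State::Ready;
        if (!state_.compare_exchange_strong(expected, State::Closing)) {
            callback(true);  // a repeated or re-entrant close returns at once
            return;
        }
        FailureListener listener;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++closeCount_;
            // Take the listener out under the lock but do not call it here:
            // any path back into this object that locks mutex_ would block
            // forever on a non-recursive std::mutex.
            listener = std::exchange(onCreationFailed_, nullptr);
        }
        if (listener) listener();  // runs with mutex_ released
        state_ = State::Closed;
        callback(false);
    }

    int closeCount() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return closeCount_;
    }

   private:
    std::atomic<State> state_{State::Ready};
    mutable std::mutex mutex_;
    int closeCount_ = 0;
    FailureListener onCreationFailed_;
};

struct ReentrantCloseResult {
    int closeCount;
    bool firstCallSawAlreadyClosed;
    bool reentrantCallSawAlreadyClosed;
};

// Models the chain in the trace: closing P3 fails its pending creation, the
// parent reacts by closing its internal producers again, which re-enters
// P3's closeAsync() from inside P3's own close path.
ReentrantCloseResult runReentrantCloseScenario() {
    ReentrantCloseResult r{0, true, false};
    SketchProducer p3;
    p3.setCreationFailureListener([&] {
        p3.closeAsync([&](bool alreadyClosed) { r.reentrantCallSawAlreadyClosed = alreadyClosed; });
    });
    p3.closeAsync([&](bool alreadyClosed) { r.firstCallSawAlreadyClosed = alreadyClosed; });
    r.closeCount = p3.closeCount();
    assert(r.closeCount == 1);  // the close body ran only once
    return r;
}
```

   Rejecting the second call with the state check, rather than relying on the mutex, keeps the close body from running twice. Switching to a `std::recursive_mutex` would also avoid the self-deadlock, but it would let the re-entrant call operate on a half-closed producer.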
   
   
   
   ### Modifications
   
   <!-- Describe the modifications you've done. -->
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick one of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
   
   - [ ] `doc-not-needed` 
   (Please explain why)
   
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]