RobertIndie opened a new pull request, #187:
URL: https://github.com/apache/pulsar-client-cpp/pull/187
Fixes #186
### Motivation
This PR fixes two problems with closing the partitioned producer: the
deadlock described in
https://github.com/apache/pulsar-client-cpp/issues/186, and a second problem
where the partitioned producer miscounts the number of closed producers, which
causes the close to hang. Both problems must be fixed to resolve
https://github.com/apache/pulsar-client-cpp/issues/186, so both fixes are in
this PR.
#### Deadlock issue
Suppose we create a partitioned producer (PP) with 2 partitions and then
expand the topic to 3 partitions. The PP creates a new internal producer
(call it P3) for the new partition.
If we close the PP before P3 has finished starting, `P3.closeAsync` is
called, and it fails its own creation here:
https://github.com/apache/pulsar-client-cpp/blob/63c424555cdb1209c82e2f844d7414c1fc399ef8/lib/ProducerImpl.cc#L938
The PP then sees that P3 failed to be created and calls `PP.closeAsync`
again:
https://github.com/apache/pulsar-client-cpp/blob/63c424555cdb1209c82e2f844d7414c1fc399ef8/lib/PartitionedProducerImpl.cc#L164
The internal producers are then closed a second time, which can cause the
deadlock here:
https://github.com/apache/pulsar-client-cpp/blob/63c424555cdb1209c82e2f844d7414c1fc399ef8/lib/ProducerImpl.cc#L718
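The re-entrant call chain can be reproduced in isolation. The sketch below is only an illustration, not the code in this PR: `SketchProducer`, `SketchPartitionedProducer`, `State`, and `demo` are hypothetical names, and the atomic state check is just one possible way to turn a repeated `closeAsync` into a no-op instead of re-entering the close path.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical states; not the real pulsar-client-cpp enums.
enum class State { Ready, Closing, Closed };

// Stand-in for an internal (per-partition) producer.
class SketchProducer {
   public:
    bool created = true;                   // false while creation is still pending
    std::function<void()> onCreateFailed;  // stands in for the creation listener
    int closeRuns = 0;                     // how often the close body actually ran

    void closeAsync() {
        State expected = State::Ready;
        // Only the first caller moves Ready -> Closing; later calls return here.
        if (!state_.compare_exchange_strong(expected, State::Closing)) {
            return;
        }
        ++closeRuns;
        // Closing a producer that is still being created fails its creation,
        // which notifies the parent and re-enters the parent's closeAsync.
        if (!created && onCreateFailed) {
            onCreateFailed();
        }
        state_ = State::Closed;
    }

    State state() const { return state_.load(); }

   private:
    std::atomic<State> state_{State::Ready};
};

// Stand-in for the partitioned producer (PP).
class SketchPartitionedProducer {
   public:
    std::vector<SketchProducer> children;
    int closeRuns = 0;

    explicit SketchPartitionedProducer(std::size_t partitions) : children(partitions) {
        for (auto& child : children) {
            // A failed creation makes the parent close itself.
            child.onCreateFailed = [this] { closeAsync(); };
        }
    }

    void closeAsync() {
        State expected = State::Ready;
        // Without this check, the nested call would close every child again.
        if (!state_.compare_exchange_strong(expected, State::Closing)) {
            return;
        }
        ++closeRuns;
        for (auto& child : children) {
            child.closeAsync();
        }
        state_ = State::Closed;
    }

    State state() const { return state_.load(); }

   private:
    std::atomic<State> state_{State::Ready};
};

// Scenario from the description: 3 partitions, P3 (index 2) not yet created.
void demo() {
    SketchPartitionedProducer pp(3);
    pp.children[2].created = false;
    pp.closeAsync();
    assert(pp.closeRuns == 1);              // the nested close was a no-op
    assert(pp.children[2].closeRuns == 1);  // P3 was closed exactly once
}
```

In this sketch the nested `PP.closeAsync` call returns immediately because the state is already `Closing`, so no internal producer is closed twice. If the state check were replaced by a non-recursive mutex held across the callback, the nested call would block on a lock the same thread already holds, which has the same shape as the call chain in the stack trace.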
Here is the stack trace in
https://github.com/apache/pulsar-client-cpp/issues/186
```
frame #6: 0x000000010c5d7672
pulsar-tests`pulsar::ProducerImpl::closeAsync(this=0x00007fb19e012c20,
originalCallback=<unavailable>)>) at ProducerImpl.cc:725:10
frame #7: 0x000000010c5768a1
pulsar-tests`pulsar::PartitionedProducerImpl::closeAsync(this=0x00007fb19ef04098,
originalCallback=<unavailable>)>) at PartitionedProducerImpl.cc:287:23
frame #8: 0x000000010c57518f
pulsar-tests`pulsar::PartitionedProducerImpl::handleSinglePartitionProducerCreated(this=0x00007fb19ef04098,
result=ResultAlreadyClosed, producerWeakPtr=<unavailable>, partitionIndex=2)
at PartitionedProducerImpl.cc:166:13
frame #9: 0x000000010c582c9c
pulsar-tests`decltype(__f=0x0000600002699868,
__a0=std::__1::shared_ptr<pulsar::PartitionedProducerImpl>::element_type @
0x00007fb19ef04098 strong=8 weak=4, __args=0x00007ff7b4127fa4, __args=nullptr,
__args=0x0000600002699888).*fp(static_cast<pulsar::Result>(fp1),
static_cast<std::__1::weak_ptr<pulsar::ProducerImplBase> const&>(fp1),
static_cast<unsigned int&>(fp1))) std::__1::__invoke<void
(pulsar::PartitionedProducerImpl::*&)(pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int),
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>&, pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase> const&, unsigned int&, void>(void
(pulsar::PartitionedProducerImpl::*&)(pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int),
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>&, pulsar::Result&&,
std::__1::weak_ptr<pulsar::ProducerImplBase> const&, unsigned int&) at
type_traits:3859:1
frame #10: 0x000000010c582bb4 pulsar-tests`std::__1::__bind_return<void
(pulsar::PartitionedProducerImpl::*)(pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int),
std::__1::tuple<std::__1::shared_ptr<pulsar::PartitionedProducerImpl>,
std::__1::placeholders::__ph<1>, std::__1::placeholders::__ph<2>, unsigned
int>, std::__1::tuple<pulsar::Result&&,
std::__1::weak_ptr<pulsar::ProducerImplBase> const&>,
__is_valid_bind_return<void
(pulsar::PartitionedProducerImpl::*)(pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int),
std::__1::tuple<std::__1::shared_ptr<pulsar::PartitionedProducerImpl>,
std::__1::placeholders::__ph<1>, std::__1::placeholders::__ph<2>, unsigned
int>, std::__1::tuple<pulsar::Result&&,
std::__1::weak_ptr<pulsar::ProducerImplBase> const&>>::value>::type
std::__1::__apply_functor<void (__f=0x0000600002699868, __bound_args=size=4,
(null)=__tuple_indices<0, 1, 2, 3> @ 0x00007ff7b4127dd8,
__args=size=2)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>,
unsigned int),
std::__1::tuple<std::__1::shared_ptr<pulsar::PartitionedProducerImpl>,
std::__1::placeholders::__ph<1>, std::__1::placeholders::__ph<2>, unsigned
int>, 0ul, 1ul, 2ul, 3ul, std::__1::tuple<pulsar::Result&&,
std::__1::weak_ptr<pulsar::ProducerImplBase> const&>>(void
(pulsar::PartitionedProducerImpl::*&)(pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int),
std::__1::tuple<std::__1::shared_ptr<pulsar::PartitionedProducerImpl>,
std::__1::placeholders::__ph<1>, std::__1::placeholders::__ph<2>, unsigned
int>&, std::__1::__tuple_indices<0ul, 1ul, 2ul, 3ul>,
std::__1::tuple<pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase>
const&>&&) at bind.h:257:12
frame #11: 0x000000010c582b0b pulsar-tests`std::__1::__bind_return<void
(pulsar::PartitionedProducerImpl::*)(pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int),
std::__1::tuple<std::__1::shared_ptr<pulsar::PartitionedProducerImpl>,
std::__1::placeholders::__ph<1>, std::__1::placeholders::__ph<2>, unsigned
int>, std::__1::tuple<pulsar::Result&&,
std::__1::weak_ptr<pulsar::ProducerImplBase> const&>,
__is_valid_bind_return<void
(pulsar::PartitionedProducerImpl::*)(pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int),
std::__1::tuple<std::__1::shared_ptr<pulsar::PartitionedProducerImpl>,
std::__1::placeholders::__ph<1>, std::__1::placeholders::__ph<2>, unsigned
int>, std::__1::tuple<pulsar::Result&&,
std::__1::weak_ptr<pulsar::ProducerImplBase> const&>>::value>::type
std::__1::__bind<void (this=0x0000600002699868, __args=0x00007ff7b4127fa4,
__args=nullptr)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>,
unsigned int),
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>,
std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&,
unsigned int&>::operator()<pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase> const&>(pulsar::Result&&,
std::__1::weak_ptr<pulsar::ProducerImplBase> const&) at bind.h:292:20
frame #12: 0x000000010c582a95
pulsar-tests`decltype(__f=0x0000600002699868, __args=0x00007ff7b4127fa4,
__args=nullptr)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>,
unsigned int), std::__1::shared_ptr<pulsar::PartitionedProducerImpl>,
std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&,
unsigned int&>&>(fp)(static_cast<pulsar::Result>(fp0),
static_cast<std::__1::weak_ptr<pulsar::ProducerImplBase> const&>(fp0)))
std::__1::__invoke<std::__1::__bind<void
(pulsar::PartitionedProducerImpl::*)(pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int),
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>,
std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&,
unsigned int&>&, pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>
const&>(std::__1::__bind<void
(pulsar::PartitionedProducerImpl::*)(pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int),
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>,
std::__1::placeholders::__ph<1> const&,
std::__1::placeholders::__ph<2> const&, unsigned int&>&, pulsar::Result&&,
std::__1::weak_ptr<pulsar::ProducerImplBase> const&) at type_traits:3918:1
frame #13: 0x000000010c582a47 pulsar-tests`void
std::__1::__invoke_void_return_wrapper<void,
true>::__call<std::__1::__bind<void (__args=0x0000600002699868,
__args=0x00007ff7b4127fa4, __args=nullptr)(pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int),
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>,
std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&,
unsigned int&>&, pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>
const&>(std::__1::__bind<void
(pulsar::PartitionedProducerImpl::*)(pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int),
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>,
std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&,
unsigned int&>&, pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase>
const&) at invoke.h:61:9
frame #14: 0x000000010c5829f7
pulsar-tests`std::__1::__function::__alloc_func<std::__1::__bind<void
(pulsar::PartitionedProducerImpl::*)(pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int),
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>,
std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&,
unsigned int&>, std::__1::allocator<std::__1::__bind<void
(pulsar::PartitionedProducerImpl::*)(pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int),
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>,
std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&,
unsigned int&>>, void (pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>
const&)>::operator(this=0x0000600002699868, __arg=0x00007ff7b4127fa4,
__arg=nullptr)(pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase>
const&) at function.h:178:16
frame #15: 0x000000010c5815d6
pulsar-tests`std::__1::__function::__func<std::__1::__bind<void
(pulsar::PartitionedProducerImpl::*)(pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int),
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>,
std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&,
unsigned int&>, std::__1::allocator<std::__1::__bind<void
(pulsar::PartitionedProducerImpl::*)(pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int),
std::__1::shared_ptr<pulsar::PartitionedProducerImpl>,
std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&,
unsigned int&>>, void (pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>
const&)>::operator(this=0x0000600002699860, __arg=0x00007ff7b4127fa4,
__arg=nullptr)(pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase>
const&) at function.h:352:12
frame #16: 0x000000010c5edf2f
pulsar-tests`std::__1::__function::__value_func<void (pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>
const&)>::operator(this=0x0000600003d999d0, __args=0x00007ff7b4127fa4,
__args=nullptr)(pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase>
const&) const at function.h:505:16
frame #17: 0x000000010c5edbf1 pulsar-tests`std::__1::function<void
(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>
const&)>::operator(this= Function =
pulsar::PartitionedProducerImpl::handleSinglePartitionProducerCreated(pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int) ,
__arg=ResultAlreadyClosed, __arg=nullptr)(pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase> const&) const at function.h:1182:12
frame #18: 0x000000010c5d26b0
pulsar-tests`pulsar::Promise<pulsar::Result,
std::__1::weak_ptr<pulsar::ProducerImplBase>>::setFailed(this=0x00007fb19e013988,
result=ResultAlreadyClosed) const at Future.h:156:13
frame #19: 0x000000010c5daece
pulsar-tests`pulsar::ProducerImpl::shutdown(this=0x00007fb19e012c20) at
ProducerImpl.cc:945:29
frame #20: 0x000000010c5d7ed3
pulsar-tests`pulsar::ProducerImpl::closeAsync(this=0x00007ff7b4128670,
result=ResultOk)>)::$_6::operator()(pulsar::Result) const at
ProducerImpl.cc:716:13
```
### Modifications
<!-- Describe the modifications you've done. -->
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
*(Please pick either of the following options)*
This change is a trivial rework / code cleanup without any test coverage.
*(or)*
This change is already covered by existing tests, such as *(please describe
tests)*.
*(or)*
This change added tests and can be verified as follows:
*(example:)*
- *Added integration tests for end-to-end deployment with large payloads
(10MB)*
- *Extended integration test for recovery after broker failure*
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc-required`
(Your PR needs to update docs and you will update later)
- [ ] `doc-not-needed`
(Please explain why)
- [ ] `doc`
(Your PR contains doc changes)
- [ ] `doc-complete`
(Docs have been already added)