BewareMyPower commented on code in PR #321:
URL: https://github.com/apache/pulsar-client-cpp/pull/321#discussion_r1338335895
##########
lib/ProducerImpl.cc:
##########
@@ -912,10 +912,9 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId,
MessageId& rawMessageId) {
if (op.chunkedMessageId) {
Review Comment:
It seems that `OpSendMsg::chunkedMessageId` field is not necessary. It's
added just to indicate if the current `OpSendMsg` is related to a chunk. But
`OpSendMsg::numChunks` can also be used for that.
Without saving an empty `ChunkMessageIdImpl`, we can just merge
`setChunkedMessageIds` to the current constructor:
```c++
ChunkMessageIdImpl(std::vector<MessageId>&& chunkedMessageIds)
: chunkedMessageIds_(std::move(chunkedMessageIds)) {
auto lastChunkMsgId = chunkedMessageIds_.back();
this->ledgerId_ = lastChunkMsgId.ledgerId();
this->entryId_ = lastChunkMsgId.entryId();
this->partition_ = lastChunkMsgId.partition();
}
```
After that, in `ConsumerImpl` we can write:
```c++
ChunkMessageIdImplPtr chunkMsgId =
std::make_shared<ChunkMessageIdImpl>(chunkedMsgCtx.moveChunkedMessageIds());
```
and in `ProducerImpl`:
```c++
if (op.numChunks > 1) {
// Handling the chunk message id.
op.chunkMessageIdList.push_back(messageId);
if (op.chunkId == op.numChunks - 1) {
auto chunkedMessageId =
std::make_shared<ChunkMessageIdImpl>(std::move(op.chunkMessageIdList));
messageId = chunkedMessageId->build();
}
```
##########
lib/ChunkMessageIdImpl.h:
##########
@@ -30,19 +30,24 @@ class ChunkMessageIdImpl : public MessageIdImpl, public
std::enable_shared_from_
public:
ChunkMessageIdImpl() : firstChunkMsgId_(std::make_shared<MessageIdImpl>())
{}
- void setFirstChunkMessageId(const MessageId& msgId) { *firstChunkMsgId_ =
*msgId.impl_; }
+ void setChunkedMessageIds(std::vector<MessageId>&& chunkedMessageIds) {
+ chunkedMessageIds_ = std::move(chunkedMessageIds);
+ auto lastChunkMsgId = chunkedMessageIds_.back();
+ this->ledgerId_ = lastChunkMsgId.ledgerId();
+ this->entryId_ = lastChunkMsgId.entryId();
+ this->partition_ = lastChunkMsgId.partition();
+ }
- void setLastChunkMessageId(const MessageId& msgId) {
- this->ledgerId_ = msgId.ledgerId();
- this->entryId_ = msgId.entryId();
- this->partition_ = msgId.partition();
+ std::shared_ptr<const MessageIdImpl> getFirstChunkMessageId() const {
+ return chunkedMessageIds_.front().impl_;
}
- std::shared_ptr<const MessageIdImpl> getFirstChunkMessageId() const {
return firstChunkMsgId_; }
+ std::vector<MessageId> moveChunkedMessageIds() noexcept { return
std::move(chunkedMessageIds_); }
MessageId build() { return
MessageId{std::dynamic_pointer_cast<MessageIdImpl>(shared_from_this())}; }
private:
std::shared_ptr<MessageIdImpl> firstChunkMsgId_;
Review Comment:
This field becomes useless after having the `chunkedMessageIds_`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]