Gleiphir2769 commented on issue #456:
URL: 
https://github.com/apache/pulsar-client-go/issues/456#issuecomment-1170147387

   Hello, I am very interested in completing this feature. The following is my 
plan.
   ## Motivation
   Make the Pulsar Go client support chunking so that it can produce and consume 
large messages without disabling batching. 
   ## Modifications
   ### Publish Chunked Messages
   Publishing large messages is currently limited by `maxMessageSize`. 
   
https://github.com/apache/pulsar-client-go/blob/0f7041ffa9085197aa888ac33d3288a3ed81c57b/pulsar/producer_partition.go#L427-L436
   If the message payload is larger than `maxMessageSize`, the message is 
discarded. Instead, it should be split into chunks, each no larger than 
`maxMessageSize`, which are sent to the broker separately. I think the chunking 
logic can be added in `internalSendAsync`.
   
https://github.com/apache/pulsar-client-go/blob/0f7041ffa9085197aa888ac33d3288a3ed81c57b/pulsar/producer_partition.go#L741
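
   A minimal sketch of the splitting step, assuming the chunk size limit is 
already known. `splitIntoChunks` and its signature are hypothetical and not part 
of pulsar-client-go. The per-chunk metadata and the actual send are left out.

```go
package main

// splitIntoChunks cuts payload into consecutive slices of at most
// maxChunkSize bytes each. Hypothetical helper for illustration only.
// The returned slices share memory with payload (no copying).
func splitIntoChunks(payload []byte, maxChunkSize int) [][]byte {
	if maxChunkSize <= 0 {
		return nil // no valid chunk size, nothing can be produced
	}
	if len(payload) == 0 {
		// An empty message still yields a single (empty) chunk.
		return [][]byte{payload}
	}
	// Ceiling division gives the number of chunks needed.
	total := (len(payload) + maxChunkSize - 1) / maxChunkSize
	chunks := make([][]byte, 0, total)
	for start := 0; start < len(payload); start += maxChunkSize {
		end := start + maxChunkSize
		if end > len(payload) {
			end = len(payload) // last chunk may be shorter
		}
		chunks = append(chunks, payload[start:end])
	}
	return chunks
}
```

   For example, a 10-byte payload with a limit of 4 bytes is split into chunks 
of 4, 4 and 2 bytes.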
   ### Receive Chunked Messages
   Pulsar allows multiple producers to publish to the same topic at the same 
time, which means that the chunks of several large messages may be interleaved 
in the topic. The chunks of one large message do not necessarily arrive 
consecutively, but they do arrive in order, which is guaranteed by the broker.
   So the Go client needs a `ChunkedMessageCtx` to track and buffer each chunked 
message. The `ChunkedMessageCtx` records the position of the chunks received so 
far and accumulates their `payload`. When all chunks have been received, 
`ChunkedMessageCtx` returns the accumulated `payload` to the user, i.e. the full 
message as it was before chunking.
   
![snipaste_2022-05-16_17-54-35.png](https://s2.loli.net/2022/05/20/Boyx7FZe8S5ONH1.png)
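
   A rough sketch of what such a context could look like. The type 
`chunkedMsgCtx`, its fields and the `add` method are hypothetical names chosen 
for illustration. It assumes each chunk carries a zero-based chunk ID and that 
the total number of chunks and the total size are known from the first chunk.

```go
package main

import "errors"

// errOutOfOrder is returned when a chunk does not directly follow the
// previously accepted one.
var errOutOfOrder = errors.New("chunk arrived out of order")

// chunkedMsgCtx is a hypothetical sketch of a ChunkedMessageCtx: it tracks
// the last accepted chunk and accumulates the payload.
type chunkedMsgCtx struct {
	totalChunks int
	lastChunkID int // -1 until the first chunk is accepted
	payload     []byte
}

func newChunkedMsgCtx(totalChunks, totalSize int) *chunkedMsgCtx {
	if totalSize < 0 {
		totalSize = 0
	}
	return &chunkedMsgCtx{
		totalChunks: totalChunks,
		lastChunkID: -1,
		payload:     make([]byte, 0, totalSize), // pre-size the buffer
	}
}

// add appends the next chunk. It returns the full payload and true once the
// last chunk has been accepted, and an error if the chunk is not the next
// expected one.
func (c *chunkedMsgCtx) add(chunkID int, data []byte) ([]byte, bool, error) {
	if chunkID != c.lastChunkID+1 || chunkID >= c.totalChunks {
		return nil, false, errOutOfOrder
	}
	c.payload = append(c.payload, data...)
	c.lastChunkID = chunkID
	if chunkID == c.totalChunks-1 {
		return c.payload, true, nil
	}
	return nil, false, nil
}
```

   The consumer would call `add` for every chunk of a given message and hand 
the payload to the user only when the second return value is true.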
   All `ChunkedMessageCtx` instances need to be kept in a cache. To bound memory 
usage, the number of `ChunkedMessageCtx` instances needs to be limited (the 
default upper limit in the Java client is 100). This cache is essentially a 
concurrent map with an eviction policy (LRU). It can be implemented simply as 
map + mutex + pending queue, or with something more complex 
(https://github.com/Gleiphir2769/s-cache). 
   I think it should go here.
   
https://github.com/apache/pulsar-client-go/blob/0f7041ffa9085197aa888ac33d3288a3ed81c57b/pulsar/consumer_partition.go#L553
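
   A minimal sketch of the simple map + mutex + queue variant. All names here 
are hypothetical. Two assumptions: the queue is a `container/list` ordered by 
recency, and entries are keyed by a per-message identifier string (called 
`uuid` below). The stored value is a plain byte slice standing in for a 
`ChunkedMessageCtx`.

```go
package main

import (
	"container/list"
	"sync"
)

// cacheEntry is what the list elements hold.
type cacheEntry struct {
	uuid string
	buf  []byte // stand-in for a ChunkedMessageCtx
}

// chunkCtxCache is a hypothetical size-bounded LRU cache.
type chunkCtxCache struct {
	mu      sync.Mutex
	maxSize int
	order   *list.List // front = most recently used
	items   map[string]*list.Element
}

func newChunkCtxCache(maxSize int) *chunkCtxCache {
	if maxSize < 1 {
		maxSize = 1
	}
	return &chunkCtxCache{
		maxSize: maxSize,
		order:   list.New(),
		items:   make(map[string]*list.Element),
	}
}

// put inserts or updates an entry and returns the key of the entry evicted
// to make room, or "" if nothing was evicted.
func (c *chunkCtxCache) put(uuid string, buf []byte) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if el, ok := c.items[uuid]; ok {
		el.Value.(*cacheEntry).buf = buf
		c.order.MoveToFront(el)
		return ""
	}
	c.items[uuid] = c.order.PushFront(&cacheEntry{uuid: uuid, buf: buf})
	if c.order.Len() > c.maxSize {
		oldest := c.order.Back() // least recently used
		c.order.Remove(oldest)
		evicted := oldest.Value.(*cacheEntry).uuid
		delete(c.items, evicted)
		return evicted
	}
	return ""
}

// get returns the entry for uuid and marks it as most recently used.
func (c *chunkCtxCache) get(uuid string) ([]byte, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	el, ok := c.items[uuid]
	if !ok {
		return nil, false
	}
	c.order.MoveToFront(el)
	return el.Value.(*cacheEntry).buf, true
}

// remove drops an entry, e.g. once its message has been fully assembled.
func (c *chunkCtxCache) remove(uuid string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if el, ok := c.items[uuid]; ok {
		c.order.Remove(el)
		delete(c.items, uuid)
	}
}
```

   Returning the evicted key from `put` lets the caller decide what to do with 
the chunks of an incomplete message that was pushed out of the cache.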
   ### Some Details
   #### Batching
   Currently the Pulsar Go client relies on `BatchBuilder` to send all messages, 
even when `batching` is disabled (in that case each message triggers a flush of 
the batch). 
   
https://github.com/apache/pulsar-client-go/blob/0f7041ffa9085197aa888ac33d3288a3ed81c57b/pulsar/producer_partition.go#L472-L474
   In the [Java 
Client](https://github.com/apache/pulsar/blob/f230d15ffcd5f74cca13bd23b35ace784d6f8ce6/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1313-L1323),
 the batch message logic skips the processing of chunked messages. So we need a 
single-message send path that is independent of `BatchBuilder`.
   Because of the consumer available-permits calculation problem in shared 
subscriptions ([issue #10417](https://github.com/apache/pulsar/pull/10417)), 
`batching` and `chunking` cannot be enabled at the same time.
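
   This constraint could be enforced when the producer is created. The sketch 
below uses a hypothetical, trimmed-down options struct. The field names 
`DisableBatching` and `EnableChunking` are assumptions made for illustration.

```go
package main

import "errors"

// producerOptions is a hypothetical, trimmed-down options struct.
type producerOptions struct {
	DisableBatching bool
	EnableChunking  bool
}

var errBatchingWithChunking = errors.New(
	"batching and chunking cannot be enabled at the same time")

// validateProducerOptions rejects configurations that enable chunking
// while batching is still on.
func validateProducerOptions(o producerOptions) error {
	if o.EnableChunking && !o.DisableBatching {
		return errBatchingWithChunking
	}
	return nil
}
```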
   #### Chunked Message ID 
   This is related to [PIP 107](https://github.com/apache/pulsar/issues/12402). 
It would be good to adopt the solution from the new Java client, which is to 
implement a `ChunkMessageIdImpl` that exposes `getFirstChunkMessageId`. This 
changes the `Seek` implementation so that it seeks to the first chunk's message 
ID.
   
https://github.com/apache/pulsar-client-go/blob/0f7041ffa9085197aa888ac33d3288a3ed81c57b/pulsar/consumer_partition.go#L428-L431
   
https://github.com/apache/pulsar-client-go/blob/0f7041ffa9085197aa888ac33d3288a3ed81c57b/pulsar/consumer_partition.go#L447-L459
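
   One way this could look in Go, as a sketch only. The types `messageID`, 
`chunkMessageID` and the `seekable` interface are hypothetical and much simpler 
than the client's real message ID types. Storing the last chunk's position next 
to the first is an assumption here.

```go
package main

// messageID is a simplified stand-in for a message position.
type messageID struct {
	ledgerID int64
	entryID  int64
}

// chunkMessageID is a hypothetical ID for a chunked message. It remembers
// where the first chunk is stored, so a seek can start there and the whole
// message can be reassembled again.
type chunkMessageID struct {
	firstChunk messageID
	lastChunk  messageID
}

// seekable is implemented by both ID kinds so that seek logic does not need
// to know which one it was given.
type seekable interface {
	seekPosition() messageID
}

// A plain message is sought by its own position.
func (m messageID) seekPosition() messageID { return m }

// A chunked message is sought by the position of its first chunk.
func (c chunkMessageID) seekPosition() messageID { return c.firstChunk }
```

   Seeking to any later chunk would leave the consumer with an incomplete set 
of chunks, which is why the first chunk's position is used.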
   #### Size Calculation
   This is related to [issue 
#16196](https://github.com/apache/pulsar/pull/16196). Message metadata should 
be updated before computing the chunk size. And the total size should include 
all bytes other than the metadata and payload, e.g. the 4-byte checksum field.
   #### Shared Subscription
   There are some problems with chunking under shared subscriptions. [issue 
#16202](https://github.com/apache/pulsar/pull/16202) added support for chunking 
with Shared subscriptions. The Go client may therefore not need the restriction 
on chunking with Shared subscriptions found in `ConsumerImpl`.
   #### unAckedChunkedMessageIdSequenceMap
   (to be completed)

