Hi Penghui,

> Please also update the proposal for schema handling.
Done, all content discussed in this thread is updated in [1].

Thanks,
Haiting

[1] https://github.com/apache/pulsar/issues/16153

On Fri, Jul 29, 2022 at 1:19 PM PengHui Li <codelipeng...@gmail.com> wrote:
> Sound good to me.
>
> Please also update the proposal for schema handling.
>
> Thanks,
> Penghui
>
> On Jul 24, 2022, 17:53 +0800, Haiting Jiang <jianghait...@apache.org>, wrote:
> > Hi Penghui,
> >
> > > One question about the schema.
> > > How can the consumer get the schema from the shadow topic during consumption?
> > > We should add this part in the proposal.
> >
> > Thanks for the reminder.
> >
> > From what I see, schema is part of a topic's metadata, so a shadow topic won't have its own schema; it shares the schema info of its source topic.
> >
> > For consumers, we need to support the `GetSchema` command for shadow topics, and there are two interfaces for this.
> >
> > 1. Binary protocol: `CommandGetSchema` is handled in `ServerCnx#handleGetSchema`. We only need to replace the requested shadow topic's `schemaName` with the `schemaName` of the source topic; the underlying read operation is supported by `SchemaRegistry#getSchema(String, SchemaVersion)`.
> >
> > 2. HTTP protocol: handled in `SchemasResource#getSchema(...)`. Similar to the approach for the binary protocol, replace the `schemaId` with the source topic's in `SchemasResourceBase#getSchemaId`.
> >
> > For admins, we can support other "read" ops besides `getSchema`, including `getAllSchemas` and `getVersionBySchema`, which can all be supported in the same way as `getSchema`.
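> >
> > For illustration, a rough pseudo-Java fragment of what the substitution could look like (`sourceTopicOf(...)` is just a placeholder for resolving the shadow topic's source topic from its topic policies; this is a sketch, not the final implementation):
> >
> > ```java
> > // Sketch only: serve GetSchema for a shadow topic by delegating to its source topic.
> > CompletableFuture<SchemaAndMetadata> getSchemaForTopic(TopicName topic, SchemaVersion version) {
> >     // Placeholder lookup: returns the source topic if `topic` is a shadow topic.
> >     TopicName schemaOwner = sourceTopicOf(topic).orElse(topic);
> >     // The read path itself is unchanged: SchemaRegistry#getSchema(String, SchemaVersion).
> >     return schemaRegistry.getSchema(schemaOwner.getSchemaName(), version);
> > }
> > ```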
> >
> > Thanks,
> > Haiting
> >
> > On 2022/07/21 02:13:08 PengHui Li wrote:
> > > Hi Haiting,
> > >
> > > One question about the schema.
> > > How can the consumer get the schema from the shadow topic during consumption?
> > > We should add this part in the proposal.
> > >
> > > Thanks,
> > > Penghui
> > >
> > > On Mon, Jul 11, 2022 at 9:09 PM Asaf Mesika <asaf.mes...@gmail.com> wrote:
> > > > On Thu, Jun 23, 2022 at 6:26 AM Haiting Jiang <jianghait...@apache.org> wrote:
> > > > > Hi Asaf,
> > > > >
> > > > > > I did a quick reading and I couldn't understand the gist of this change: The shadow topic doesn't really have its own messages, or its own ledgers, right? When it reads messages, it reads from the original topic's ledgers. So the only thing you need to do is sync the "metadata" - the ledgers list?
> > > > >
> > > > > Yes, mostly the ledger id list and the LAC of the last ledger.
> > > > >
> > > > > > One question comes to mind here: Why not simply read the ledger information from the original topic, without copying?
> > > > >
> > > > > Yes, old ledger information will be read from the metadata store when ShadowManagedLedger initializes. The replicator is only for new messages, to reduce the consume latency of subscriptions in the shadow topic. And the reason we also replicate message data is to populate the entry cache when the shadow topic has many active subscriptions.
> > > > >
> > > > > One optimization we can do: there is not much benefit in having the shadow replicator replicate messages in the backlog, so we can come up with some policy to reset the shadow replicator cursor in a future PR.
> > > >
> > > > I'm not sure I'm following you.
> > > > What do you mean by old ledger information and new ledger information?
> > > >
> > > > What I'm trying to understand is: why do you need to copy the source topic metadata (the ledger ID list and the LAC of the last ledger)? Why can't you just use the original topic's metadata?
> > > >
> > > > > > Another question - I couldn't understand why you need to change the protocol to introduce shadow message id. Can you please explain that to me? Is CommandSend used only internally between Pulsar clusters, or is it used by a Pulsar client?
> > > > >
> > > > > CommandSend is designed for the Pulsar producer client first, and geo-replication reuses the producer client to replicate messages between Pulsar clusters.
> > > > >
> > > > > The shadow message id contains the ledger id and entry id of the message. When the shadow topic receives the message id, it is able to update `lastConfirmedEntry` directly, so that subscriptions can consume this new message. The shadow topic can also tell whether a message comes from a shadow replicator and reject it otherwise.
> > > >
> > > > I think the flow of information is the part I don't understand.
> > > >
> > > > In the PIP you write "The message sync procedure of the shadow topic is supported by shadow replication, which is very much like geo-replication, with these differences:"
> > > > What I don't understand is that you write that this is a read-only topic, so why replicate/sync messages?
> > > >
> > > > I managed to understand that you want to populate the BK entry cache of the topic ledgers in the shadow topic broker. Instead of reading from BK and storing it in the cache, you favor copying from the source topic broker's cache memory to the shadow topic broker's cache. Is this to save the bandwidth of BK? I presume the most recent messages of BK would be in memory anyway, no?
> > > >
> > > > > Thanks,
> > > > > Haiting
> > > > >
> > > > > On 2022/06/22 15:57:11 Asaf Mesika wrote:
> > > > > > Hi,
> > > > > >
> > > > > > I did a quick reading and I couldn't understand the gist of this change: The shadow topic doesn't really have its own messages, or its own ledgers, right? When it reads messages, it reads from the original topic's ledgers. So the only thing you need to do is sync the "metadata" - the ledgers list?
> > > > > > One question comes to mind here: Why not simply read the ledger information from the original topic, without copying?
> > > > > >
> > > > > > Another question - I couldn't understand why you need to change the protocol to introduce shadow message id. Can you please explain that to me? Is CommandSend used only internally between Pulsar clusters, or is it used by a Pulsar client?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Asaf
> > > > > >
> > > > > > On Tue, Jun 21, 2022 at 11:00 AM Haiting Jiang <jianghait...@apache.org> wrote:
> > > > > > > Hi Pulsar community:
> > > > > > >
> > > > > > > I have opened a PIP to discuss "Shadow Topic, an alternative way to support readonly topic ownership."
> > > > > > >
> > > > > > > Proposal Link: https://github.com/apache/pulsar/issues/16153
> > > > > > >
> > > > > > > ---
> > > > > > >
> > > > > > > ## Motivation
> > > > > > >
> > > > > > > The motivation is the same as PIP-63[1], with a new broadcast use case of supporting 100K subscriptions in a single topic.
> > > > > > > 1. The bandwidth of a broker limits the number of subscriptions for a single topic.
> > > > > > > 2. Subscriptions are competing for the network bandwidth on brokers. Different subscriptions might have different levels of severity.
> > > > > > > 3. When synchronizing cross-city message reading, cross-city access needs to be minimized.
> > > > > > > 4. [New] Broadcast with 100K subscriptions. There is a limitation on the subscription number of a single topic. It was tested by Hongjie from NTT Lab that with 40K subscriptions in a single topic, the client needs about 20 min to start all client connections, and at a producer rate of 1 msg/s the average end-to-end latency is about 2.9 s. For 100K subscriptions, the connection start time and E2E latency are beyond consideration.
> > > > > > >
> > > > > > > However, it's too complicated to implement with the original PIP-63 proposal; the changed code is already over 3K+ lines, see PR#11960[2], and there are still some problems left:
> > > > > > > 1. The LAC of the readonly topic is updated in a polling pattern, which increases the bookie load.
> > > > > > > 2. The message data of the readonly topic won't be cached in the broker, which increases the network usage between broker and bookie when more than one subscriber is tail-reading.
> > > > > > > 3. All the subscriptions are managed in the original writable topic, so the supported max subscription number is not scalable.
> > > > > > >
> > > > > > > This PIP tries to come up with a simpler solution to support readonly topic ownership and solve the problems the previous PR left. The main idea of this solution is to reuse the feature of geo-replication, but instead of duplicating storage, it shares the underlying bookie ledgers between different topics.
> > > > > > >
> > > > > > > ## Goal
> > > > > > >
> > > > > > > The goal is to introduce **Shadow Topic** as a new type of topic to support readonly topic ownership. Just as its name implies, a shadow topic is the shadow of some normal persistent topic (let's call it the source topic here). The source topic and the shadow topic must have the same number of partitions, or both be non-partitioned. Multiple shadow topics can be created from a source topic.
> > > > > > >
> > > > > > > A shadow topic shares the underlying bookie ledgers of its source topic. Users can't produce any messages to the shadow topic directly, and the shadow topic doesn't create any new ledgers for messages; all messages in the shadow topic come from the source topic.
> > > > > > >
> > > > > > > A shadow topic has its own subscriptions and doesn't share them with its source topic. This means the shadow topic has its own cursor ledger to store persistent mark-delete info for each persistent subscription.
> > > > > > >
> > > > > > > The message sync procedure of the shadow topic is supported by shadow replication, which is very much like geo-replication, with these differences:
> > > > > > > 1. Geo-replication only works between topics with the same name in different broker clusters, but shadow topics have no naming limitation and they can be in the same cluster.
> > > > > > > 2. Geo-replication duplicates data storage, but shadow topics don't.
> > > > > > > 3. Geo-replication replicates data in both directions (it's bidirectional), but shadow replication has only a one-way data flow.
> > > > > > >
> > > > > > > ## API Changes
> > > > > > >
> > > > > > > 1. PulsarApi.proto
> > > > > > >
> > > > > > > The shadow topic needs to know the original message id of the replicated messages in order to update the new ledgers and the LAC. So we need to add a `shadow_message_id` in CommandSend for the replicator.
> > > > > > >
> > > > > > > ```
> > > > > > > message CommandSend {
> > > > > > >     // ...
> > > > > > >     // message id for shadow topic
> > > > > > >     optional MessageIdData shadow_message_id = 9;
> > > > > > > }
> > > > > > > ```
> > > > > > >
> > > > > > > 2. Admin API for creating a shadow topic with a source topic:
> > > > > > > ```
> > > > > > > admin.topics().createShadowTopic(source-topic-name, shadow-topic-name)
> > > > > > > ```
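> > > > > > >
> > > > > > > For example, a rough usage sketch with the Java admin client (topic names and the service URL are placeholders, and the argument order follows the proposed API above):
> > > > > > >
> > > > > > > ```java
> > > > > > > // Sketch only: create a source topic and a shadow topic that shares its ledgers.
> > > > > > > PulsarAdmin admin = PulsarAdmin.builder()
> > > > > > >         .serviceHttpUrl("http://localhost:8080")   // placeholder admin service URL
> > > > > > >         .build();
> > > > > > >
> > > > > > > String source = "persistent://public/default/source-topic";
> > > > > > > String shadow = "persistent://public/default/source-topic-shadow";
> > > > > > >
> > > > > > > admin.topics().createNonPartitionedTopic(source);
> > > > > > > admin.topics().createShadowTopic(source, shadow);  // proposed API from this PIP
> > > > > > > ```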
> > > > > > >
> > > > > > > ## Implementation
> > > > > > >
> > > > > > > A picture showing the relations of the key components is added in the GitHub issue [3].
> > > > > > >
> > > > > > > There are two key changes in the implementation:
> > > > > > > 1. How to replicate messages to shadow topics.
> > > > > > > 2. How the shadow topic manages the shared ledger info.
> > > > > > >
> > > > > > > ### 1. How to replicate messages to shadow topics
> > > > > > >
> > > > > > > This part is mostly implemented by `ShadowReplicator`, which extends the `PersistentReplicator` introduced for geo-replication. The shadow topic list is added as a new topic policy of the source topic, and the source topic manages the lifecycle of all the replicators. The key is to add the `shadow_message_id` when producing messages to shadow topics.
> > > > > > >
> > > > > > > ### 2. How the shadow topic manages the shared ledger info
> > > > > > >
> > > > > > > This part is mostly implemented by `ShadowManagedLedger`, which extends the current `ManagedLedgerImpl` and overrides two key methods.
> > > > > > >
> > > > > > > 1. `initialize(..)`
> > > > > > >    a. Fetch the ManagedLedgerInfo of the source topic instead of the current shadow topic. The source topic name is stored in the topic policy of the shadow topic.
> > > > > > >    b. Open the last ledger and read the explicit LAC from bookie, instead of creating a new ledger. Reading the LAC here requires that the source topic enables the explicit LAC feature by setting `bookkeeperExplicitLacIntervalInMills` to a non-zero value in broker.conf.
> > > > > > >    c. Do not start checkLedgerRollTask, which tries to roll over the ledger periodically.
> > > > > > >
> > > > > > > 2. `internalAsyncAddEntry()`: Instead of writing the entry data to bookie, it only updates the ledger metadata, like `currentLedger` and `lastConfirmedEntry`, and puts the replicated message into the `EntryCache`.
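> > > > > > >
> > > > > > > To make the second override more concrete, here is a rough, self-contained sketch of the idea with simplified stand-in types (not the actual `ManagedLedgerImpl` signatures): the shadow topic never writes to bookies, it only advances its view of the ledger metadata from the `shadow_message_id` and caches the payload for tailing subscriptions.
> > > > > > >
> > > > > > > ```java
> > > > > > > // Illustrative sketch only -- simplified stand-ins for the broker internals.
> > > > > > > class ShadowAddEntrySketch {
> > > > > > >     record Position(long ledgerId, long entryId) {}
> > > > > > >
> > > > > > >     private volatile Position lastConfirmedEntry;   // shadow topic's view of the LAC
> > > > > > >     private final java.util.Map<Position, byte[]> entryCache =
> > > > > > >             new java.util.concurrent.ConcurrentHashMap<>();  // toy stand-in for EntryCache
> > > > > > >
> > > > > > >     // Invoked when the shadow replicator delivers a message carrying shadow_message_id.
> > > > > > >     void addReplicatedEntry(long ledgerId, long entryId, byte[] payload) {
> > > > > > >         Position pos = new Position(ledgerId, entryId);
> > > > > > >         // No bookie write: the entry already exists in the source topic's ledger.
> > > > > > >         // Only advance the ledger metadata so subscriptions can consume up to this position.
> > > > > > >         lastConfirmedEntry = pos;
> > > > > > >         // Populate the cache so tailing reads don't have to go to the bookies.
> > > > > > >         entryCache.put(pos, payload);
> > > > > > >     }
> > > > > > > }
> > > > > > > ```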
> > > > > > >
> > > > > > > Besides, some other problems need to be taken care of:
> > > > > > > - Any ledger metadata updates need to be synced to the shadow topic, including ledger offloading or ledger deletion. The shadow topic needs to watch the ledger info updates through the metadata store and apply them in time.
> > > > > > > - The locally cached LAC of the `LedgerHandle` won't be updated in time, so we need to refresh the LAC when a managed cursor requests entries beyond the known LAC.
> > > > > > >
> > > > > > > ## Rejected Alternatives
> > > > > > >
> > > > > > > See PIP-63[1].
> > > > > > >
> > > > > > > ## References
> > > > > > > [1] https://github.com/apache/pulsar/wiki/PIP-63%3A-Readonly-Topic-Ownership-Support
> > > > > > > [2] https://github.com/apache/pulsar/pull/11960
> > > > > > > [3] https://github.com/apache/pulsar/issues/16153
> > > > > > >
> > > > > > > BR,
> > > > > > > Haiting Jiang