Hi Penghui,

> Please also update the proposal for schema handling.

Done. All the content discussed in this thread has been updated in [1].

Thanks,
Haiting

[1] https://github.com/apache/pulsar/issues/16153

On Fri, Jul 29, 2022 at 1:19 PM PengHui Li <codelipeng...@gmail.com> wrote:

> Sounds good to me.
>
> Please also update the proposal for schema handling.
>
> Thanks,
> Penghui
>
> On Jul 24, 2022, 17:53 +0800, Haiting Jiang <jianghait...@apache.org>,
> wrote:
> > Hi Penghui,
> >
> > > One question about the schema.
> > > How can the consumer get the schema from the shadow topic during
> > > consumption?
> > > We should add this part in the proposal.
> > >
> >
> > Thanks for the reminder.
> >
> > From what I see, the schema is part of a topic's metadata, so a shadow topic
> > won't have its own schema; it shares the schema info of its source topic.
> >
> > For consumers, we need to support the `GetSchema` command for shadow topics,
> > and there are two interfaces for this.
> >
> > 1. Binary protocol, where `CommandGetSchema` is handled in
> > `ServerCnx#handleGetSchema`. We only need to replace the requested shadow
> > topic's `schemaName` with the `schemaName` of the source topic, and the
> > underlying read operation is supported by
> > `SchemaRegistry#getSchema(String, SchemaVersion)`.
> >
> > 2. HTTP protocol, which is handled in `SchemasResource#getSchema(...)`.
> > Similar to the binary protocol approach, replace the `schemaId` with that of
> > the source topic in `SchemasResourceBase#getSchemaId`.
> >
> > For admins, we can support other "read" ops besides `getSchema`, including
> > `getAllSchemas` and `getVersionBySchema`, all of which can be supported in
> > the same way as `getSchema`.
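> >
> > For illustration only, a minimal sketch of the schema-id rewrite (the helper
> > name and the way the source topic is passed in are assumptions, not the
> > final implementation):
> >
> > ```
> > import java.util.Optional;
> > import org.apache.pulsar.common.naming.TopicName;
> >
> > class ShadowSchemaIds {
> >     // A shadow topic has no schema storage of its own, so GetSchema (binary
> >     // or HTTP) should read under the source topic's schema id instead.
> >     static String resolveSchemaId(TopicName requestedTopic,
> >                                   Optional<String> shadowSourceTopic) {
> >         return shadowSourceTopic
> >                 .map(source -> TopicName.get(source).getSchemaName())
> >                 .orElse(requestedTopic.getSchemaName());
> >     }
> > }
> > ```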
> >
> >
> > Thanks,
> > Haiting
> >
> >
> > On 2022/07/21 02:13:08 PengHui Li wrote:
> > > Hi Haiting,
> > >
> > > One question about the schema.
> > > How can the consumer get the schema from the shadow topic during
> > > consumption?
> > > We should add this part in the proposal.
> > >
> > > Thanks,
> > > Penghui
> > >
> > > On Mon, Jul 11, 2022 at 9:09 PM Asaf Mesika <asaf.mes...@gmail.com>
> wrote:
> > >
> > > > On Thu, Jun 23, 2022 at 6:26 AM Haiting Jiang <
> jianghait...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi Asaf,
> > > > >
> > > > > > I did a quick reading and I couldn't understand the gist of this change:
> > > > > > The shadow topic doesn't really have its own messages, or its own ledgers,
> > > > > > right? When it reads messages, it reads from the original topic ledgers.
> > > > > > So the only thing you need to do is sync the "metadata" - ledgers list?
> > > > >
> > > > > Yes, mostly ledger id list and LAC of the last ledger.
> > > >
> > > >
> > > > > > One question comes to mind here: Why not simply read the ledger
> > > > > > information from the original topic, without copying?
> > > > >
> > > > > Yes, old ledger information will be read from the metadata store when
> > > > > ShadowManagedLedger initializes. The replicator is only for new messages,
> > > > > to reduce the consumption latency of subscriptions in the shadow topic.
> > > > > And the reason we also replicate message data is to populate the entry
> > > > > cache when the shadow topic has many active subscriptions.
> > > > >
> > > > > One optimization we can do: there would not be much benefit in the
> > > > > shadow replicator replicating messages in the backlog. We can come up
> > > > > with some policy to reset the shadow replicator cursor in a future PR.
> > > > >
> > > >
> > > > I'm not sure I'm following you.
> > > > What do you mean by old ledger information and new ledger
> information?
> > > >
> > > > What I'm trying to understand is: why do you need to copy the source
> topic
> > > > metadata: Ledgers ID list and LAC of the last ledger? Why can't you
> just
> > > > use the original topic metadata?
> > > >
> > > >
> > > >
> > > > >
> > > > > > Another question - I couldn't understand why you need to change
> the
> > > > > > protocol to introduce shadow message id. Can you please explain
> that to
> > > > > me?
> > > > > > Is CommandSend used only internally between Pulsar Clusters or
> used by
> > > > a
> > > > > > Pulsar Client?
> > > > >
> > > > > CommandSend is designed for the Pulsar producer client in the first
> > > > > place, and geo-replication reuses the producer client to replicate
> > > > > messages between Pulsar clusters.
> > > > >
> > > > > The shadow message id contains the ledger id and entry id of the
> > > > > message. When the shadow topic receives the message id, it is able to
> > > > > update `lastConfirmedEntry` directly, so that subscriptions can consume
> > > > > this new message. The shadow topic can also tell whether the message
> > > > > comes from the shadow replicator and reject it otherwise.
> > > > >
> > > > >
> > > > I think the flow of information is the part I don't understand.
> > > >
> > > > In the PIP you write "The message sync procedure of shadow topic is
> > > > supported by shadow replication, which is very like geo-replication,
> with
> > > > these differences:"
> > > > What I don't understand is that you write that this is a read-only
> topic,
> > > > so why replicate/sync messages?
> > > >
> > > > I managed to understand that you want to populate the BK entry cache
> of the
> > > > topic ledgers in the shadow topic broker. Instead of reading from BK
> and
> > > > storing it in the cache, you favor copying from the source topic
> broker
> > > > cache memory to the shadow topic broker cache. Is this to save the
> > > > bandwidth of BK? I presume the most recent messages of BK would be in
> > > > memory anyway, no?
> > > >
> > > >
> > > >
> > > >
> > > > > Thanks,
> > > > > Haiting
> > > > >
> > > > > On 2022/06/22 15:57:11 Asaf Mesika wrote:
> > > > > > Hi,
> > > > > >
> > > > > > I did a quick reading and I couldn't understand the gist of this change:
> > > > > > The shadow topic doesn't really have its own messages, or its own ledgers,
> > > > > > right? When it reads messages, it reads from the original topic ledgers.
> > > > > > So the only thing you need to do is sync the "metadata" - ledgers list?
> > > > > > One question comes to mind here: Why not simply read the ledger
> > > > > > information from the original topic, without copying?
> > > > > >
> > > > > > Another question - I couldn't understand why you need to change
> the
> > > > > > protocol to introduce shadow message id. Can you please explain
> that to
> > > > > me?
> > > > > > Is CommandSend used only internally between Pulsar Clusters or
> used by
> > > > a
> > > > > > Pulsar Client?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Asaf
> > > > > >
> > > > > > On Tue, Jun 21, 2022 at 11:00 AM Haiting Jiang <
> > > > jianghait...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Pulsar community:
> > > > > > >
> > > > > > > I opened a PIP to discuss "Shadow Topic, an alternative way to
> > > > > > > support readonly topic ownership."
> > > > > > >
> > > > > > > Proposal Link: https://github.com/apache/pulsar/issues/16153
> > > > > > >
> > > > > > > ---
> > > > > > >
> > > > > > > ## Motivation
> > > > > > >
> > > > > > > The motivation is the same as PIP-63[1], with a new broadcast
> use
> > > > case
> > > > > of
> > > > > > > supporting 100K subscriptions in a single topic.
> > > > > > > 1. The bandwidth of a broker limits the number of
> subscriptions for a
> > > > > > > single
> > > > > > > topic.
> > > > > > > 2. Subscriptions are competing for the network bandwidth on
> brokers.
> > > > > > > Different
> > > > > > > subscriptions might have different levels of severity.
> > > > > > > 3. When synchronizing cross-city message reading, cross-city
> access
> > > > > needs
> > > > > > > to
> > > > > > > be minimized.
> > > > > > > 4. [New] Broadcast with 100K subscriptions. There is a limitation on
> > > > > > > the number of subscriptions of a single topic. Tests by Hongjie from
> > > > > > > NTT Lab show that with 40K subscriptions in a single topic, the client
> > > > > > > needs about 20 min to start all client connections, and at a producer
> > > > > > > rate of 1 msg/s, the average end-to-end latency is about 2.9 s. And for
> > > > > > > 100K subscriptions, the connection startup time and E2E latency grow
> > > > > > > beyond acceptable levels.
> > > > > > >
> > > > > > > However, it's too complicated to implement with the original PIP-63
> > > > > > > proposal; the changed code is already over 3K+ lines, see PR#11960[2],
> > > > > > > and there are still some problems left:
> > > > > > > 1. The LAC in the readonly topic is updated in a polling pattern, which
> > > > > > > increases the load on bookies.
> > > > > > > 2. The message data of the readonly topic won't be cached in the broker.
> > > > > > > This increases the network usage between brokers and bookies when more
> > > > > > > than one subscriber is tail-reading.
> > > > > > > 3. All the subscriptions are managed in the original writable topic, so
> > > > > > > the maximum supported number of subscriptions is not scalable.
> > > > > > >
> > > > > > > This PIP tries to come up with a simpler solution to support
> readonly
> > > > > topic
> > > > > > > ownership and solve the problems the previous PR left. The
> main idea
> > > > of
> > > > > > > this
> > > > > > > solution is to reuse the feature of geo-replication, but
> instead of
> > > > > > > duplicating storage, it shares underlying bookie ledgers
> between
> > > > > different
> > > > > > > topics.
> > > > > > >
> > > > > > > ## Goal
> > > > > > >
> > > > > > > The goal is to introduce **Shadow Topic** as a new type of topic to
> > > > > > > support readonly topic ownership. Just as its name implies, a shadow
> > > > > > > topic is the shadow of some normal persistent topic (let's call it the
> > > > > > > source topic here). The source topic and the shadow topic must have the
> > > > > > > same number of partitions or both be non-partitioned. Multiple shadow
> > > > > > > topics can be created from a source topic.
> > > > > > >
> > > > > > > A shadow topic shares the underlying bookie ledgers of its source
> > > > > > > topic. Users can't produce any messages to a shadow topic directly,
> > > > > > > and the shadow topic doesn't create any new ledgers for messages; all
> > > > > > > messages in the shadow topic come from the source topic.
> > > > > > >
> > > > > > > A shadow topic has its own subscriptions and doesn't share them with
> > > > > > > its source topic. This means the shadow topic has its own cursor
> > > > > > > ledger to store persistent mark-delete info for each persistent
> > > > > > > subscription.
> > > > > > >
> > > > > > > The message sync procedure of the shadow topic is supported by shadow
> > > > > > > replication, which is very like geo-replication, with these
> > > > > > > differences:
> > > > > > > 1. Geo-replication only works between topics with the same name in
> > > > > > > different broker clusters. Shadow topics have no naming limitation,
> > > > > > > and they can be in the same cluster.
> > > > > > > 2. Geo-replication duplicates data storage, but shadow topics don't.
> > > > > > > 3. Geo-replication replicates data in both directions (it's
> > > > > > > bidirectional), but shadow replication only has a one-way data flow.
> > > > > > >
> > > > > > >
> > > > > > > ## API Changes
> > > > > > >
> > > > > > > 1. PulsarApi.proto.
> > > > > > >
> > > > > > > The shadow topic needs to know the original message id of replicated
> > > > > > > messages in order to update the new ledger and LAC, so we need to add
> > > > > > > a `shadow_message_id` field to CommandSend for the replicator.
> > > > > > >
> > > > > > > ```
> > > > > > > message CommandSend {
> > > > > > >     // ...
> > > > > > >     // message id for shadow topic
> > > > > > >     optional MessageIdData shadow_message_id = 9;
> > > > > > > }
> > > > > > > ```
> > > > > > >
> > > > > > > 2. Admin API for creating shadow topic with source topic
> > > > > > > ```
> > > > > > > admin.topics().createShadowTopic(source-topic-name, shadow-topic-name)
> > > > > > > ```
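> > > > > > >
> > > > > > > For example (illustrative topic names; assumes a `PulsarAdmin` client
> > > > > > > named `admin` and the parameter order proposed above):
> > > > > > > ```
> > > > > > > admin.topics().createShadowTopic(
> > > > > > >         "persistent://public/default/source-topic",
> > > > > > >         "persistent://public/default/source-topic-shadow");
> > > > > > > ```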
> > > > > > >
> > > > > > > ## Implementation
> > > > > > >
> > > > > > > A picture showing key components relations is added in github
> issue
> > > > > [3].
> > > > > > >
> > > > > > > There are two key changes for the implementation:
> > > > > > > 1. How to replicate messages to shadow topics.
> > > > > > > 2. How the shadow topic manages shared ledger info.
> > > > > > >
> > > > > > > ### 1. How to replicate messages to shadow topics.
> > > > > > >
> > > > > > > This part is mostly implemented by `ShadowReplicator`, which extends
> > > > > > > the `PersistentReplicator` introduced in geo-replication. The shadow
> > > > > > > topic list is added as a new topic policy of the source topic, and the
> > > > > > > source topic manages the lifecycle of all the replicators. The key is
> > > > > > > to add the `shadow_message_id` when producing messages to shadow
> > > > > > > topics.
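> > > > > > >
> > > > > > > A rough sketch of that step (the replicator and producer wiring are
> > > > > > > omitted; this is illustrative, not the final code):
> > > > > > > ```
> > > > > > > // The entry's position in the source topic is exactly the position it
> > > > > > > // will have in the shadow topic, since the ledgers are shared.
> > > > > > > MessageIdData shadowMessageId = new MessageIdData()
> > > > > > >         .setLedgerId(entry.getLedgerId())
> > > > > > >         .setEntryId(entry.getEntryId());
> > > > > > > // The replicator's producer then attaches this as
> > > > > > > // CommandSend.shadow_message_id (field 9 proposed above) when
> > > > > > > // publishing the entry to the shadow topic.
> > > > > > > ```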
> > > > > > >
> > > > > > > ### 2. How the shadow topic manages shared ledger info.
> > > > > > >
> > > > > > > This part is mostly implemented by `ShadowManagedLedger`, which
> > > > > > > extends the current `ManagedLedgerImpl` with two key overridden
> > > > > > > methods.
> > > > > > >
> > > > > > > 1. `initialize(..)`
> > > > > > > a. Fetch the ManagedLedgerInfo of the source topic instead of the
> > > > > > > current shadow topic. The source topic name is stored in the topic
> > > > > > > policy of the shadow topic.
> > > > > > > b. Open the last ledger and read the explicit LAC from the bookie,
> > > > > > > instead of creating a new ledger. Reading the LAC here requires that
> > > > > > > the source topic enable the explicit LAC feature by setting
> > > > > > > `bookkeeperExplicitLacIntervalInMills` to a non-zero value in
> > > > > > > broker.conf (see the example below).
> > > > > > > c. Do not start checkLedgerRollTask, which tries to roll over the
> > > > > > > ledger periodically.
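> > > > > > >
> > > > > > > For example, in broker.conf of the source topic's cluster (the value
> > > > > > > here is just an illustration):
> > > > > > > ```
> > > > > > > # Write an explicit LAC to bookies every second so that shadow topics
> > > > > > > # can read an up-to-date LAC; 0 disables the explicit LAC feature.
> > > > > > > bookkeeperExplicitLacIntervalInMills=1000
> > > > > > > ```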
> > > > > > >
> > > > > > > 2. `internalAsyncAddEntry()`: Instead of writing the entry data to
> > > > > > > the bookie, it only updates the ledger metadata, like `currentLedger`
> > > > > > > and `lastConfirmedEntry`, and puts the replicated message into the
> > > > > > > `EntryCache`.
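> > > > > > >
> > > > > > > Conceptually (a simplified sketch; the real method works on the
> > > > > > > managed ledger's internal add-entry path rather than a standalone
> > > > > > > helper like this):
> > > > > > > ```
> > > > > > > void onReplicatedEntry(long ledgerId, long entryId, ByteBuf data) {
> > > > > > >     // No bookie write: the data already lives in the source topic's
> > > > > > >     // shared ledger. Just advance the shadow topic's visible position...
> > > > > > >     lastConfirmedEntry = PositionImpl.get(ledgerId, entryId);
> > > > > > >     // ...and keep a copy in the entry cache so tailing subscriptions
> > > > > > >     // on the shadow topic are served from memory.
> > > > > > >     entryCache.insert(EntryImpl.create(ledgerId, entryId, data));
> > > > > > > }
> > > > > > > ```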
> > > > > > >
> > > > > > > Besides, some other problems need to be taken care of:
> > > > > > > - Any ledger metadata updates need to be synced to the shadow topic,
> > > > > > > including ledger offloading or ledger deletion. The shadow topic needs
> > > > > > > to watch ledger info updates in the metadata store and apply them in
> > > > > > > time.
> > > > > > > - The locally cached LAC of `LedgerHandle` won't be updated in time,
> > > > > > > so we need to refresh the LAC when a managed cursor requests entries
> > > > > > > beyond the known LAC.
> > > > > > >
> > > > > > > ## Rejected Alternatives
> > > > > > >
> > > > > > > See PIP-63[1].
> > > > > > >
> > > > > > > ## Reference
> > > > > > > [1]
> > > > > > >
> > > > >
> > > >
> https://github.com/apache/pulsar/wiki/PIP-63%3A-Readonly-Topic-Ownership-Support
> > > > > > > [2] https://github.com/apache/pulsar/pull/11960
> > > > > > > [3] https://github.com/apache/pulsar/issues/16153
> > > > > > >
> > > > > > >
> > > > > > > BR,
> > > > > > > Haiting Jiang
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
>
