Yes, we are reading the HBase WAL (via its replication plugin support)
and writing that into DL.

From the sounds of it, yes, it would. The only thing I would say is to make
the epoch requirement optional, so that if a client doesn't care about dupes
it doesn't have to deal with the process of getting a new epoch.
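
To sketch what I mean (the names below are hypothetical, just to illustrate
the shape of an opt-in epoch API - none of this exists in the current proxy
client):

import java.nio.ByteBuffer;
import com.twitter.distributedlog.DLSN;
import com.twitter.util.Future;

// Hypothetical additions to the proxy client - method names are made up for
// illustration; nothing here is part of DistributedLog today.
public interface EpochAwareWriter {
    // Existing-style write: no epoch required. A retry after a lost ack may
    // produce a duplicate; clients that don't care about dupes stop here.
    Future<DLSN> write(String stream, ByteBuffer data);

    // Opt-in path: fetch the current epoch for a stream (e.g. at handshake).
    Future<Long> getEpoch(String stream);

    // Write carrying an epoch; rejected (e.g. EpochFenced) if the epoch has
    // been bumped by a fence since it was handed out.
    Future<DLSN> writeWithEpoch(String stream, ByteBuffer data, long epoch);

    // Bump the epoch and return the last DLSN, to bound dedup reads.
    Future<DLSN> fenceThenGetLastDLSN(String stream);
}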

-Cameron

On Wed, Oct 19, 2016 at 7:43 PM, Sijie Guo <sij...@twitter.com.invalid>
wrote:

> On Wed, Oct 19, 2016 at 7:17 PM, Sijie Guo <sij...@twitter.com> wrote:
>
> >
> >
> > On Monday, October 17, 2016, Cameron Hatfield <kin...@gmail.com> wrote:
> >
> >> Answer inline:
> >>
> >> On Mon, Oct 17, 2016 at 11:46 AM, Sijie Guo <si...@apache.org> wrote:
> >>
> >> > Cameron,
> >> >
> >> > Thank you for your summary. I liked the discussion here. I also liked
> >> > the summary of your requirement - 'single-writer-per-key,
> >> > multiple-writers-per-log'. If I understand correctly, the core concern
> >> > here is almost 'exactly-once' write (or a way to explicitly tell if a
> >> > write can be retried or not).
> >> >
> >> > Comments inline.
> >> >
> >> > On Fri, Oct 14, 2016 at 11:17 AM, Cameron Hatfield <kin...@gmail.com>
> >> > wrote:
> >> >
> >> > > > Ah- yes good point (to be clear we're not using the proxy this way
> >> > > today).
> >> > >
> >> > > > > Due to the source of the
> >> > > > > data (HBase Replication), we cannot guarantee that a single
> >> > > > > partition will be owned for writes by the same client.
> >> > >
> >> > > > Do you mean you *need* to support multiple writers issuing
> >> > > > interleaved writes or is it just that they might sometimes
> >> > > > interleave writes and you don't care?
> >> > > How HBase partitions the keys being written wouldn't have a
> >> > > one-to-one mapping with the logs we would have in DL. Even if we did
> >> > > have that alignment when the cluster first started, HBase will
> >> > > rebalance what servers own what partitions, as well as split and
> >> > > merge partitions that already exist, causing eventual drift from one
> >> > > log per partition. Because we want ordering guarantees per key (row
> >> > > in hbase), we partition the logs by the key. Since multiple writers
> >> > > are possible per range of keys (due to the aforementioned
> >> > > rebalancing / splitting / etc of hbase), we cannot use the core
> >> > > library due to requiring a single writer for ordering.
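
(Inline note for context: on our side the key-to-log routing is just a hash
partition over the row key, roughly like the sketch below. NUM_LOGS and the
stream name prefix are placeholders, not real config.)

import java.util.Arrays;

// Rough sketch of routing an HBase WAL edit to a DL log by row key.
final class KeyRouter {
    static final int NUM_LOGS = 64; // placeholder for our stream count

    static String streamForRowKey(byte[] rowKey) {
        int partition = Math.floorMod(Arrays.hashCode(rowKey), NUM_LOGS);
        return "hbase-edits-" + partition;
    }
}
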
> >> > >
> >> > > But, for a single log, we don't really care about ordering beyond
> >> > > the per-key level. So all we really need to be able to handle is
> >> > > preventing duplicates when a failure occurs, and ordering
> >> > > consistency across requests from a single client.
> >> > >
> >> > > So our general requirements are:
> >> > > Write A, Write B
> >> > > Timeline: A -> B
> >> > > Request B is only made after A has successfully returned (possibly
> >> > > after retries)
> >> > >
> >> > > 1) If the write succeeds, it will be durably exposed to clients
> >> > > within some bounded time frame
> >> > >
> >> >
> >> > Guaranteed.
> >> >
> >>
> >> > > 2) If A succeeds and B succeeds, the ordering for the log will be A
> >> > > and then B
> >> > >
> >> >
> >> > If I understand correctly here, B is only sent after A is returned,
> >> > right? If that's the case, it is guaranteed.
> >>
> >>
> >>
> >> >
> >> >
> >> >
> >> > > 3) If A fails due to an error that can be relied on to *not* be a
> >> > > lost-ack problem, it will never be exposed to the client, so it may
> >> > > (depending on the error) be retried immediately
> >> > >
> >> >
> >> > If it is not a lost-ack problem, the entry will be exposed. it is
> >> > guaranteed.
> >>
> >> Let me try rephrasing the question, to make sure I'm understanding
> >> correctly:
> >> If A fails with an error such as "Unable to create connection to
> >> bookkeeper server", that would be the type of error we would expect to
> >> be able to retry immediately, as that result means no action was taken
> >> on any log / etc, so no entry could have been created. This is different
> >> than a "Connection Timeout" exception, as we just might not have gotten
> >> a response in time.
> >>
> >>
> > Gotcha.
> >
> > The response code returned from the proxy can tell whether a failure can
> > be retried safely or not. (We might need to make the response codes
> > better documented.)
> >
> >
> >
> >>
> >> >
> >> >
> >> > > 4) If A fails due to an error that could be a lost-ack problem
> >> > > (network connectivity / etc), within a bounded time frame it should
> >> > > be possible to find out if the write succeeded or failed, either by
> >> > > reading from some checkpoint of the log for the changes that should
> >> > > have been made or through some other possible server-side support.
> >> > >
> >> >
> >> > If I understand this correctly, it is a duplication issue, right?
> >> >
> >> > Can a de-duplication solution work here? Either DL or your client does
> >> > the de-duplication?
> >> >
> >>
> >> The requirements I'm mentioning are the ones needed for client-side
> >> dedupping. Since I can guarantee writes being exposed within some time
> >> frame, and I can never get into an inconsistently ordered state when
> >> successes happen, when an error occurs I can always wait for the max
> >> time frame, read the latest writes, and then dedup locally against the
> >> request I just made.
> >>
> >> The main thing about that time frame is that it's basically the sum of
> >> every timeout, all the way down in the system, combined with whatever
> >> flushing / caching / etc. times are at the bookkeeper / client level for
> >> when values are exposed.
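
(Inline note: the client-side dedup flow I keep referring to is roughly the
sketch below. maxExposureMillis, readSince and containsWrite stand for our
own client-side code - they are not DistributedLog APIs.)

import java.util.List;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.LogRecordWithDLSN;

// Sketch of dedup after an ambiguous (possible lost-ack) failure.
boolean wasWritten(byte[] payload, DLSN lastCheckpointedDLSN) throws Exception {
    // Wait out the worst-case time for a successful write to become readable.
    Thread.sleep(maxExposureMillis);
    // Re-read everything after our last known-good checkpoint...
    List<LogRecordWithDLSN> records = readSince(lastCheckpointedDLSN);
    // ...and check whether the ambiguous write actually landed.
    return containsWrite(records, payload);
}
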
> >
> >
> > Gotcha.
> >
> >>
> >>
> >> >
> >> > Is there any way to identify your write?
> >> >
> >> > I can think of a case as follows - I want to know what your expected
> >> > behavior from the log is.
> >> >
> >> > a)
> >> >
> >> > If an hbase region server A writes a change of key K to the log, the
> >> > change is successfully made to the log;
> >> > but server A goes down before receiving the response.
> >> > Region server B takes over the region that contains K; what will B do?
> >> >
> >>
> >> HBase writes in large chunks (WAL logs), which its replication system
> >> then handles by replaying in the case of failure. If I'm in the middle
> >> of a log, and the whole region goes down and gets rescheduled elsewhere,
> >> I will start back up from the beginning of the log I was in the middle
> >> of. Using checkpointing + deduping, we should be able to find out where
> >> we left off in the log.
> >
> >
> >> >
> >> >
> >> > b) same as a), but server A was just network partitioned. Will both A
> >> > and B write the change of key K?
> >> >
> >>
> >> HBase gives us some guarantees around network partitions (consistency
> >> over availability for HBase). HBase is a single-master failover-recovery
> >> type of system, with zookeeper-based guarantees for single owners
> >> (writers) of a range of data.
> >>
> >> >
> >> >
> >> >
> >> > > 5) If A is turned into multiple batches (one large request gets
> >> > > split into multiple smaller ones to the bookkeeper backend, due to
> >> > > log rolling / size / etc):
> >> > >   a) The ordering of entries within a batch is consistent with the
> >> > > original request, when exposed in the log (though they may be
> >> > > interleaved with other requests)
> >> > >   b) The ordering across batches is consistent with the original
> >> > > request, when exposed in the log (though they may be interleaved
> >> > > with other requests)
> >> > >   c) If a batch fails, and cannot be retried / is unsuccessfully
> >> > > retried, all batches after the failed batch should not be exposed in
> >> > > the log. Note: the batches before and including the failed batch
> >> > > that ended up succeeding can show up in the log, again within some
> >> > > bounded time range for reads by a client.
> >> > >
> >> >
> >> > There is a method 'writeBulk' in DistributedLogClient that can achieve
> >> > this guarantee.
> >> >
> >> > However, I am not very sure about how you will turn A into batches. If
> >> > you are dividing A into batches, you can simply control the
> >> > application write sequence to achieve the guarantee here.
> >> >
> >> > Can you explain more about this?
> >> >
> >>
> >> In this case, by batches I mean what the proxy does with the single
> >> request that I send it. If the proxy decides it needs to turn my single
> >> request into multiple batches of requests, due to log rolling, size
> >> limitations, etc., those would be the guarantees I need to be able to
> >> deduplicate on the client side.
> >
> >
> > A single record written by #write and a record set (a set of records)
> > written by #writeRecordSet are atomic - they will not be broken down into
> > entries (batches). With the correct response code, you would be able to
> > tell if it is a lost-ack failure or not. However, there is a size
> > limitation for this - it cannot go beyond 1MB in the current
> > implementation.
> >
> > What is your expected record size?
> >
> >
> >>
> >> >
> >> >
> >> > >
> >> > > Since we can guarantee per-key ordering on the client side, we
> >> > > guarantee that there is a single writer per key, just not per log.
> >> >
> >> >
> >> > Do you need a fencing guarantee in the case of a network partition
> >> > causing two writers?
> >> >
> >> >
> >> > > So if there was a way to guarantee whether a single write request
> >> > > was written or not, within a certain time frame (since failures
> >> > > should be rare anyway, this is fine if it is expensive), we can then
> >> > > have the client guarantee the ordering it needs.
> >> > >
> >> >
> >> > This sounds like an 'exactly-once' write (regarding retries)
> >> > requirement to me, right?
> >> >
> >> Yes. I'm curious how this issue is handled by Manhattan, since you can
> >> imagine that a data store that ends up getting multiple writes for the
> >> same put / get / etc. would be harder to use, and we are basically
> >> trying to create a log like that for HBase.
> >
> >
> > Are you guys replacing HBase WAL?
> >
> > In the Manhattan case, the request will first be written to DL streams by
> > the Manhattan coordinator. The Manhattan replicas then read from the DL
> > streams and apply the changes. In the lost-ack case, the MH coordinator
> > will just fail the request to the client.
> >
> > My feeling here is that your usage for HBase is a bit different from how
> > we use DL in Manhattan. It sounds like you read from a source (HBase WAL)
> > and write to DL. But I might be wrong.
> >
> >
> >>
> >> >
> >> >
> >> > >
> >> > >
> >> > > > Cameron:
> >> > > > Another thing we've discussed but haven't really thought through -
> >> > > > We might be able to support some kind of epoch write request,
> >> > > > where the epoch is guaranteed to have changed if the writer has
> >> > > > changed or the ledger was ever fenced off. Writes include an epoch
> >> > > > and are rejected if the epoch has changed.
> >> > > > With a mechanism like this, fencing the ledger off after a failure
> >> > > > would ensure any pending writes had either been written or would
> >> > > > be rejected.
> >> > >
> >> > > The issue would be how I guarantee the write I wrote to the server
> >> > > was written. Since a network issue could happen on the send of the
> >> > > request, or on the receive of the success response, an epoch
> >> > > wouldn't tell me if I can safely retry, as the write could have
> >> > > succeeded but AWS dropped the connection for the success response.
> >> > > Since the epoch would be the same (same ledger), I could write
> >> > > duplicates.
> >> > >
> >> > >
> >> > > > We are currently proposing adding transaction semantics to dl to
> >> > > > get rid of the size limitation and the unaware-ness in the proxy
> >> > > > client. Here is our idea -
> >> > > > http://mail-archives.apache.org/mod_mbox/incubator-distributedlog-dev/201609.mbox/%3cCAAC6BxP5YyEHwG0ZCF5soh42X=xuYwYml4nxsybyiofzxpv...@mail.gmail.com%3e
> >> > >
> >> > > > I am not sure if your idea is similar to ours, but we'd like to
> >> > > > collaborate with the community if anyone has a similar idea.
> >> > >
> >> > > Our use case would be covered by transaction support, but I'm unsure
> >> > > if we would need something that heavyweight for the guarantees we
> >> > > need.
> >> > >
> >> >
> >> > >
> >> > > Basically, the high-level requirement here is "Support consistent
> >> > > write ordering for single-writer-per-key, multi-writer-per-log". My
> >> > > hunch is that, with some added guarantees to the proxy (if they
> >> > > aren't already supported), and some custom client code on our side
> >> > > for removing the entries that actually succeeded in writing to
> >> > > DistributedLog from the request that failed, it should be a
> >> > > relatively easy thing to support.
> >> > >
> >> >
> >> > Yup. I think it should not be very difficult to support. There might
> >> > be some changes on the server side.
> >> > Let's figure out what the changes will be. Are you guys interested in
> >> > contributing?
> >> >
> >> Yes, we would be.
> >>
> >> As a note, the one thing that we see as an issue with the client-side
> >> dedupping is how to bound the range of data that needs to be looked at
> >> for deduplication. As you can imagine, it is pretty easy to bound the
> >> bottom of the range, as that is just regular checkpointing of the DLSN
> >> that is returned. I'm still not sure if there is any nice way to time
> >> bound the top end of the range, especially since the proxy owns sequence
> >> numbers (which makes sense). I am curious if there is more that can be
> >> done if deduplication is on the server side. However, the main downside
> >> I see of server-side deduplication is that instead of running only when
> >> a client request fails, it would have to run on every write.
> >
> >
> > For a reliable dedup, we probably need a fence-then-getLastDLSN operation -
> > it would guarantee that any non-completed requests issued (lost-ack
> > requests) before the fence-then-getLastDLSN operation will be failed and
> > will never land in the log.
> >
> > the pseudo code would look like below -
> >
> > write(request) onFailure { t =>
> >   if (t is a timeout exception) {
> >     DLSN lastDLSN = fenceThenGetLastDLSN()
> >     DLSN lastCheckpointedDLSN = ...;
> >     // check if the request lands between [lastCheckpointedDLSN, lastDLSN].
> >     // if it does, the write succeeded; otherwise retry.
> >   }
> > }
> >
>
>
> Just realized the idea is the same as what Leigh raised in the previous
> email about 'epoch write'. Let me explain more about this idea (Leigh, feel
> free to jump in to fill in your idea).
>
> - when a log stream is owned, the proxy uses the last transaction id as the
> epoch
> - when a client connects (handshakes with the proxy), it will get the epoch
> for the stream.
> - the writes issued by this client will carry the epoch to the proxy.
> - add a new rpc - fenceThenGetLastDLSN - it would force the proxy to bump
> the epoch.
> - if fenceThenGetLastDLSN happens, all the outstanding writes with the old
> epoch will be rejected with exceptions (e.g. EpochFenced).
> - the DLSN returned from fenceThenGetLastDLSN can be used as the upper
> bound for deduplication on failures.
>
> Cameron, does this sound like a solution to your use case?
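
(Inline note: for my own understanding, the failure-handling path against
that proposed rpc would look roughly like the sketch below. writeWithEpoch,
fenceThenGetLastDLSN and EpochFencedException are assumed from the proposal
above - they don't exist yet - and checkpoint / landedBetween are our own
client-side helpers, not DL APIs.)

// Sketch of dedup against the *proposed* epoch-fencing rpc.
try {
    DLSN dlsn = Await.result(client.writeWithEpoch(stream, payload, epoch));
    checkpoint(dlsn); // remember the lower bound for future dedup reads
} catch (TimeoutException lostAck) {
    // Bump the epoch so any in-flight writes carrying the old epoch can no
    // longer land, and use the returned DLSN as the upper bound for dedup.
    DLSN upperBound = Await.result(client.fenceThenGetLastDLSN(stream));
    if (!landedBetween(lastCheckpointedDLSN, upperBound, payload)) {
        // The write never made it - safe to retry under a fresh epoch.
    }
} catch (EpochFencedException fenced) {
    // Another fence bumped the epoch; re-handshake for a new one, dedup,
    // then retry.
}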
>
>
>
> >
> >
> >>
> >> Maybe something that could fit a similar need to what Kafka does (the
> >> last stored value for a particular key in a log), such that on a per-key
> >> basis there could be a sequence number that supports deduplication? The
> >> cost seems like it would be high, however, and I'm not even sure if
> >> bookkeeper supports it.
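
(Inline note: by "a sequence number on a per-key basis" I mean something like
the toy sketch below on the write path - which also makes the cost concern
obvious, since someone has to keep that state for every key.)

import java.util.HashMap;
import java.util.Map;

// Toy illustration of per-key sequence-number dedup (the "Kafka-ish" idea):
// drop any write whose sequence number is not newer than the last one seen
// for that key. The in-memory map is exactly the state/cost problem above.
final class PerKeyDeduper {
    private final Map<String, Long> lastSeqPerKey = new HashMap<>();

    boolean shouldWrite(String key, long seq) {
        Long last = lastSeqPerKey.get(key);
        if (last != null && seq <= last) {
            return false; // duplicate or stale write - skip it
        }
        lastSeqPerKey.put(key, seq);
        return true;
    }
}
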
> >
> >
> >> Cheers,
> >> Cameron
> >>
> >> >
> >> > >
> >> > > Thanks,
> >> > > Cameron
> >> > >
> >> > >
> >> > > On Sat, Oct 8, 2016 at 7:35 AM, Leigh Stewart
> >> > <lstew...@twitter.com.invalid
> >> > > >
> >> > > wrote:
> >> > >
> >> > > > Cameron:
> >> > > > Another thing we've discussed but haven't really thought through -
> >> > > > We might be able to support some kind of epoch write request,
> >> > > > where the epoch is guaranteed to have changed if the writer has
> >> > > > changed or the ledger was ever fenced off. Writes include an epoch
> >> > > > and are rejected if the epoch has changed.
> >> > > > With a mechanism like this, fencing the ledger off after a failure
> >> > > > would ensure any pending writes had either been written or would
> >> > > > be rejected.
> >> > > >
> >> > > >
> >> > > > On Sat, Oct 8, 2016 at 7:10 AM, Sijie Guo <si...@apache.org>
> wrote:
> >> > > >
> >> > > > > Cameron,
> >> > > > >
> >> > > > > I think both Leigh and Xi have made a few good points about your
> >> > > > > question.
> >> > > > >
> >> > > > > To add one more point to your question - "but I am not
> >> > > > > 100% sure of how all of the futures in the code handle failures.
> >> > > > > If not, where in the code would be the relevant places to add the
> >> > > > > ability to do this, and would the project be interested in a pull
> >> > > > > request?"
> >> > > > >
> >> > > > > The current proxy and client logic doesn't handle failures
> >> > > > > (duplicates) perfectly - the strategy now is that the client will
> >> > > > > retry as best as it can before throwing exceptions to users. The
> >> > > > > code you are looking for is in BKLogSegmentWriter for the proxy
> >> > > > > handling writes and in DistributedLogClientImpl for the proxy
> >> > > > > client handling responses from proxies. Does this help you?
> >> > > > >
> >> > > > > And also, you are welcome to contribute the pull requests.
> >> > > > >
> >> > > > > - Sijie
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > On Tue, Oct 4, 2016 at 3:39 PM, Cameron Hatfield <
> >> kin...@gmail.com>
> >> > > > wrote:
> >> > > > >
> >> > > > > > I have a question about the Proxy Client. Basically, for our
> >> > > > > > use cases, we want to guarantee ordering at the key level,
> >> > > > > > irrespective of the ordering of the partition it may be
> >> > > > > > assigned to as a whole. Due to the source of the data (HBase
> >> > > > > > Replication), we cannot guarantee that a single partition will
> >> > > > > > be owned for writes by the same client. This means the proxy
> >> > > > > > client works well (since we don't care which proxy owns the
> >> > > > > > partition we are writing to).
> >> > > > > >
> >> > > > > >
> >> > > > > > However, the guarantees we need when writing a batch consist
> >> > > > > > of:
> >> > > > > > Definition of a Batch: The set of records sent to the
> >> > > > > > writeBatch endpoint on the proxy
> >> > > > > >
> >> > > > > > 1. Batch success: If the client receives a success from the
> >> > > > > > proxy, then that batch is successfully written
> >> > > > > >
> >> > > > > > 2. Inter-Batch ordering: Once a batch has been written
> >> > > > > > successfully by the client, when another batch is written, it
> >> > > > > > will be guaranteed to be ordered after the last batch (if it
> >> > > > > > is the same stream).
> >> > > > > >
> >> > > > > > 3. Intra-Batch ordering: Within a batch of writes, the records
> >> > > > > > will be committed in order
> >> > > > > >
> >> > > > > > 4. Intra-Batch failure ordering: If an individual record fails
> >> > > > > > to write within a batch, all records after that record will
> >> > > > > > not be written.
> >> > > > > >
> >> > > > > > 5. Batch Commit: Guarantee that if a batch returns a success,
> >> > > > > > it will be written
> >> > > > > >
> >> > > > > > 6. Read-after-write: Once a batch is committed, within a
> >> > > > > > limited time-frame it will be able to be read. This is
> >> > > > > > required in the case of failure, so that the client can see
> >> > > > > > what actually got committed. I believe the time-frame part
> >> > > > > > could be removed if the client can send in the same sequence
> >> > > > > > number that was written previously, since it would then fail
> >> > > > > > and we would know that a read needs to occur.
> >> > > > > >
> >> > > > > >
> >> > > > > > So, my basic question is whether this is currently possible
> >> > > > > > in the proxy? I don't believe it gives these guarantees as it
> >> > > > > > stands today, but I am not 100% sure of how all of the futures
> >> > > > > > in the code handle failures.
> >> > > > > > If not, where in the code would be the relevant places to add
> >> > > > > > the ability to do this, and would the project be interested in
> >> > > > > > a pull request?
> >> > > > > >
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Cameron
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>
