On Wed, Oct 19, 2016 at 7:17 PM, Sijie Guo <sij...@twitter.com> wrote:

>
>
> On Monday, October 17, 2016, Cameron Hatfield <kin...@gmail.com> wrote:
>
>> Answer inline:
>>
>> On Mon, Oct 17, 2016 at 11:46 AM, Sijie Guo <si...@apache.org> wrote:
>>
>> > Cameron,
>> >
>> > Thank you for your summary. I liked the discussion here. I also liked
>> the
>> > summary of your requirement - 'single-writer-per-key,
>> > multiple-writers-per-log'. If I understand correctly, the core concern
>> here
>> > is almost 'exactly-once' write (or a way to explicitly tell if a write can
>> be
>> > retried or not).
>> >
>> > Comments inline.
>> >
>> > On Fri, Oct 14, 2016 at 11:17 AM, Cameron Hatfield <kin...@gmail.com>
>> > wrote:
>> >
>> > > > Ah- yes good point (to be clear we're not using the proxy this way
>> > > today).
>> > >
>> > > > > Due to the source of the
>> > > > > data (HBase Replication), we cannot guarantee that a single
>> partition
>> > > will
>> > > > > be owned for writes by the same client.
>> > >
>> > > > Do you mean you *need* to support multiple writers issuing
>> interleaved
>> > > > writes or is it just that they might sometimes interleave writes and
>> > you
>> > > >don't care?
>> > > How HBase partitions the keys being written wouldn't have a one-to-one
>> > > mapping with the partitions we would have in DistributedLog. Even if we
>> > > did have that alignment when the cluster first started, HBase will
>> > > rebalance which servers own which partitions, as well as split and
>> > > merge partitions that already exist, causing eventual drift from one
>> > > log per partition.
>> > > Because we want ordering guarantees per key (row in HBase), we
>> > > partition the logs by the key. Since multiple writers are possible per
>> > > range of keys (due to the aforementioned rebalancing / splitting / etc.
>> > > of HBase), we cannot use the core library, since it requires a single
>> > > writer for ordering.
>> > >
>> > > But, for a single log, we don't really care about ordering aside from
>> at
>> > > the per-key level. So all we really need to be able to handle is
>> > preventing
>> > > duplicates when a failure occurs, and ordering consistency across
>> > requests
>> > > from a single client.
>> > >
>> > > So our general requirements are:
>> > > Write A, Write B
>> > > Timeline: A -> B
>> > > Request B is only made after A has successfully returned (possibly
>> after
>> > > retries)
>> > >
>> > > 1) If the write succeeds, it will be durably exposed to clients within
>> > some
>> > > bounded time frame
>> > >
>> >
>> > Guaranteed.
>> >
>>
>> > > 2) If A succeeds and B succeeds, the ordering for the log will be A
>> and
>> > > then B
>> > >
>> >
>> > If I understand correctly here, B is only sent after A is returned,
>> right?
>> > If that's the case, it is guaranteed.
>>
>>
>>
>> >
>> >
>> >
>> > > 3) If A fails due to an error that can be relied on to *not* be a lost
>> > ack
>> > > problem, it will never be exposed to the client, so it may (depending
>> on
>> > > the error) be retried immediately
>> > >
>> >
>> > If it is not a lost-ack problem, the entry will be exposed. It is
>> > guaranteed.
>>
>> Let me try rephrasing the questions, to make sure I'm understanding
>> correctly:
>> If A fails, with an error such as "Unable to create connection to
>> bookkeeper server", that would be the type of error we would expect to be
>> able to retry immediately, as that result means no action was taken on any
>> log / etc, so no entry could have been created. This is different than a
>> "Connection Timeout" exception, as we just might not have gotten a
>> response
>> in time.
>>
>>
> Gotcha.
>
> The response code returned from the proxy can tell you whether a failure can
> be retried safely or not. (We might need to document these codes better.)
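(To illustrate that point - a rough sketch of what the retry decision could
look like on the client side, using the proxy client's write() future. The
exception name below is a placeholder for illustration, not one of the actual
documented response codes:)

Future<DLSN> result = client.write(streamName, payload);
result.addEventListener(new FutureEventListener<DLSN>() {
    @Override
    public void onSuccess(DLSN dlsn) {
        // The write is durable - safe to checkpoint this DLSN.
    }
    @Override
    public void onFailure(Throwable cause) {
        if (cause instanceof DefiniteFailureException) {  // placeholder name
            // Proxy says nothing was written - safe to retry immediately.
        } else {
            // Possibly a lost ack (e.g. timeout) - fall back to
            // read-and-dedup before retrying.
        }
    }
});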
>
>
>
>>
>> >
>> >
>> > > 4) If A fails due to an error that could be a lost ack problem
>> (network
>> > > connectivity / etc), within a bounded time frame it should be
>> possible to
>> > > find out if the write succeeded or failed, either by reading from some
>> > > checkpoint of the log for the changes that should have been made or
>> some
>> > > other possible server-side support.
>> > >
>> >
>> > If I understand this correctly, it is a duplication issue, right?
>> >
>> > Can a de-duplication solution work here? Either DL or your client does
>> the
>> > de-duplication?
>> >
>>
>> The requirements I'm mentioning are the ones needed for client-side
>> dedupping. If I can guarantee that writes are exposed within some time
>> frame, and that I can never get into an inconsistently ordered state when
>> successes happen, then when an error occurs I can always wait for the max
>> time frame, read the latest writes, and then dedup locally against the
>> request I just made.
>>
>> The main thing about that timeframe is that it's basically the sum of
>> every timeout, all the way down in the system, combined with whatever
>> flushing / caching / etc. times there are at the bookkeeper / client level
>> for when values are exposed.
>
>
> Gotcha.
>
>>
>>
>> >
>> > Is there any way to identify your write?
>> >
>> > I can think of a case as follows - I want to know what your expected
>> > behavior from the log would be.
>> >
>> > a)
>> >
>> > If an HBase region server A writes a change for key K to the log, and
>> > the change is successfully made to the log, but server A goes down
>> > before receiving the acknowledgement, and region server B takes over
>> > the region that contains K, what will B do?
>> >
>>
>> HBase writes in large chunks (WAL Logs), which its replication system then
>> handles by replaying in the case of failure. If I'm in the middle of a log,
>> and the whole region goes down and gets rescheduled elsewhere, I will
>> start
>> back up from the beginning of the log I was in the middle of. Using
>> checkpointing + deduping, we should be able to find out where we left off
>> in the log.
>
>
>> >
>> >
>> > b) Same as a), but server A was just network partitioned. Will both A
>> > and B write the change for key K?
>> >
>>
>> HBase gives us some guarantees around network partitions (Consistency over
>> availability for HBase). HBase is a single-master failover recovery type
>> of
>> system, with zookeeper-based guarantees for single owners (writers) of a
>> range of data.
>>
>> >
>> >
>> >
>> > > 5) If A is turned into multiple batches (one large request gets split
>> > into
>> > > multiple smaller ones to the bookkeeper backend, due to log rolling /
>> > size
>> > > / etc):
>> > >   a) The ordering of entries within a batch is consistent with the
>> > > original request when exposed in the log (though they may be
>> > > interleaved with other requests)
>> > >   b) The ordering across batches is consistent with the original
>> > > request when exposed in the log (though they may be interleaved
>> > > with other requests)
>> > >   c) If a batch fails, and cannot be retried / is unsuccessfully
>> retried,
>> > > all batches after the failed batch should not be exposed in the log.
>> > Note:
>> > > The batches before and including the failed batch, that ended up
>> > > succeeding, can show up in the log, again within some bounded time
>> range
>> > > for reads by a client.
>> > >
>> >
>> > There is a method 'writeBulk' in DistributedLogClient that can achieve
>> > this guarantee.
>> >
>> > However, I am not very sure how you will turn A into batches. If you
>> > are dividing A into batches,
>> > you can simply control the application's write sequence to achieve the
>> > guarantee here.
>> >
>> > Can you explain more about this?
>> >
>>
>> In this case, by batches I mean what the proxy does with the single
>> request
>> that I send it. If the proxy decides it needs to turn my single request
>> into multiple batches of requests, due to log rolling, size limitations,
>> etc, those would be the guarantees I need to be able to deduplicate on the
>> client side.
>
>
> A single record written by #write and a record set (a set of records)
> written by #writeRecordSet are atomic - they will not be broken down into
> entries (batches). With the correct response code, you would be able to
> tell whether it is a lost-ack failure or not. However, there is a size
> limitation for this - it cannot go beyond 1MB in the current implementation.
>
> What is your expected record size?
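(A minimal sketch of guarding that limit on the client side, assuming the
proxy client's write(stream, ByteBuffer) call. The constant and the helper
method below are made up for illustration; the 1MB figure is the limit
mentioned above, not a constant read from the code base:)

// Hypothetical helper - rejects oversized payloads up front, so a
// success/failure response always applies to the whole (atomic) payload.
private static final int MAX_RECORD_BYTES = 1024 * 1024;

Future<DLSN> writeAtomic(DistributedLogClient client, String stream, ByteBuffer record) {
    if (record.remaining() > MAX_RECORD_BYTES) {
        throw new IllegalArgumentException("record over 1MB cannot be written atomically");
    }
    // A single record is never split into multiple entries by the proxy.
    return client.write(stream, record);
}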
>
>
>>
>> >
>> >
>> > >
>> > > Since we can guarantee per-key ordering on the client side, we
>> guarantee
>> > > that there is a single writer per-key, just not per log.
>> >
>> >
>> > Do you need a fencing guarantee in the case of a network partition
>> > causing two writers?
>> >
>> >
>> > > So if there was a
>> > > way to know for certain whether a single write request was written or not,
>> within a
>> > > certain time frame (since failures should be rare anyways, this is
>> fine
>> > if
>> > > it is expensive), we can then have the client guarantee the ordering
>> it
>> > > needs.
>> > >
>> >
>> > This sounds like an 'exactly-once' write (regarding retries) requirement
>> > to me, right?
>> >
>> Yes. I'm curious how this issue is handled by Manhattan, since you can
>> imagine that a data store that ends up getting multiple writes for the same
>> put / get / etc. would be harder to use, and we are basically trying to
>> create a log like that for HBase.
>
>
> Are you guys replacing the HBase WAL?
>
> In the Manhattan case, the request is first written to DL streams by the
> Manhattan coordinator. The Manhattan replica then reads from the DL
> streams and applies the change. In the lost-ack case, the MH coordinator
> will just fail the request back to the client.
>
> My feeling here is your usage for HBase is a bit different from how we use
> DL in Manhattan. It sounds like you read from a source (HBase WAL) and
> write to DL. But I might be wrong.
>
>
>>
>> >
>> >
>> > >
>> > >
>> > > > Cameron:
>> > > > Another thing we've discussed but haven't really thought through -
>> > > > We might be able to support some kind of epoch write request, where
>> the
>> > > > epoch is guaranteed to have changed if the writer has changed or the
>> > > ledger
>> > > > was ever fenced off. Writes include an epoch and are rejected if the
>> > > epoch
>> > > > has changed.
>> > > > With a mechanism like this, fencing the ledger off after a failure
>> > would
>> > > > ensure any pending writes had either been written or would be
>> rejected.
>> > >
>> > > The issue would be how I can tell whether the write I sent to the server was
>> > > written. Since a network issue could happen on the send of the
>> request,
>> > or
>> > > on the receive of the success response, an epoch wouldn't tell me if I
>> > can
>> > > successfully retry, as it could be successfully written but AWS
>> dropped
>> > the
>> > > connection for the success response. Since the epoch would be the same
>> > > (same ledger), I could write duplicates.
>> > >
>> > >
>> > > > We are currently proposing adding transaction semantics to DL to get
>> > > > rid of the size limitation and the unaware-ness in the proxy client.
>> > > > Here is our idea -
>> > > > http://mail-archives.apache.org/mod_mbox/incubator-distributedlog-dev/201609.mbox/%3cCAAC6BxP5YyEHwG0ZCF5soh42X=xuYwYml4nxsybyiofzxpv...@mail.gmail.com%3e
>> > >
>> > > > I am not sure if your idea is similar to ours, but we'd like to
>> > > > collaborate
>> > > > with the community if anyone has a similar idea.
>> > >
>> > > Our use case would be covered by transaction support, but I'm unsure
>> if
>> > we
>> > > would need something that heavyweight for the guarantees we need.
>> > >
>> >
>> > >
>> > > Basically, the high level requirement here is "Support consistent
>> write
>> > > ordering for single-writer-per-key, multi-writer-per-log". My hunch is
>> > > that, with some added guarantees to the proxy (if it isn't already
>> > > supported), and some custom client code on our side for removing the
>> > > entries that actually succeeded in writing to DistributedLog from the
>> request
>> > > that failed, it should be a relatively easy thing to support.
>> > >
>> >
>> > Yup. I think it should not be very difficult to support. There might be
>> > some changes on the server side.
>> > Let's figure out what the changes will be. Are you guys interested in
>> > contributing?
>> >
>> > Yes, we would be.
>>
>> As a note, the one thing that we see as an issue with the client side
>> dedupping is how to bound the range of data that needs to be looked at for
>> deduplication. As you can imagine, it is pretty easy to bound the bottom
>> of
>> the range, as that it just regular checkpointing of the DSLN that is
>> returned. I'm still not sure if there is any nice way to time bound the
>> top
>> end of the range, especially since the proxy owns sequence numbers (which
>> makes sense). I am curious if there is more that can be done if
>> deduplication is on the server side. However, the main downside I see of
>> server-side deduplication is that instead of running only when a client
>> request fails, it would have to run every time a write happens.
>
>
> For a reliable dedup, we probably need a fence-then-getLastDLSN operation -
> it would guarantee that any uncompleted requests (lost-ack requests) issued
> before this fence-then-getLastDLSN operation will fail and will never land
> in the log.
>
> the pseudo code would look like below -
>
> write(request) onFailure { t =>
>   if (t is a timeout exception) {
>     DLSN lastDLSN = fenceThenGetLastDLSN();
>     DLSN lastCheckpointedDLSN = ...;
>     // check whether the request landed in the range
>     // (lastCheckpointedDLSN, lastDLSN];
>     // if it did, the write succeeded; otherwise retry.
>   }
> }
>


Just realized the idea is the same as the 'epoch write' Leigh raised in a
previous email. Let me explain more about this idea (Leigh, feel free to
jump in and fill in the details).

- When a log stream is owned, the proxy uses the last transaction id as the
epoch.
- When a client connects (handshakes with the proxy), it will get the epoch
for the stream.
- The writes issued by this client will carry the epoch to the proxy.
- Add a new rpc - fenceThenGetLastDLSN - that forces the proxy to bump the
epoch.
- Once fenceThenGetLastDLSN happens, all the outstanding writes with the old
epoch will be rejected with exceptions (e.g. EpochFenced).
- The DLSN returned from fenceThenGetLastDLSN can be used as the upper bound
for deduplication on failures (see the rough sketch after this list).
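Here is a rough sketch of how a client could use this on a lost-ack failure.
Everything below is part of the proposal - writeWithEpoch,
fenceThenGetLastDLSN and the EpochFenced exception do not exist in the client
API today, and the other helpers are placeholders:

// Proposal sketch only - none of these RPCs / exceptions exist yet.
Future<DLSN> result = client.writeWithEpoch(stream, payload, epoch);
result.addEventListener(new FutureEventListener<DLSN>() {
    @Override
    public void onSuccess(DLSN dlsn) {
        checkpoint(dlsn);  // placeholder: advance the lower dedup bound
    }
    @Override
    public void onFailure(Throwable cause) {
        if (isLostAck(cause)) {  // placeholder: timeout / dropped connection
            // Bump the epoch: every outstanding write carrying the old epoch
            // has either already landed in the log or will be rejected with
            // an EpochFenced exception - nothing else can land after this.
            DLSN upperBound = client.fenceThenGetLastDLSN(stream);
            DLSN lowerBound = lastCheckpointedDLSN();  // placeholder
            // Read (lowerBound, upperBound] and look for the payload: if it
            // is there, the write succeeded; otherwise retry with the new
            // epoch obtained from a fresh handshake.
        }
    }
});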

Cameron, does this sound like a solution to your use case?



>
>
>>
>> Maybe something that could fit a similar need to what Kafka does (keeping
>> the last stored value for a particular key in a log), such that on a
>> per-key basis there could be a sequence number that supports deduplication?
>> The cost seems like it would be high, however, and I'm not even sure if
>> bookkeeper supports it.
>
>
>> Cheers,
>> Cameron
>>
>> >
>> > >
>> > > Thanks,
>> > > Cameron
>> > >
>> > >
>> > > On Sat, Oct 8, 2016 at 7:35 AM, Leigh Stewart
>> > <lstew...@twitter.com.invalid
>> > > >
>> > > wrote:
>> > >
>> > > > Cameron:
>> > > > Another thing we've discussed but haven't really thought through -
>> > > > We might be able to support some kind of epoch write request, where
>> the
>> > > > epoch is guaranteed to have changed if the writer has changed or the
>> > > ledger
>> > > > was ever fenced off. Writes include an epoch and are rejected if the
>> > > epoch
>> > > > has changed.
>> > > > With a mechanism like this, fencing the ledger off after a failure
>> > would
>> > > > ensure any pending writes had either been written or would be
>> rejected.
>> > > >
>> > > >
>> > > > On Sat, Oct 8, 2016 at 7:10 AM, Sijie Guo <si...@apache.org> wrote:
>> > > >
>> > > > > Cameron,
>> > > > >
>> > > > > I think both Leigh and Xi had made a few good points about your
>> > > question.
>> > > > >
>> > > > > To add one more point to your question - "but I am not
>> > > > > 100% sure of how all of the futures in the code handle failures.
>> > > > > If not, where in the code would be the relevant places to add the
>> > > ability
>> > > > > to do this, and would the project be interested in a pull
>> request?"
>> > > > >
>> > > > > The current proxy and client logic doesn't handle failures
>> > > > > (duplicates) perfectly - the strategy now is that the client will
>> > > > > retry as best as it can before throwing exceptions to users. The
>> > > > > code you are looking for is in BKLogSegmentWriter for the proxy
>> > > > > handling writes, and in DistributedLogClientImpl for the proxy
>> > > > > client handling responses from proxies. Does this help you?
>> > > > >
>> > > > > And also, you are welcome to contribute the pull requests.
>> > > > >
>> > > > > - Sijie
>> > > > >
>> > > > >
>> > > > >
>> > > > > On Tue, Oct 4, 2016 at 3:39 PM, Cameron Hatfield <
>> kin...@gmail.com>
>> > > > wrote:
>> > > > >
>> > > > > > I have a question about the Proxy Client. Basically, for our use
>> > > cases,
>> > > > > we
>> > > > > > want to guarantee ordering at the key level, irrespective of the
>> > > > ordering
>> > > > > > of the partition it may be assigned to as a whole. Due to the
>> > source
>> > > of
>> > > > > the
>> > > > > > data (HBase Replication), we cannot guarantee that a single
>> > partition
>> > > > > will
>> > > > > > be owned for writes by the same client. This means the proxy
>> client
>> > > > works
>> > > > > > well (since we don't care which proxy owns the partition we are
>> > > writing
>> > > > > > to).
>> > > > > >
>> > > > > >
>> > > > > > However, the guarantees we need when writing a batch consist
>> of:
>> > > > > > Definition of a Batch: The set of records sent to the writeBatch
>> > > > endpoint
>> > > > > > on the proxy
>> > > > > >
>> > > > > > 1. Batch success: If the client receives a success from the
>> proxy,
>> > > then
>> > > > > > that batch is successfully written
>> > > > > >
>> > > > > > 2. Inter-Batch ordering: Once a batch has been written
>> > successfully
>> > > by
>> > > > > the
>> > > > > > client, when another batch is written, it will be guaranteed to
>> be
>> > > > > ordered
>> > > > > > after the last batch (if it is the same stream).
>> > > > > >
>> > > > > > 3. Intra-Batch ordering: Within a batch of writes, the records
>> will
>> > > be
>> > > > > > committed in order
>> > > > > >
>> > > > > > 4. Intra-Batch failure ordering: If an individual record fails
>> to
>> > > write
>> > > > > > within a batch, all records after that record will not be
>> written.
>> > > > > >
>> > > > > > 5. Batch Commit: Guarantee that if a batch returns a success, it
>> > will
>> > > > be
>> > > > > > written
>> > > > > >
>> > > > > > 6. Read-after-write: Once a batch is committed, within a limited
>> > > > > time-frame
>> > > > > > it will be able to be read. This is required in the case of
>> > failure,
>> > > so
>> > > > > > that the client can see what actually got committed. I believe
>> the
>> > > > > > time-frame part could be removed if the client can send in the
>> same
>> > > > > > sequence number that was written previously, since it would then
>> > fail
>> > > > and
>> > > > > > we would know that a read needs to occur.
>> > > > > >
>> > > > > >
>> > > > > > So, my basic question is whether this is currently possible in
>> > > > > > the proxy. I don't believe it gives these guarantees as it
>> > > > > > stands today, but I am not 100% sure of how all of the futures
>> > > > > > in the code handle failures.
>> > > > > > If not, where in the code would be the relevant places to add
>> the
>> > > > ability
>> > > > > > to do this, and would the project be interested in a pull
>> request?
>> > > > > >
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Cameron
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
