Yes, we are reading the HBase WAL (via its replication plugin support) and writing that into DL.

From the sounds of it, yes, it would. The only thing I would say is to make the epoch requirement optional, so that if a client doesn't care about dupes, it doesn't have to deal with the process of getting a new epoch.

-Cameron

On Wed, Oct 19, 2016 at 7:43 PM, Sijie Guo <sij...@twitter.com.invalid> wrote:

> On Wed, Oct 19, 2016 at 7:17 PM, Sijie Guo <sij...@twitter.com> wrote:
>
> > On Monday, October 17, 2016, Cameron Hatfield <kin...@gmail.com> wrote:
> >
> >> Answer inline:
> >>
> >> On Mon, Oct 17, 2016 at 11:46 AM, Sijie Guo <si...@apache.org> wrote:
> >>
> >> > Cameron,
> >> >
> >> > Thank you for your summary. I liked the discussion here. I also liked the
> >> > summary of your requirement - 'single-writer-per-key,
> >> > multiple-writers-per-log'. If I understand correctly, the core concern here
> >> > is almost 'exact-once' write (or a way to explicitly tell if a write can be
> >> > retried or not).
> >> >
> >> > Comments inline.
> >> >
> >> > On Fri, Oct 14, 2016 at 11:17 AM, Cameron Hatfield <kin...@gmail.com> wrote:
> >> >
> >> > > > Ah- yes good point (to be clear we're not using the proxy this way
> >> > > > today).
> >> > > >
> >> > > > > Due to the source of the
> >> > > > > data (HBase Replication), we cannot guarantee that a single partition will
> >> > > > > be owned for writes by the same client.
> >> > > >
> >> > > > Do you mean you *need* to support multiple writers issuing interleaved
> >> > > > writes or is it just that they might sometimes interleave writes and you
> >> > > > don't care?
> >> > > How HBase partitions the keys being written wouldn't have a one->one
> >> > > mapping with the partitions we would have in HBase. Even if we did have
> >> > > that alignment when the cluster first started, HBase will rebalance what
> >> > > servers own what partitions, as well as split and merge partitions that
> >> > > already exist, causing eventual drift from one log per partition.
> >> > > Because we want ordering guarantees per key (row in hbase), we partition
> >> > > the logs by the key. Since multiple writers are possible per range of keys
> >> > > (due to the aforementioned rebalancing / splitting / etc of hbase), we
> >> > > cannot use the core library due to requiring a single writer for ordering.
> >> > >
> >> > > But, for a single log, we don't really care about ordering aside from at
> >> > > the per-key level. So all we really need to be able to handle is preventing
> >> > > duplicates when a failure occurs, and ordering consistency across requests
> >> > > from a single client.
> >> > >
> >> > > So our general requirements are:
> >> > > Write A, Write B
> >> > > Timeline: A -> B
> >> > > Request B is only made after A has successfully returned (possibly after
> >> > > retries)
> >> > >
> >> > > 1) If the write succeeds, it will be durably exposed to clients within some
> >> > > bounded time frame
> >> > >
> >> >
> >> > Guaranteed.
> >> >
> >> > > 2) If A succeeds and B succeeds, the ordering for the log will be A and
> >> > > then B
> >> > >
> >> >
> >> > If I understand correctly here, B is only sent after A is returned, right?
> >> > If that's the case, it is guaranteed.
> >> >
> >> > > 3) If A fails due to an error that can be relied on to *not* be a lost ack
> >> > > problem, it will never be exposed to the client, so it may (depending on
> >> > > the error) be retried immediately
> >> > >
> >> >
> >> > If it is not a lost-ack problem, the entry will be exposed. It is
> >> > guaranteed.
> >>
> >> Let me try rephrasing the questions, to make sure I'm understanding
> >> correctly:
> >> If A fails, with an error such as "Unable to create connection to
> >> bookkeeper server", that would be the type of error we would expect to be
> >> able to retry immediately, as that result means no action was taken on any
> >> log / etc, so no entry could have been created. This is different than a
> >> "Connection Timeout" exception, as we just might not have gotten a response
> >> in time.
> >>
> >
> > Gotcha.
> >
> > The response code returned from the proxy can tell if a failure can be retried
> > safely or not. (We might need to make them well documented)
> >
> >>
> >> > > 4) If A fails due to an error that could be a lost ack problem (network
> >> > > connectivity / etc), within a bounded time frame it should be possible to
> >> > > find out if the write succeeded or failed. Either by reading from some
> >> > > checkpoint of the log for the changes that should have been made or some
> >> > > other possible server-side support.
> >> > >
> >> >
> >> > If I understand this correctly, it is a duplication issue, right?
> >> >
> >> > Can a de-duplication solution work here? Either DL or your client does the
> >> > de-duplication?
> >> >
> >>
> >> The requirements I'm mentioning are the ones needed for client-side
> >> deduping. If I can guarantee writes being exposed within some time
> >> frame, and I can never get into an inconsistently ordered state when
> >> successes happen, then when an error occurs, I can always wait for the max
> >> time frame, read the latest writes, and then dedup locally against the
> >> request I just made.
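
As a rough illustration of the client-side dedup described just above, here is a minimal Python sketch. Everything in it is an assumption made for illustration: `InMemoryLog` is a toy stand-in rather than the DistributedLog client, the two exception classes are hypothetical names for "safe to retry" and "possible lost ack" failures, and each record is assumed to carry a client-chosen request id so it can be recognized on read-back.

```python
from dataclasses import dataclass, field


class RetrySafeError(Exception):
    """Hypothetical: the write definitely never reached the log."""


class LostAckError(Exception):
    """Hypothetical: the write may or may not have reached the log."""


@dataclass
class InMemoryLog:
    """Toy stand-in for a log stream; not the DistributedLog API."""
    records: list = field(default_factory=list)
    drop_next_ack: bool = False

    def append(self, request_id, payload):
        self.records.append((request_id, payload))
        if self.drop_next_ack:
            # Simulate a write that landed but whose ack was lost.
            self.drop_next_ack = False
            raise LostAckError(request_id)
        return len(self.records) - 1  # position, playing the role of a DLSN

    def read_from(self, position):
        return self.records[position:]


def write_once(log, checkpoint, request_id, payload, max_attempts=3):
    """Write payload at most once, deduplicating after ambiguous failures."""
    for _ in range(max_attempts):
        try:
            return log.append(request_id, payload)
        except RetrySafeError:
            continue  # nothing was written, so retrying cannot duplicate
        except LostAckError:
            # A real client would first wait out the bounded exposure window.
            for offset, (seen_id, _) in enumerate(log.read_from(checkpoint)):
                if seen_id == request_id:
                    return checkpoint + offset  # already landed; do not resend
    raise RuntimeError("write failed after retries")
```

The read-back only scans from the last checkpointed position, which is the "bottom of the range" bound; the sketch does not address how to bound the top of the range on a real, concurrently written log.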
> >>
> >> The main thing about that timeframe is that it's basically the addition of
> >> every timeout, all the way down in the system, combined with whatever
> >> flushing / caching / etc times are at the bookkeeper / client level for
> >> when values are exposed
> >
> > Gotcha.
> >
> >> >
> >> > Is there any way to identify your write?
> >> >
> >> > I can think of a case as follows - I want to know what is your expected
> >> > behavior from the log.
> >> >
> >> > a)
> >> >
> >> > If a hbase region server A writes a change of key K to the log, the change
> >> > is successfully made to the log;
> >> > but server A is down before receiving the change.
> >> > region server B took over the region that contains K, what will B do?
> >> >
> >>
> >> HBase writes in large chunks (WAL Logs), which its replication system then
> >> handles by replaying in the case of failure. If I'm in the middle of a log,
> >> and the whole region goes down and gets rescheduled elsewhere, I will start
> >> back up from the beginning of the log I was in the middle of. Using
> >> checkpointing + deduping, we should be able to find out where we left off
> >> in the log.
> >
> >> >
> >> > b) same as a). but server A was just network partitioned. will both A and B
> >> > write the change of key K?
> >> >
> >>
> >> HBase gives us some guarantees around network partitions (Consistency over
> >> availability for HBase). HBase is a single-master failover recovery type of
> >> system, with zookeeper-based guarantees for single owners (writers) of a
> >> range of data.
> >>
> >> > > 5) If A is turned into multiple batches (one large request gets split into
> >> > > multiple smaller ones to the bookkeeper backend, due to log rolling / size
> >> > > / etc):
> >> > >   a) The ordering of entries within batches is consistent with
> >> > > the original request, when exposed in the log (though they may be
> >> > > interleaved with other requests)
> >> > >   b) The ordering across batches is consistent with the
> >> > > original request, when exposed in the log (though they may be interleaved
> >> > > with other requests)
> >> > >   c) If a batch fails, and cannot be retried / is unsuccessfully retried,
> >> > > all batches after the failed batch should not be exposed in the log. Note:
> >> > > The batches before and including the failed batch, that ended up
> >> > > succeeding, can show up in the log, again within some bounded time range
> >> > > for reads by a client.
> >> > >
> >> >
> >> > There is a method 'writeBulk' in DistributedLogClient that can achieve this
> >> > guarantee.
> >> >
> >> > However, I am not very sure about how you will turn A into batches. If you
> >> > are dividing A into batches,
> >> > you can simply control the application write sequence to achieve the
> >> > guarantee here.
> >> >
> >> > Can you explain more about this?
> >> >
> >>
> >> In this case, by batches I mean what the proxy does with the single request
> >> that I send it. If the proxy decides it needs to turn my single request
> >> into multiple batches of requests, due to log rolling, size limitations,
> >> etc, those would be the guarantees I need to be able to deduplicate on the
> >> client side.
> >
> > A single record written by #write and a record set (set of records)
> > written by #writeRecordSet are atomic - they will not be broken down into
> > entries (batches). With the correct response code, you would be able to
> > tell if it is a lost-ack failure or not. However there is a size limitation
> > for this - it cannot go beyond 1MB in the current implementation.
> >
> > What is your expected record size?
> >
> >> > >
> >> > > Since we can guarantee per-key ordering on the client side, we guarantee
> >> > > that there is a single writer per-key, just not per log.
> >> >
> >> > Do you need a fencing guarantee in the case of a network partition causing
> >> > two-writers?
> >> >
> >> > > So if there was a
> >> > > way to guarantee a single write request as being written or not, within a
> >> > > certain time frame (since failures should be rare anyways, this is fine if
> >> > > it is expensive), we can then have the client guarantee the ordering it
> >> > > needs.
> >> > >
> >> >
> >> > This sounds like an 'exact-once' write (regarding retries) requirement to me,
> >> > right?
> >> >
> >> Yes. I'm curious how this issue is handled by Manhattan, since you can
> >> imagine a data store that ends up getting multiple writes for the same put
> >> / get / etc, would be harder to use, and we are basically trying to create
> >> a log like that for HBase.
> >
> > Are you guys replacing HBase WAL?
> >
> > In the Manhattan case, the request will be first written to DL streams by the
> > Manhattan coordinator. The Manhattan replica then will read from the DL
> > streams and apply the change. In the lost-ack case, the MH coordinator will
> > just fail the request to the client.
> >
> > My feeling here is your usage for HBase is a bit different from how we use
> > DL in Manhattan. It sounds like you read from a source (HBase WAL) and
> > write to DL. But I might be wrong.
> >
> >> > > > Cameron:
> >> > > > Another thing we've discussed but haven't really thought through -
> >> > > > We might be able to support some kind of epoch write request, where the
> >> > > > epoch is guaranteed to have changed if the writer has changed or the ledger
> >> > > > was ever fenced off. Writes include an epoch and are rejected if the epoch
> >> > > > has changed.
> >> > > > With a mechanism like this, fencing the ledger off after a failure would
> >> > > > ensure any pending writes had either been written or would be rejected.
> >> > >
> >> > > The issue would be how I guarantee the write I wrote to the server was
> >> > > written. Since a network issue could happen on the send of the request, or
> >> > > on the receive of the success response, an epoch wouldn't tell me if I can
> >> > > successfully retry, as it could be successfully written but AWS dropped the
> >> > > connection for the success response. Since the epoch would be the same
> >> > > (same ledger), I could write duplicates.
> >> > >
> >> > > > We are currently proposing adding a transaction semantic to dl to get rid
> >> > > > of the size limitation and the unaware-ness in the proxy client. Here is
> >> > > > our idea -
> >> > > > http://mail-archives.apache.org/mod_mbox/incubator-distributedlog-dev/201609.mbox/%3cCAAC6BxP5YyEHwG0ZCF5soh42X=xuYwYml4nxsybyiofzxpv...@mail.gmail.com%3e
> >> > > >
> >> > > > I am not sure if your idea is similar as ours. but we'd like to collaborate
> >> > > > with the community if anyone has the similar idea.
> >> > >
> >> > > Our use case would be covered by transaction support, but I'm unsure if we
> >> > > would need something that heavy weight for the guarantees we need.
> >> > >
> >> > > Basically, the high level requirement here is "Support consistent write
> >> > > ordering for single-writer-per-key, multi-writer-per-log". My hunch is
> >> > > that, with some added guarantees to the proxy (if it isn't already
> >> > > supported), and some custom client code on our side for removing the
> >> > > entries that actually succeed to write to DistributedLog from the request
> >> > > that failed, it should be a relatively easy thing to support.
> >> > >
> >> >
> >> > Yup. I think it should not be very difficult to support. There might be
> >> > some changes in the server side.
> >> > Let's figure out what the changes will be. Are you guys interested in
> >> > contributing?
> >> >
> >> Yes, we would be.
> >>
> >> As a note, the one thing that we see as an issue with the client side
> >> deduping is how to bound the range of data that needs to be looked at for
> >> deduplication. As you can imagine, it is pretty easy to bound the bottom of
> >> the range, as that is just regular checkpointing of the DLSN that is
> >> returned. I'm still not sure if there is any nice way to time bound the top
> >> end of the range, especially since the proxy owns sequence numbers (which
> >> makes sense). I am curious if there is more that can be done if
> >> deduplication is on the server side. However the main minus I see of server
> >> side deduplication is that instead of running contingent on there being a
> >> failed client request, it would have to run every time a write
> >> happens.
> >
> > For a reliable dedup, we probably need a fence-then-getLastDLSN operation -
> > so it would guarantee that any non-completed requests issued (lost-ack
> > requests) before this fence-then-getLastDLSN operation will be failed and
> > they will never land at the log.
> >
> > the pseudo code would look like below -
> >
> > write(request) onFailure { t =>
> >   if (t is timeout exception) {
> >     DLSN lastDLSN = fenceThenGetLastDLSN()
> >     DLSN lastCheckpointedDLSN = ...;
> >     // find if the request lands between [lastCheckpointedDLSN, lastDLSN].
> >     // if it exists, the write succeeded; otherwise retry.
> >   }
> > }
> >
>
> Just realized the idea is the same as what Leigh raised in the previous email
> about 'epoch write'. Let me explain more about this idea (Leigh, feel free
> to jump in to fill up your idea).
>
> - when a log stream is owned, the proxy uses the last transaction id as the
>   epoch
> - when a client connects (handshake with the proxy), it will get the epoch
>   for the stream.
> - the writes issued by this client will carry the epoch to the proxy.
> - add a new rpc - fenceThenGetLastDLSN - it would force the proxy to bump
>   the epoch.
> - if fenceThenGetLastDLSN happened, all the outstanding writes with the old
>   epoch will be rejected with exceptions (e.g. EpochFenced).
> - The DLSN returned from fenceThenGetLastDLSN can be used as the bound for
>   deduplications on failures.
>
> Cameron, does this sound like a solution to your use case?
>
> >>
> >> Maybe something that could fit a similar need that Kafka does (the last
> >> stored value for a particular key in a log), such that on a per key basis
> >> there could be a sequence number that supports deduplication? Cost seems
> >> like it would be high however, and I'm not even sure if bookkeeper supports
> >> it.
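
To make the epoch steps quoted above concrete, here is a small Python simulation. It is only a sketch under stated assumptions: `ToyProxy` is an in-memory stand-in and not the DistributedLog proxy, the epoch is a plain counter rather than the last transaction id, list positions play the role of DLSNs, and `fence_then_get_last_dlsn` is a hypothetical name mirroring the proposed fenceThenGetLastDLSN rpc. Passing no epoch skips the check, which models the "optional epoch" suggestion at the top of this message.

```python
class EpochFenced(Exception):
    """Raised when a write carries a stale epoch (name taken from the thread)."""


class ToyProxy:
    """In-memory stand-in for a write proxy; not the DistributedLog API."""

    def __init__(self):
        self.epoch = 0
        self.log = []

    def handshake(self):
        # A connecting client learns the current epoch for the stream.
        return self.epoch

    def write(self, record, epoch=None):
        # epoch=None models a client that opted out of epoch checking.
        if epoch is not None and epoch != self.epoch:
            raise EpochFenced(f"epoch {epoch} != current {self.epoch}")
        self.log.append(record)
        return len(self.log) - 1  # position, playing the role of a DLSN

    def fence_then_get_last_dlsn(self):
        # Bump the epoch so outstanding writes with the old epoch are rejected.
        self.epoch += 1
        return len(self.log) - 1  # -1 means the log is empty


def landed(proxy, checkpoint, last_dlsn, record):
    """Check whether record appears in positions [checkpoint, last_dlsn]."""
    return record in proxy.log[checkpoint:last_dlsn + 1]
```

After a timeout, a client would call `fence_then_get_last_dlsn()`, run `landed(...)` over the bounded range, and resend with a fresh epoch from `handshake()` only if the record is absent.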
> >
> >> Cheers,
> >> Cameron
> >>
> >> > >
> >> > > Thanks,
> >> > > Cameron
> >> > >
> >> > > On Sat, Oct 8, 2016 at 7:35 AM, Leigh Stewart <lstew...@twitter.com.invalid> wrote:
> >> > >
> >> > > > Cameron:
> >> > > > Another thing we've discussed but haven't really thought through -
> >> > > > We might be able to support some kind of epoch write request, where the
> >> > > > epoch is guaranteed to have changed if the writer has changed or the ledger
> >> > > > was ever fenced off. Writes include an epoch and are rejected if the epoch
> >> > > > has changed.
> >> > > > With a mechanism like this, fencing the ledger off after a failure would
> >> > > > ensure any pending writes had either been written or would be rejected.
> >> > > >
> >> > > > On Sat, Oct 8, 2016 at 7:10 AM, Sijie Guo <si...@apache.org> wrote:
> >> > > >
> >> > > > > Cameron,
> >> > > > >
> >> > > > > I think both Leigh and Xi had made a few good points about your question.
> >> > > > >
> >> > > > > To add one more point to your question - "but I am not
> >> > > > > 100% of how all of the futures in the code handle failures.
> >> > > > > If not, where in the code would be the relevant places to add the ability
> >> > > > > to do this, and would the project be interested in a pull request?"
> >> > > > >
> >> > > > > The current proxy and client logic doesn't do perfectly on handling
> >> > > > > failures (duplicates) - the strategy now is the client will retry as best
> >> > > > > as it can before throwing exceptions to users. The code you are looking for
> >> > > > > - it is on BKLogSegmentWriter for the proxy handling writes and it is on
> >> > > > > DistributedLogClientImpl for the proxy client handling responses from
> >> > > > > proxies. Does this help you?
> >> > > > >
> >> > > > > And also, you are welcome to contribute the pull requests.
> >> > > > >
> >> > > > > - Sijie
> >> > > > >
> >> > > > > On Tue, Oct 4, 2016 at 3:39 PM, Cameron Hatfield <kin...@gmail.com> wrote:
> >> > > > >
> >> > > > > > I have a question about the Proxy Client. Basically, for our use cases, we
> >> > > > > > want to guarantee ordering at the key level, irrespective of the ordering
> >> > > > > > of the partition it may be assigned to as a whole. Due to the source of the
> >> > > > > > data (HBase Replication), we cannot guarantee that a single partition will
> >> > > > > > be owned for writes by the same client. This means the proxy client works
> >> > > > > > well (since we don't care which proxy owns the partition we are writing
> >> > > > > > to).
> >> > > > > >
> >> > > > > > However, the guarantees we need when writing a batch consist of:
> >> > > > > > Definition of a Batch: The set of records sent to the writeBatch endpoint
> >> > > > > > on the proxy
> >> > > > > >
> >> > > > > > 1. Batch success: If the client receives a success from the proxy, then
> >> > > > > > that batch is successfully written
> >> > > > > >
> >> > > > > > 2. Inter-Batch ordering: Once a batch has been written successfully by the
> >> > > > > > client, when another batch is written, it will be guaranteed to be ordered
> >> > > > > > after the last batch (if it is the same stream).
> >> > > > > >
> >> > > > > > 3. Intra-Batch ordering: Within a batch of writes, the records will be
> >> > > > > > committed in order
> >> > > > > >
> >> > > > > > 4. Intra-Batch failure ordering: If an individual record fails to write
> >> > > > > > within a batch, all records after that record will not be written.
> >> > > > > >
> >> > > > > > 5. Batch Commit: Guarantee that if a batch returns a success, it will be
> >> > > > > > written
> >> > > > > >
> >> > > > > > 6. Read-after-write: Once a batch is committed, within a limited time-frame
> >> > > > > > it will be able to be read. This is required in the case of failure, so
> >> > > > > > that the client can see what actually got committed. I believe the
> >> > > > > > time-frame part could be removed if the client can send in the same
> >> > > > > > sequence number that was written previously, since it would then fail and
> >> > > > > > we would know that a read needs to occur.
> >> > > > > >
> >> > > > > > So, my basic question is if this is currently possible in the proxy? I
> >> > > > > > don't believe it gives these guarantees as it stands today, but I am not
> >> > > > > > 100% of how all of the futures in the code handle failures.
> >> > > > > > If not, where in the code would be the relevant places to add the ability
> >> > > > > > to do this, and would the project be interested in a pull request?
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Cameron