On Mon, Oct 17, 2016 at 3:14 PM, Cameron Hatfield <kin...@gmail.com> wrote:

> Answer inline:
>
> On Mon, Oct 17, 2016 at 11:46 AM, Sijie Guo <si...@apache.org> wrote:
>
> > Cameron,
> >
> > Thank you for your summary. I liked the discussion here. I also liked the
> > summary of your requirement - 'single-writer-per-key,
> > multiple-writers-per-log'. If I understand correctly, the core concern here
> > is almost an 'exactly-once' write (or a way to explicitly tell whether a
> > write can be retried or not).
> >
> > Comments inline.
> >
> > On Fri, Oct 14, 2016 at 11:17 AM, Cameron Hatfield <kin...@gmail.com>
> > wrote:
> >
> > > > Ah- yes good point (to be clear we're not using the proxy this way
> > > today).
> > >
> > > > > Due to the source of the
> > > > > data (HBase Replication), we cannot guarantee that a single
> partition
> > > will
> > > > > be owned for writes by the same client.
> > >
> > > > Do you mean you *need* to support multiple writers issuing
> interleaved
> > > > writes or is it just that they might sometimes interleave writes and
> > you
> > > >don't care?
> > > The way HBase partitions the keys being written wouldn't have a
> > > one-to-one mapping with the partitions (logs) we would have on the
> > > DistributedLog side. Even if we did have that alignment when the cluster
> > > first started, HBase will rebalance which servers own which partitions,
> > > as well as split and merge partitions that already exist, causing
> > > eventual drift from one log per partition.
> > > Because we want ordering guarantees per key (row in HBase), we partition
> > > the logs by the key. Since multiple writers are possible per range of
> > > keys (due to the aforementioned rebalancing / splitting / etc. of HBase),
> > > we cannot use the core library, since it requires a single writer for
> > > ordering.
> > >
> > > But, for a single log, we don't really care about ordering aside from
> at
> > > the per-key level. So all we really need to be able to handle is
> > preventing
> > > duplicates when a failure occurs, and ordering consistency across
> > requests
> > > from a single client.
> > >
> > > So our general requirements are:
> > > Write A, Write B
> > > Timeline: A -> B
> > > Request B is only made after A has successfully returned (possibly
> after
> > > retries)
> > >
> > > 1) If the write succeeds, it will be durably exposed to clients within
> > some
> > > bounded time frame
> > >
> >
> > Guaranteed.
> >
>
> > > 2) If A succeeds and B succeeds, the ordering for the log will be A and
> > > then B
> > >
> >
> > If I understand correctly here, B is only sent after A is returned, right?
> > If that's the case, it is guaranteed.
>
>
>
> >
> >
> >
> > > 3) If A fails due to an error that can be relied on to *not* be a lost
> > ack
> > > problem, it will never be exposed to the client, so it may (depending
> on
> > > the error) be retried immediately
> > >
> >
> > If it is not a lost-ack problem, the entry will be exposed. it is
> > guaranteed.
>
> Let me try rephrasing the questions, to make sure I'm understanding
> correctly:
> If A fails, with an error such as "Unable to create connection to
> bookkeeper server", that would be the type of error we would expect to be
> able to retry immediately, as that result means no action was taken on any
> log / etc., so no entry could have been created. This is different than a
> "Connection Timeout" exception, where we just might not have gotten a
> response in time.
>
>
> >
> >
> > > 4) If A fails due to an error that could be a lost ack problem (network
> > > connectivity / etc), within a bounded time frame it should be possible
> to
> > > find out if the write succeeded or failed, either by reading from some
> > > checkpoint of the log for the changes that should have been made or some
> > > other possible server-side support.
> > >
> >
> > If I understand this correctly, it is a duplication issue, right?
> >
> > Can a de-duplication solution work here? Either DL or your client does
> the
> > de-duplication?
> >
>
> The requirements I'm mentioning are the ones needed for client-side
> dedupping. If I can guarantee that writes are exposed within some time
> frame, and that I can never get into an inconsistently ordered state when
> successes happen, then when an error occurs I can always wait for the max
> time frame, read the latest writes, and then dedup locally against the
> request I just made.
>
> The main thing about that time frame is that it's basically the sum of
> every timeout, all the way down the system, combined with whatever
> flushing / caching / etc. times at the BookKeeper / client level govern
> when values are exposed.
>
>
> >
> > Is there any way to identify your write?
> >
> > I can think of a case as follows - I want to know what your expected
> > behavior from the log would be.
> >
> > a)
> >
> > If an HBase region server A writes a change of key K to the log, and the
> > change is successfully made to the log, but server A goes down before
> > receiving the acknowledgement, and region server B takes over the region
> > that contains K - what will B do?
> >
>
> HBase writes in large chunks (WAL Logs), which its replication system then
> handles by replaying in the case of failure. If I'm in the middle of a log,
> and the whole region goes down and gets rescheduled elsewhere, I will start
> back up from the beginning of the log I was in the middle of. Using
> checkpointing + deduping, we should be able to find out where we left off
> in the log.
>

I find this discussion pretty interesting. Out of curiosity, are
you guys planning on replacing the HBase WAL with DL or just streaming the
WAL changes to DL streams?


>
> >
> >
> > b) Same as a), but server A was just network partitioned. Will both A and
> > B write the change of key K?
> >
>
> HBase gives us some guarantees around network partitions (consistency over
> availability for HBase). HBase is a single-master, failover-recovery type of
> system, with ZooKeeper-based guarantees for single owners (writers) of a
> range of data.
>
> >
> >
> >
> > > 5) If A is turned into multiple batches (one large request gets split
> > > into multiple smaller ones to the BookKeeper backend, due to log rolling
> > > / size / etc.):
> > >   a) The ordering of entries within a batch has ordering consistency
> > > with the original request, when exposed in the log (though they may be
> > > interleaved with other requests)
> > >   b) The ordering across batches has ordering consistency with the
> > > original request, when exposed in the log (though they may be interleaved
> > > with other requests)
> > >   c) If a batch fails, and cannot be retried / is unsuccessfully retried,
> > > all batches after the failed batch should not be exposed in the log.
> > > Note: The batches before and including the failed batch, that ended up
> > > succeeding, can show up in the log, again within some bounded time range
> > > for reads by a client.
> > >
> >
> > There is a method 'writeBulk' in DistributedLogClient that can achieve
> > this guarantee.
> >
> > However, I am not very sure how you will turn A into batches. If you are
> > dividing A into batches, you can simply control the application write
> > sequence to achieve the guarantee here.
> >
> > Can you explain more about this?
> >
>
> In this case, by batches I mean what the proxy does with the single request
> that I send it. If the proxy decides it needs to turn my single request
> into multiple batches of requests, due to log rolling, size limitations,
> etc., those would be the guarantees I need to be able to deduplicate on the
> client side.
>
> >
> >
> > >
> > > Since we can guarantee per-key ordering on the client side, we
> guarantee
> > > that there is a single writer per-key, just not per log.
> >
> >
> > Do you need a fencing guarantee in the case of a network partition causing
> > two writers?
> >
> >
> > > So if there was a
> > > way to find out, within a certain time frame, whether a single write
> > > request was written or not (since failures should be rare anyways, this
> > > is fine if it is expensive), we can then have the client guarantee the
> > > ordering it needs.
> > >
> >
> > This sounds like an 'exactly-once' write (regarding retries) requirement
> > to me, right?
> >
> Yes. I'm curious how this issue is handled by Manhattan, since you can
> imagine a data store that ends up getting multiple writes for the same put
> / get / etc. would be harder to use, and we are basically trying to create
> a log like that for HBase.
>
> >
> >
> > >
> > >
> > > > Cameron:
> > > > Another thing we've discussed but haven't really thought through -
> > > > We might be able to support some kind of epoch write request, where
> the
> > > > epoch is guaranteed to have changed if the writer has changed or the
> > > ledger
> > > > was ever fenced off. Writes include an epoch and are rejected if the
> > > epoch
> > > > has changed.
> > > > With a mechanism like this, fencing the ledger off after a failure
> > would
> > > > ensure any pending writes had either been written or would be
> rejected.
> > >
> > > The issue would be how I guarantee the write I wrote to the server was
> > > written. Since a network issue could happen on the send of the request,
> > or
> > > on the receive of the success response, an epoch wouldn't tell me if I
> > can
> > > successfully retry, as it could be successfully written but AWS dropped
> > the
> > > connection for the success response. Since the epoch would be the same
> > > (same ledger), I could write duplicates.
> > >
> > >
> > > > We are currently proposing adding a transaction semantic to dl to get
> > rid
> > > > of the size limitation and the unaware-ness in the proxy client. Here
> > is
> > > > our idea -
> > > > http://mail-archives.apache.org/mod_mbox/incubator-distributedlog-dev/201609.mbox/%3cCAAC6BxP5YyEHwG0ZCF5soh42X=xuYwYml4nxsybyiofzxpv...@mail.gmail.com%3e
> > >
> > > > I am not sure if your idea is similar to ours, but we'd like to
> > > > collaborate with the community if anyone has a similar idea.
> > >
> > > Our use case would be covered by transaction support, but I'm unsure if
> > > we would need something that heavyweight for the guarantees we need.
> > >
> >
> > >
> > > Basically, the high level requirement here is "Support consistent write
> > > ordering for single-writer-per-key, multi-writer-per-log". My hunch is
> > > that, with some added guarantees to the proxy (if it isn't already
> > > supported), and some custom client code on our side for removing the
> > > entries that actually succeeded in writing to DistributedLog from the
> request
> > > that failed, it should be a relatively easy thing to support.
> > >
> >
> > Yup. I think it should not be very difficult to support. There might be
> > some changes on the server side.
> > Let's figure out what the changes will be. Are you guys interested in
> > contributing?
> >
> Yes, we would be.
>
> As a note, the one thing that we see as an issue with client-side
> dedupping is how to bound the range of data that needs to be looked at for
> deduplication. As you can imagine, it is pretty easy to bound the bottom of
> the range, as that is just regular checkpointing of the DLSN that is
> returned. I'm still not sure if there is any nice way to time-bound the top
> end of the range, especially since the proxy owns sequence numbers (which
> makes sense).


I think you can use #getLastDLSN to fetch the top end of the range. But you
may face a situation where a lost-ack write request lands in the log after
you call #getLastDLSN.

For example,

a)

- Client writes a request
- the request lands at the log
- Client gets timeout (a lost-ack situation)
- Client issues #getLastDLSN

In this situation, you can dedup correctly, as the DLSN returned from
#getLastDLSN is the right top end of the range.

However, it becomes wrong in case b), as the request lands after the client
issues #getLastDLSN.

- Client writes a request
- Client gets timeout (a lost-ack situation)
- Client issues #getLastDLSN
- the request lands at the log

So my feeling is that relying on a bounded time for deduplication cannot
guarantee correctness and is a bit complicated.
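
Just to make sure we are talking about the same mechanism, here is a very
rough sketch of that client-side read-back dedup. Everything below is made
up for illustration (DedupLog, Record, the record ids are hypothetical, not
an existing DL API); only #getLastDLSN corresponds to a real call, and the
reader wiring is elided:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ClientSideDedup {

    // Hypothetical wrapper around the proxy client plus a log reader.
    interface DedupLog {
        String getLastDLSN();                    // would map to DistributedLogClient#getLastDLSN
        List<Record> readRange(String fromDlsn,  // would be built on a DL reader, starting
                               String toDlsn);   // from the last checkpointed DLSN
    }

    // Assumes every record carries an application-level unique id
    // (e.g. region id + HBase WAL sequence id) embedded in the payload.
    static final class Record {
        final String recordId;
        final ByteBuffer payload;
        Record(String recordId, ByteBuffer payload) {
            this.recordId = recordId;
            this.payload = payload;
        }
    }

    // After a lost-ack failure: read back the tail of the log and
    // retry only the records that never landed, preserving their order.
    static List<Record> recordsToRetry(DedupLog log, String checkpointDlsn,
                                       List<Record> failedBatch) {
        String top = log.getLastDLSN();          // top end of the range to inspect
        Set<String> landed = new HashSet<>();
        for (Record r : log.readRange(checkpointDlsn, top)) {
            landed.add(r.recordId);
        }
        List<Record> retry = new ArrayList<>();
        for (Record r : failedBatch) {
            if (!landed.contains(r.recordId)) {  // drop anything already in the log
                retry.add(r);
            }
        }
        return retry;
    }
}

The problem is exactly case b): nothing bounds when the lost-ack write can
still land, so the readRange above can miss it no matter how the top end is
picked.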

Instead, I have a different thought on this (more like a server-side
dedup). My idea basically is that if the client can generate its own sequence
number (client-side SN) during its lifetime, the client-side SN can then be
used for deduplication on the server side.

1) A client (for example, an HBase region) connects to the write proxy with a
client name.
2) On connecting, the client receives a lease id (the lease id would be the
last sequence id of the log).
3) The client starts generating sequence numbers from 0.
4) Each request written by the client contains a request id comprised of the
lease id and the client sequence number.
5) On the write proxy, the request id is used for deduplication.

On client failure (the HBase region is moved or split?), the client will
connect to the write proxy with the same client name and receive a new and
higher lease id. Writes from the same client id but with a lower lease id
will be rejected by the write proxy.

The client id could be the region id in HBase (??). I am not an expert on
HBase, so the idea might not be good for your solution. Let me know your
opinion and I'd be happy to learn and discuss more.
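
To make that idea concrete, here is a very rough sketch of the bookkeeping
the write proxy would do. All of the types and names below are made up for
illustration - none of this is an existing DistributedLog API, and a real
implementation would have to persist the lease (e.g. derive it from the
log's last sequence id as described above) so it survives proxy failover:

import java.util.HashMap;
import java.util.Map;

public class ProxyDedupTable {

    // Per-client state: the lease handed out on (re)connect and the
    // highest client-side sequence number accepted so far.
    static final class Session {
        final long leaseId;
        long maxSeqSeen;
        Session(long leaseId, long maxSeqSeen) {
            this.leaseId = leaseId;
            this.maxSeqSeen = maxSeqSeen;
        }
    }

    private final Map<String, Session> sessions = new HashMap<>();

    // Client connects with a stable name (e.g. an HBase region id).
    // Lease ids only grow, so an older writer can be told apart from a newer one.
    synchronized long connect(String clientName) {
        Session prev = sessions.get(clientName);
        long lease = (prev == null) ? 1L : prev.leaseId + 1;
        sessions.put(clientName, new Session(lease, -1L));
        return lease;
    }

    // Returns true if the write should be applied, false if it comes from a
    // stale writer or is a duplicate (a retry of something already written).
    synchronized boolean shouldAccept(String clientName, long leaseId, long clientSeqNo) {
        Session s = sessions.get(clientName);
        if (s == null || leaseId < s.leaseId) {
            return false;                 // stale lease: an older writer lost ownership
        }
        if (clientSeqNo <= s.maxSeqSeen) {
            return false;                 // duplicate request id, dedup here
        }
        s.maxSeqSeen = clientSeqNo;       // accept and remember for future retries
        return true;
    }
}

One subtlety: for the lost-ack case, the proxy should probably acknowledge a
duplicate request id as success rather than as an error, so the client can
blindly retry and never has to read the log back at all.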






> I am curious if there is more that can be done if
> deduplication is on the server side. However, the main minus I see of
> server-side deduplication is that instead of running only when there is a
> failed client request, it would have to run every time a write
> happens.


> Maybe something that could fit a similar need to what Kafka does (the last
> stored value for a particular key in a log), such that on a per-key basis
> there could be a sequence number that supports deduplication?


I am not sure a per-key sequence number is doable. My impression
about HBase is that it will have lots of keys. Region id might be doable.


> Cost seems
> like it would be high, however, and I'm not even sure if BookKeeper supports
> it.
>




>
> Cheers,
> Cameron
>
> >
> > >
> > > Thanks,
> > > Cameron
> > >
> > >
> > > On Sat, Oct 8, 2016 at 7:35 AM, Leigh Stewart
> > <lstew...@twitter.com.invalid
> > > >
> > > wrote:
> > >
> > > > Cameron:
> > > > Another thing we've discussed but haven't really thought through -
> > > > We might be able to support some kind of epoch write request, where
> the
> > > > epoch is guaranteed to have changed if the writer has changed or the
> > > ledger
> > > > was ever fenced off. Writes include an epoch and are rejected if the
> > > epoch
> > > > has changed.
> > > > With a mechanism like this, fencing the ledger off after a failure
> > would
> > > > ensure any pending writes had either been written or would be
> rejected.
> > > >
> > > >
> > > > On Sat, Oct 8, 2016 at 7:10 AM, Sijie Guo <si...@apache.org> wrote:
> > > >
> > > > > Cameron,
> > > > >
> > > > > I think both Leigh and Xi had made a few good points about your
> > > question.
> > > > >
> > > > > To add one more point to your question - "but I am not 100% sure how
> > > > > all of the futures in the code handle failures. If not, where in the
> > > > > code would be the relevant places to add the ability to do this, and
> > > > > would the project be interested in a pull request?"
> > > > >
> > > > > The current proxy and client logic doesn't handle failures
> > > > > (duplicates) perfectly - the strategy now is that the client will
> > > > > retry as best as it can before throwing exceptions to users. The code
> > > > > you are looking for is in BKLogSegmentWriter for the proxy handling
> > > > > writes and in DistributedLogClientImpl for the proxy client handling
> > > > > responses from proxies. Does this help you?
> > > > >
> > > > > And also, you are welcome to contribute the pull requests.
> > > > >
> > > > > - Sijie
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Oct 4, 2016 at 3:39 PM, Cameron Hatfield <kin...@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > I have a question about the Proxy Client. Basically, for our use
> > > cases,
> > > > > we
> > > > > > want to guarantee ordering at the key level, irrespective of the
> > > > ordering
> > > > > > of the partition it may be assigned to as a whole. Due to the
> > source
> > > of
> > > > > the
> > > > > > data (HBase Replication), we cannot guarantee that a single
> > partition
> > > > > will
> > > > > > be owned for writes by the same client. This means the proxy
> client
> > > > works
> > > > > > well (since we don't care which proxy owns the partition we are
> > > writing
> > > > > > to).
> > > > > >
> > > > > >
> > > > > > However, the guarantees we need when writing a batch consist of:
> > > > > > Definition of a Batch: The set of records sent to the writeBatch
> > > > endpoint
> > > > > > on the proxy
> > > > > >
> > > > > > 1. Batch success: If the client receives a success from the
> proxy,
> > > then
> > > > > > that batch is successfully written
> > > > > >
> > > > > > 2. Inter-Batch ordering : Once a batch has been written
> > successfully
> > > by
> > > > > the
> > > > > > client, when another batch is written, it will be guaranteed to
> be
> > > > > ordered
> > > > > > after the last batch (if it is the same stream).
> > > > > >
> > > > > > 3. Intra-Batch ordering: Within a batch of writes, the records
> will
> > > be
> > > > > > committed in order
> > > > > >
> > > > > > 4. Intra-Batch failure ordering: If an individual record fails to
> > > write
> > > > > > within a batch, all records after that record will not be
> written.
> > > > > >
> > > > > > 5. Batch Commit: Guarantee that if a batch returns a success, it
> > will
> > > > be
> > > > > > written
> > > > > >
> > > > > > 6. Read-after-write: Once a batch is committed, within a limited
> > > > > time-frame
> > > > > > it will be able to be read. This is required in the case of
> > failure,
> > > so
> > > > > > that the client can see what actually got committed. I believe
> the
> > > > > > time-frame part could be removed if the client can send in the
> same
> > > > > > sequence number that was written previously, since it would then
> > fail
> > > > and
> > > > > > we would know that a read needs to occur.
> > > > > >
> > > > > >
> > > > > > So, my basic question is whether this is currently possible in the
> > > > > > proxy? I don't believe it gives these guarantees as it stands
> > > > > > today, but I am not 100% sure how all of the futures in the code
> > > > > > handle failures.
> > > > > > If not, where in the code would be the relevant places to add the
> > > > > > ability to do this, and would the project be interested in a pull
> > > > > > request?
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > Cameron
> > > > > >
> > > > >
> > > >
> > >
> >
>
