Answers inline:

On Mon, Oct 17, 2016 at 11:46 AM, Sijie Guo <si...@apache.org> wrote:

> Cameron,
>
> Thank you for your summary. I liked the discussion here. I also liked the
> summary of your requirement: 'single-writer-per-key,
> multiple-writers-per-log'. If I understand correctly, the core concern here
> is essentially an 'exactly-once' write (or a way to tell explicitly whether
> a write can be retried or not).
>
> Comments inline.
>
> On Fri, Oct 14, 2016 at 11:17 AM, Cameron Hatfield <kin...@gmail.com>
> wrote:
>
> > > Ah, yes, good point (to be clear, we're not using the proxy this way
> > > today).
> >
> > > > Due to the source of the
> > > > data (HBase Replication), we cannot guarantee that a single partition
> > > > will be owned for writes by the same client.
> >
> > > Do you mean you *need* to support multiple writers issuing interleaved
> > > writes, or is it just that they might sometimes interleave writes and
> > > you don't care?
> > How HBase partitions the keys being written wouldn't have a one-to-one
> > mapping with the partitions we would have in DistributedLog. Even if we
> > did have that alignment when the cluster first started, HBase will
> > rebalance which servers own which partitions, as well as split and merge
> > existing partitions, causing eventual drift from one log per partition.
> > Because we want ordering guarantees per key (row in HBase), we partition
> > the logs by the key. Since multiple writers are possible per range of
> > keys (due to the aforementioned rebalancing / splitting / etc. of HBase),
> > we cannot use the core library, as it requires a single writer for
> > ordering.
> >
> > But, for a single log, we don't really care about ordering aside from at
> > the per-key level. So all we really need to be able to handle is
> > preventing duplicates when a failure occurs, and ordering consistency
> > across requests from a single client.
> >
> > So our general requirements are:
> > Write A, Write B
> > Timeline: A -> B
> > Request B is only made after A has successfully returned (possibly after
> > retries)
> >
> > 1) If the write succeeds, it will be durably exposed to clients within
> > some bounded time frame
> >
>
> Guaranteed.
>

> > 2) If A succeeds and B succeeds, the ordering for the log will be A and
> > then B
> >
>
> If I understand correctly here, B is only sent after A is returned, right?
> If that's the case, it is guaranteed.



>
>
>
> > 3) If A fails due to an error that can be relied on to *not* be a lost
> > ack problem, it will never be exposed to the client, so it may (depending
> > on the error) be retried immediately
> >
>
> If it is not a lost-ack problem, the entry will be exposed. It is
> guaranteed.

Let me try rephrasing the question, to make sure I understand correctly:
If A fails with an error such as "Unable to create connection to
bookkeeper server", that is the type of error we would expect to be able
to retry immediately, since that result means no action was taken on any
log, so no entry could have been created. This is different from a
"Connection Timeout" exception, where we might simply not have gotten a
response in time.
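A minimal sketch of the distinction drawn above, assuming the client can tell a failure to connect apart from a failure after the request was sent. The exception classes `ConnectFailed` and `RequestTimedOut` are hypothetical stand-ins, not DistributedLog's actual exception types.

```python
from enum import Enum


class Outcome(Enum):
    NOT_WRITTEN = "not_written"  # nothing reached the log: safe to retry immediately
    UNKNOWN = "unknown"          # may have been applied (lost ack): verify before retrying


# Hypothetical exception types standing in for whatever the proxy client raises.
class ConnectFailed(Exception):
    """No connection could be opened, so the request was never sent."""


class RequestTimedOut(Exception):
    """The request may have been sent, but no response arrived in time."""


def classify(err: Exception) -> Outcome:
    """Map a write failure to whether a blind retry is safe."""
    if isinstance(err, ConnectFailed):
        return Outcome.NOT_WRITTEN
    # Timeouts, dropped connections and anything unrecognized could be a
    # lost ack, so they default to the conservative answer.
    return Outcome.UNKNOWN
```

Defaulting unknown errors to `UNKNOWN` keeps the client safe: the worst case is an unnecessary read-and-dedup pass, never a duplicate.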


>
>
> > 4) If A fails due to an error that could be a lost ack problem (network
> > connectivity / etc), within a bounded time frame it should be possible to
> > find out if the write succeeded or failed, either by reading from some
> > checkpoint of the log for the changes that should have been made or via
> > some other server-side support.
> >
>
> If I understand this correctly, it is a duplication issue, right?
>
> Can a de-duplication solution work here, with either DL or your client
> doing the de-duplication?
>

The requirements I'm mentioning are the ones needed for client-side
deduping. If I can guarantee that writes are exposed within some time
frame, and that I can never get into an inconsistently ordered state when
writes succeed, then whenever an error occurs I can wait out that maximum
time frame, read the latest writes, and dedup locally against the request
I just made.

The main thing about that time frame is that it's basically the sum of
every timeout all the way down the system, plus whatever flushing /
caching / etc. delays exist at the BookKeeper / client level before values
are exposed.
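A minimal sketch of that recovery step, assuming each record carries a unique client-assigned id (`record_id` is hypothetical) and that the caller supplies a `read_since_checkpoint` function returning the records written after the last checkpoint. Neither is an existing DistributedLog API.

```python
import time
from dataclasses import dataclass
from typing import Callable, Iterable, List


@dataclass(frozen=True)
class Record:
    record_id: str  # hypothetical unique id the client embeds in each payload
    payload: bytes


def records_to_retry(request: List[Record], log_tail: Iterable[Record]) -> List[Record]:
    """Return the part of `request` that is not yet in the log.

    Relies on the ordering requirements above: whatever part of a failed
    request was written is a prefix of it, so we stop at the first record
    that is missing and retry from there.
    """
    seen = {r.record_id for r in log_tail}
    written = 0
    for rec in request:
        if rec.record_id not in seen:
            break
        written += 1
    return request[written:]


def recover_after_ambiguous_failure(
    request: List[Record],
    read_since_checkpoint: Callable[[], Iterable[Record]],
    max_exposure_s: float,
    sleep: Callable[[float], None] = time.sleep,
) -> List[Record]:
    # Wait out the bound after which any successful write must be readable,
    # then compare the request against what the log actually contains.
    sleep(max_exposure_s)
    return records_to_retry(request, read_since_checkpoint())
```

Injecting `sleep` keeps the waiting step testable; in production it would simply be the maximum exposure delay described above.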


>
> Is there any way to identify your write?
>
> I can think of the following case, and I want to know what behavior you
> expect from the log.
>
> a)
>
> If HBase region server A writes a change of key K to the log, the change
> is successfully made to the log,
> but server A goes down before receiving the acknowledgement.
> Region server B takes over the region that contains K. What will B do?
>

HBase writes in large chunks (WAL logs), which its replication system then
handles by replaying in the case of failure. If I'm in the middle of a log,
and the whole region goes down and gets rescheduled elsewhere, I will start
back up from the beginning of the log I was in the middle of. Using
checkpointing + deduping, we should be able to find out where we left off
in the log.
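A minimal sketch of finding where to resume after the region moves, assuming the checkpoint stores both a position in the HBase WAL and the DLSN returned for it, and that the new owner can read the ids of the records in the log after that DLSN. `Checkpoint` and `resume_offset` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Sequence, Set, Tuple


@dataclass(frozen=True)
class Checkpoint:
    wal_offset: int              # first WAL entry not yet confirmed as written
    dlsn: Tuple[int, int, int]   # simplified DLSN returned for the last confirmed write


def resume_offset(wal_entry_ids: Sequence[str], cp: Checkpoint,
                  ids_in_log_after_cp: Set[str]) -> int:
    """Return the WAL offset from which the new owner should start writing.

    Entries before cp.wal_offset are known to be in the log. Of the entries
    after it, the ones that landed form a contiguous run starting at
    cp.wal_offset, because the previous owner wrote them in WAL order.
    """
    offset = cp.wal_offset
    while offset < len(wal_entry_ids) and wal_entry_ids[offset] in ids_in_log_after_cp:
        offset += 1
    return offset
```

The replay still restarts from the beginning of the WAL, but everything before the returned offset is skipped instead of being written a second time.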

>
>
> b) Same as a), but server A was just network partitioned. Will both A and
> B write the change of key K?
>

HBase gives us some guarantees around network partitions (it favors
consistency over availability). HBase is a single-master, failover-recovery
type of system, with ZooKeeper-based guarantees for single owners (writers)
of a range of data.

>
>
>
> > 5) If A is turned into multiple batches (one large request gets split
> > into multiple smaller ones to the bookkeeper backend, due to log rolling
> > / size / etc):
> >   a) The entries within each batch are ordered consistently with the
> > original request when exposed in the log (though they may be interleaved
> > with other requests)
> >   b) The batches are ordered consistently with the original request when
> > exposed in the log (though they may be interleaved with other requests)
> >   c) If a batch fails, and cannot be retried / is unsuccessfully retried,
> > all batches after the failed batch should not be exposed in the log.
> > Note: The batches before and including the failed batch, that ended up
> > succeeding, can show up in the log, again within some bounded time range
> > for reads by a client.
> >
>
> There is a method 'writeBulk' in DistributedLogClient that can achieve this
> guarantee.
>
> However, I am not very sure how you will turn A into batches. If you
> are dividing A into batches,
> you can simply control the application write sequence to achieve the
> guarantee here.
>
> Can you explain more about this?
>

In this case, by batches I mean what the proxy does with the single request
that I send it. If the proxy decides it needs to turn my single request
into multiple batches of requests, due to log rolling, size limitations,
etc., those would be the guarantees I need to be able to deduplicate on the
client side.
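A minimal sketch of the property requirement 5c asks for, simulating the proxy's side: a request is split into size-limited batches in order, and batches are written one at a time, stopping at the first failure, so the exposed part of the request is always a prefix. `split_into_batches`, `write_in_order` and the `write_batch` callback are hypothetical. Writing strictly one batch at a time is a simplification; a real proxy may have several batches in flight at once, which is what makes this guarantee harder to provide there.

```python
from typing import Callable, List, Sequence


def split_into_batches(records: Sequence[bytes], max_batch_bytes: int) -> List[List[bytes]]:
    """Split records into consecutive batches of at most max_batch_bytes,
    preserving order (a record larger than the limit gets its own batch)."""
    batches: List[List[bytes]] = []
    current: List[bytes] = []
    size = 0
    for rec in records:
        if current and size + len(rec) > max_batch_bytes:
            batches.append(current)
            current, size = [], 0
        current.append(rec)
        size += len(rec)
    if current:
        batches.append(current)
    return batches


def write_in_order(batches: List[List[bytes]],
                   write_batch: Callable[[List[bytes]], bool]) -> int:
    """Write batches one after another and stop at the first failure, so no
    batch after a failed one is ever exposed. Returns records written."""
    written = 0
    for batch in batches:
        if not write_batch(batch):
            break
        written += len(batch)
    return written
```

Because the return value is always the length of a prefix of the original request, the client-side dedup only has to find where that prefix ends.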

>
>
> >
> > Since we can guarantee per-key ordering on the client side, we guarantee
> > that there is a single writer per-key, just not per log.
>
>
> Do you need fencing guarantee in the case of network partition causing
> two-writers?
>
>
> > So if there were a way to determine whether a single write request was
> > written or not, within a certain time frame (since failures should be
> > rare anyway, it is fine if this is expensive), we could then have the
> > client guarantee the ordering it needs.
> >
>
> This sounds like an 'exactly-once' write (regarding retries) requirement
> to me, right?
>
Yes. I'm curious how this issue is handled by Manhattan, since a data
store that ends up getting multiple writes for the same put / get / etc.
would be harder to use, and we are basically trying to create a log like
that for HBase.

>
>
> >
> >
> > > Cameron:
> > > Another thing we've discussed but haven't really thought through:
> > > We might be able to support some kind of epoch write request, where the
> > > epoch is guaranteed to have changed if the writer has changed or the
> > > ledger was ever fenced off. Writes include an epoch and are rejected if
> > > the epoch has changed.
> > > With a mechanism like this, fencing the ledger off after a failure would
> > > ensure any pending writes had either been written or would be rejected.
> >
> > The issue would be how to guarantee that the write I sent to the server
> > was written. Since a network issue could happen on the send of the
> > request, or on the receipt of the success response, an epoch wouldn't
> > tell me if I can safely retry: the write could have succeeded while AWS
> > dropped the connection carrying the success response. Since the epoch
> > would be the same (same ledger), I could write duplicates.
> >
> >
> > > We are currently proposing adding a transaction semantic to DL to get
> > > rid of the size limitation and the unaware-ness in the proxy client.
> > > Here is our idea:
> > > http://mail-archives.apache.org/mod_mbox/incubator-distributedlog-dev/201609.mbox/%3cCAAC6BxP5YyEHwG0ZCF5soh42X=xuYwYml4nxsybyiofzxpv...@mail.gmail.com%3e
> >
> > > I am not sure if your idea is similar to ours, but we'd like to
> > > collaborate with the community if anyone has a similar idea.
> >
> > Our use case would be covered by transaction support, but I'm unsure if
> > we would need something that heavyweight for the guarantees we need.
> >
>
> >
> > Basically, the high-level requirement here is "Support consistent write
> > ordering for single-writer-per-key, multi-writer-per-log". My hunch is
> > that, with some added guarantees in the proxy (if they aren't already
> > supported), and some custom client code on our side for removing the
> > entries that were actually written to DistributedLog from the request
> > that failed, it should be a relatively easy thing to support.
> >
>
> Yup. I think it should not be very difficult to support. There might be
> some changes on the server side.
> Let's figure out what the changes will be. Are you guys interested in
> contributing?
>

Yes, we would be.

As a note, the one thing that we see as an issue with client-side
deduping is how to bound the range of data that needs to be looked at for
deduplication. It is pretty easy to bound the bottom of the range, as that
is just regular checkpointing of the DLSN that is returned. I'm still not
sure if there is any nice way to time-bound the top end of the range,
especially since the proxy owns sequence numbers (which makes sense). I am
curious whether more could be done if deduplication were on the server
side. However, the main downside I see of server-side deduplication is
that instead of running only when a client request fails, it would have
to run every time a write happens.
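A minimal sketch of one way to bound both ends, assuming requirement 1 holds: the lower bound is the checkpointed DLSN, and the upper bound is the last DLSN visible once the maximum exposure delay has passed after the failure, since any write that succeeded must be visible by then. The `DLSN` tuple below is a simplified stand-in (log segment sequence number, entry id, slot id), not DistributedLog's own class, and `dedup_window` is hypothetical.

```python
from typing import Iterable, List, NamedTuple, Tuple


class DLSN(NamedTuple):
    """Simplified stand-in for a DistributedLog sequence number. Tuple
    comparison is lexicographic, so positions order as segment, entry, slot."""
    log_segment_seq_no: int
    entry_id: int
    slot_id: int


def dedup_window(entries: Iterable[Tuple[DLSN, str]], checkpoint: DLSN,
                 tail_after_wait: DLSN) -> List[str]:
    """Return ids of records positioned in (checkpoint, tail_after_wait].

    checkpoint: last DLSN acknowledged to this client (the lower bound).
    tail_after_wait: last DLSN visible once the maximum exposure delay has
    elapsed since the failure (the upper bound, if requirement 1 holds).
    """
    return [rid for pos, rid in entries if checkpoint < pos <= tail_after_wait]
```

This turns the time bound into a position bound, but it does not remove the hard part, which is knowing that maximum exposure delay in the first place.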

Maybe something similar to what Kafka does (keeping the last stored value
for a particular key in a log) could fit this need, such that on a per-key
basis there could be a sequence number that supports deduplication? The
cost seems like it would be high, however, and I'm not even sure if
BookKeeper supports it.
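A minimal sketch of the per-key idea, assuming the single writer for each key attaches a strictly increasing sequence number (including across HBase failover) and that something on the write path can keep this map. `PerKeyDeduplicator` is hypothetical; neither DistributedLog nor BookKeeper is claimed to provide it.

```python
from typing import Dict


class PerKeyDeduplicator:
    """Remember the highest sequence number applied for each key and drop
    any write that does not advance it."""

    def __init__(self) -> None:
        self._last_seq: Dict[bytes, int] = {}

    def should_apply(self, key: bytes, seq: int) -> bool:
        last = self._last_seq.get(key)
        if last is not None and seq <= last:
            return False  # duplicate (e.g. a retry after a lost ack) or stale write
        self._last_seq[key] = seq
        return True
```

The map holds one entry per distinct key, which is where the high cost comes from: it grows with the key space rather than with the number of failed requests.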

Cheers,
Cameron

>
> >
> > Thanks,
> > Cameron
> >
> >
> > On Sat, Oct 8, 2016 at 7:35 AM, Leigh Stewart <lstew...@twitter.com.invalid>
> > wrote:
> >
> > > Cameron:
> > > Another thing we've discussed but haven't really thought through:
> > > We might be able to support some kind of epoch write request, where the
> > > epoch is guaranteed to have changed if the writer has changed or the
> > > ledger was ever fenced off. Writes include an epoch and are rejected if
> > > the epoch has changed.
> > > With a mechanism like this, fencing the ledger off after a failure would
> > > ensure any pending writes had either been written or would be rejected.
> > >
> > >
> > > On Sat, Oct 8, 2016 at 7:10 AM, Sijie Guo <si...@apache.org> wrote:
> > >
> > > > Cameron,
> > > >
> > > > I think both Leigh and Xi made a few good points about your
> > > > question.
> > > >
> > > > To add one more point to your question: "but I am not
> > > > 100% sure of how all of the futures in the code handle failures.
> > > > If not, where in the code would be the relevant places to add the
> > > > ability to do this, and would the project be interested in a pull
> > > > request?"
> > > >
> > > > The current proxy and client logic doesn't handle failures
> > > > (duplicates) perfectly; the current strategy is that the client
> > > > retries as best it can before throwing exceptions to users. The code
> > > > you are looking for is in BKLogSegmentWriter for the proxy handling
> > > > writes and in DistributedLogClientImpl for the proxy client handling
> > > > responses from proxies. Does this help you?
> > > >
> > > > You are also welcome to contribute pull requests.
> > > >
> > > > - Sijie
> > > >
> > > >
> > > >
> > > > On Tue, Oct 4, 2016 at 3:39 PM, Cameron Hatfield <kin...@gmail.com>
> > > > wrote:
> > > >
> > > > > I have a question about the Proxy Client. Basically, for our use
> > > > > cases, we want to guarantee ordering at the key level, irrespective
> > > > > of the ordering of the partition it may be assigned to as a whole.
> > > > > Due to the source of the data (HBase Replication), we cannot
> > > > > guarantee that a single partition will be owned for writes by the
> > > > > same client. This means the proxy client works well (since we don't
> > > > > care which proxy owns the partition we are writing to).
> > > > >
> > > > >
> > > > > However, the guarantees we need when writing a batch consist of:
> > > > > Definition of a Batch: The set of records sent to the writeBatch
> > > > > endpoint on the proxy
> > > > >
> > > > > 1. Batch success: If the client receives a success from the proxy,
> > > > > then that batch is successfully written
> > > > >
> > > > > 2. Inter-Batch ordering: Once a batch has been written successfully
> > > > > by the client, when another batch is written, it will be guaranteed
> > > > > to be ordered after the last batch (if it is the same stream).
> > > > >
> > > > > 3. Intra-Batch ordering: Within a batch of writes, the records will
> > > > > be committed in order
> > > > >
> > > > > 4. Intra-Batch failure ordering: If an individual record fails to
> > > > > write within a batch, all records after that record will not be
> > > > > written.
> > > > >
> > > > > 5. Batch Commit: Guarantee that if a batch returns a success, it
> > > > > will be written
> > > > >
> > > > > 6. Read-after-write: Once a batch is committed, within a limited
> > > > > time frame it will be able to be read. This is required in the case
> > > > > of failure, so that the client can see what actually got committed.
> > > > > I believe the time-frame part could be removed if the client can
> > > > > send in the same sequence number that was written previously, since
> > > > > it would then fail and we would know that a read needs to occur.
> > > > >
> > > > >
> > > > > So, my basic question is: is this currently possible in the proxy?
> > > > > I don't believe it gives these guarantees as it stands today, but I
> > > > > am not 100% sure of how all of the futures in the code handle
> > > > > failures.
> > > > > If not, where in the code would be the relevant places to add the
> > > > > ability to do this, and would the project be interested in a pull
> > > > > request?
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Cameron
> > > > >
> > > >
> > >
> >
>
