[ https://issues.apache.org/jira/browse/HBASE-2898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Benoit Sigoure updated HBASE-2898:
----------------------------------
Description:
The {{MultiPut}} RPC needs to be completely rewritten. Let's see why step by
step.
# An HBase user calls any of the {{put}} methods on an {{HTable}} instance.
# Eventually, {{HTable#flushCommits}} is invoked to actually send the edits to
the RegionServer(s).
# This takes us to {{HConnectionManager#processBatchOfPuts}} where all edits
are sorted into one or more {{MultiPut}}. Each {{MultiPut}} is aggregating all
the edits that are going to a particular RegionServer.
# A thread pool is used to send all the {{MultiPut}} in parallel to their
respective RegionServer. Let's follow what happens for a single {{MultiPut}}.
# The {{MultiPut}} travels through the IPC code on the client and then through
the network and then through the IPC code on the RegionServer.
# We're now in {{HRegionServer#multiPut}} where a new {{MultiPutResponse}} is
created.
# Still in {{HRegionServer#multiPut}}. Since a {{MultiPut}} is essentially a
map from region name to a list of {{Put}} for that region, there's a {{for}}
loop that executes each list of {{Put}} for each region sequentially. Let's
follow what happens for a single list of {{Put}} for a particular region.
# We're now in {{HRegionServer#put(byte[], List<Put>)}}. Each {{Put}} is
associated with the row lock that was specified by the client (if any). Then
the pairs of {{(Put, lock id)}} are handed to the right {{HRegion}}.
# Now we're in {{HRegion#put(Pair<Put, Integer>[])}}, which immediately takes
us to {{HRegion#doMiniBatchPut}}.
# At this point, let's assume that we're doing just 2 edits. So the
{{BatchOperationInProgress}} that {{doMiniBatchPut}} works on contains just 2
{{Put}}.
# The {{while}} loop in {{doMiniBatchPut}} that's going to execute each {{Put}}
starts.
# The first {{Put}} fails because an exception is thrown when appending the
edit to the {{WAL}}. Its {{batchOp.retCodes}} is marked as
{{OperationStatusCode.FAILURE}}.
# Because there was an exception, we're back to {{HRegion#put(Pair<Put,
Integer>[])}} where the {{while}} loop will test that {{batchOp.isDone}} is
{{false}} and do another iteration.
# {{doMiniBatchPut}} is called again and handles the remaining {{Put}}.
# The second {{Put}} succeeds normally, so its {{batchOp.retCodes}} is marked
as {{OperationStatusCode.SUCCESS}}.
# {{doMiniBatchPut}} is done and returns to {{HRegion#put(Pair<Put,
Integer>[])}}, which returns to {{HRegionServer#put(byte[], List<Put>)}}.
# At this point, {{HRegionServer#put(byte[], List<Put>)}} does a {{for}} loop
and extracts the index of the *first* {{Put}} that failed out of the
{{OperationStatusCode[]}}. In our case, it'll return 0 since the first {{Put}}
failed.
# The index of the first {{Put}} that failed (0 in this case) is returned to
{{HRegionServer#multiPut}}, which records it in the {{MultiPutResponse}}. The
client knows that the first {{Put}} failed but has no idea about the other one.
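
The information lost in the last two steps can be illustrated with a minimal sketch. It is not the HBase code: {{Status}} is a hypothetical two-value stand-in for {{OperationStatusCode}}, and {{firstFailureIndex}} is an illustrative name for the {{for}} loop described above.

```java
class FirstFailureDemo {
    // Hypothetical stand-in for OperationStatusCode, reduced to two values.
    enum Status { SUCCESS, FAILURE }

    // Mimics the described behavior: report only the index of the first
    // failed Put, or -1 when every Put succeeded.
    static int firstFailureIndex(Status[] codes) {
        for (int i = 0; i < codes.length; i++) {
            if (codes[i] == Status.FAILURE) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        Status[] secondSucceeded = { Status.FAILURE, Status.SUCCESS };
        Status[] secondFailedToo = { Status.FAILURE, Status.FAILURE };
        // Both outcomes collapse to the same index, so the client cannot
        // tell them apart from the response alone.
        System.out.println(firstFailureIndex(secondSucceeded));
        System.out.println(firstFailureIndex(secondFailedToo));
    }
}
```

Two different server-side outcomes produce the same answer, which is exactly why the client cannot know the fate of the second {{Put}}.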
So the client has no reliable way of knowing which {{Put}} failed (if any) past
the first failure. All it knows is that for a particular region, they
succeeded up to a particular {{Put}}, at which point there was a failure, and
then the remaining may or may not have succeeded. Its best bet is to retry all
the {{Put}} past the index of the first failure for this region. But this has
an unintended consequence. The {{Put}} that were successful during the first
run will be *re-applied*. This will unexpectedly create extra versions. Now I
realize most people don't really care about versions, so they won't notice.
But whoever relies on the versions for whatever reason will rightfully consider
this to be data corruption.
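
To make the re-application problem concrete, here is a toy model of the retry. {{ToyStore}} is hypothetical and only assumes that every applied edit is stored as a new version of its row (as would happen when the timestamp is assigned at write time). It is not how HBase stores data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RetryDuplicatesDemo {
    // Toy store (hypothetical): each applied edit becomes one more version
    // of its row.
    static final class ToyStore {
        private final Map<String, List<String>> versions = new HashMap<>();

        void apply(String row, String value) {
            versions.computeIfAbsent(row, k -> new ArrayList<>()).add(value);
        }

        int versionCount(String row) {
            List<String> v = versions.get(row);
            return v == null ? 0 : v.size();
        }
    }

    static ToyStore runScenario() {
        ToyStore store = new ToyStore();
        // First attempt: put #0 (row "a") fails and applies nothing,
        // put #1 (row "b") succeeds.
        store.apply("b", "value");
        // The client only knows "first failure at index 0", so it retries
        // every Put from index 0 onwards. Assume both succeed this time.
        store.apply("a", "value");
        store.apply("b", "value");
        return store;
    }

    public static void main(String[] args) {
        ToyStore store = runScenario();
        System.out.println("versions of a: " + store.versionCount("a"));
        System.out.println("versions of b: " + store.versionCount("b"));
    }
}
```

Row "b" ends up with one more version than the client intended to write, even though every individual call looked legitimate.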
As it is now, {{MultiPut}} makes proper error handling impossible. Since this
RPC cannot guarantee any atomicity other than at the individual {{Put}} level,
it should return to the client specific information about which {{Put}} failed
in case of a failure, so that the client can do proper error handling.
This requires us to change the {{MultiPutResponse}} so that it can indicate
which {{Put}} specifically failed. We could do this for instance by giving the
index of the {{Put}} along with its {{OperationStatusCode}}. So in the
scenario above, the {{MultiPutResponse}} would essentially return something
like: "for that particular region, put #0 failed, put #1 succeeded". If we
want to save a bit of space, we may want to omit the successes from the
response and only mention the failures - so a response that doesn't mention any
failure means that everything was successful. Not sure whether that's a good
idea though.
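
One possible shape for such a response, sketched under assumptions: the names {{Status}}, {{failuresOnly}} and {{succeeded}} are made up for illustration, and this is the "only mention the failures" variant, not an actual or agreed-upon wire format.

```java
import java.util.Map;
import java.util.TreeMap;

class PerPutResponseDemo {
    // Hypothetical stand-in for OperationStatusCode.
    enum Status { SUCCESS, FAILURE }

    // Failures-only encoding for one region: keep just the indexes of the
    // Puts that did not succeed, together with their status.
    static Map<Integer, Status> failuresOnly(Status[] codes) {
        Map<Integer, Status> failures = new TreeMap<>();
        for (int i = 0; i < codes.length; i++) {
            if (codes[i] != Status.SUCCESS) {
                failures.put(i, codes[i]);
            }
        }
        return failures;
    }

    // Client side: an index that is absent from the map means success.
    static boolean succeeded(Map<Integer, Status> failures, int index) {
        return !failures.containsKey(index);
    }

    public static void main(String[] args) {
        Status[] codes = { Status.FAILURE, Status.SUCCESS };
        Map<Integer, Status> response = failuresOnly(codes);
        System.out.println("put #0 succeeded: " + succeeded(response, 0));
        System.out.println("put #1 succeeded: " + succeeded(response, 1));
    }
}
```

With this, the client can retry put #0 alone and leave put #1 untouched. The trade-off of the failures-only form is that an empty response and a lost or truncated response look alike unless the protocol distinguishes them some other way.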
Since doing this requires an incompatible RPC change, I propose that we take the
opportunity to rewrite the {{MultiPut}} RPC too. Right now it's very
inefficient: it's just a hack on top of {{Put}}. When {{MultiPut}} is written
to the wire, a lot of unnecessary duplicate data is sent out. The timestamp,
the row key and the family are sent out to the wire N+1 times, where N is the
number of edits for a particular row, instead of just once (!).
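
As a rough back-of-the-envelope estimate of that duplication: if the three fields go out N+1 times instead of once, N copies are redundant. The sketch below assumes an 8-byte timestamp and ignores length prefixes and any other framing, so the numbers are only indicative. {{redundantBytes}} is an illustrative helper, not part of HBase.

```java
class WireOverheadDemo {
    // Assumption: the timestamp is serialized as an 8-byte long.
    static final int TIMESTAMP_BYTES = 8;

    // Bytes sent needlessly for one row when timestamp, row key and family
    // go out N+1 times instead of once, i.e. N redundant copies.
    static long redundantBytes(int edits, int rowKeyLen, int familyLen) {
        return (long) edits * (TIMESTAMP_BYTES + rowKeyLen + familyLen);
    }

    public static void main(String[] args) {
        // Example: 10 edits on a row with a 16-byte key and a 1-byte family.
        System.out.println(redundantBytes(10, 16, 1) + " redundant bytes");
    }
}
```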
Alternatively, if we don't want to change the RPCs, we can fix this issue in a
backward compatible way by making the {{while}} loop in {{HRegion#put(Pair<Put,
Integer>[])}} stop as soon as a failure is encountered.
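
A minimal sketch of what "stop as soon as a failure is encountered" buys us. It is a simplified model, not the real {{HRegion}} loop: {{willFail}} is a hypothetical input saying which {{Put}} would fail, and {{NOT_RUN}} is an illustrative extra status for edits that were never attempted.

```java
import java.util.Arrays;

class StopOnFirstFailureDemo {
    // Hypothetical statuses; NOT_RUN marks Puts that were never attempted.
    enum Status { NOT_RUN, SUCCESS, FAILURE }

    // Executes the Puts in order and stops at the first failure, so nothing
    // past the failing index is applied.
    static Status[] runBatch(boolean[] willFail) {
        Status[] codes = new Status[willFail.length];
        Arrays.fill(codes, Status.NOT_RUN);
        for (int i = 0; i < willFail.length; i++) {
            if (willFail[i]) {
                codes[i] = Status.FAILURE;
                break;
            }
            codes[i] = Status.SUCCESS;
        }
        return codes;
    }

    public static void main(String[] args) {
        // Same scenario as above: the first Put fails, the second would succeed.
        System.out.println(Arrays.toString(runBatch(new boolean[] { true, false })));
    }
}
```

With this behavior the index of the first failure fully describes the outcome: everything before it succeeded and nothing from it onwards was applied, so retrying from that index cannot re-apply an edit.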
was:
The {{MultiPut}} RPC needs to be completely rewritten. Let's see why step by
step.
# An HBase user calls any of the {{put}} methods on an {{HTable}} instance.
# Eventually, {{HTable#flushCommits}} is invoked to actually send the edits to
the RegionServer(s).
# This takes us to {{HConnectionManager#processBatchOfPuts}} where all edits
are sorted into one or more {{MultiPut}}. Each {{MultiPut}} is aggregating all
the edits that are going to a particular RegionServer.
# A thread pool is used to send all the {{MultiPut}} in parallel to their
respective RegionServer. Let's follow what happens for a single {{MultiPut}}.
# The {{MultiPut}} travels through the IPC code on the client and then through
the network and then through the IPC code on the RegionServer.
# We're now in {{HRegionServer#multiPut}} where a new {{MultiPutResponse}} is
created.
# Still in {{HRegionServer#multiPut}}. Since a {{MultiPut}} is essentially a
map from region name to a list of {{Put}} for that region, there's a {{for}}
loop that executes each list of {{Put}} for each region sequentially. Let's
follow what happens for a single list of {{Put}} for a particular region.
# We're now in {{HRegionServer#put(byte[], List<Put>)}}. Each {{Put}} is
associated with the row lock that was specified by the client (if any). Then
the pairs of {{(Put, lock id)}} are handed to the right {{HRegion}}.
# Now we're in {{HRegion#put(Pair<Put, Integer>[])}}, which immediately takes
us to {{HRegion#doMiniBatchPut}}.
# At this point, let's assume that we're doing just 2 edits. So the
{{BatchOperationInProgress}} that {{doMiniBatchPut}} has contains just 2
{{Put}}.
# The {{while}} loop in {{doMiniBatchPut}} that's going to execute each {{Put}}
starts.
# The first {{Put}} succeeds normally, so at the end of the first iteration,
its {{batchOp.retCodes}} is marked as {{OperationStatusCode.SUCCESS}}.
# The second {{Put}} fails because an exception is thrown when appending the
edit to the {{WAL}}. Its {{batchOp.retCodes}} is marked as
{{OperationStatusCode.FAILURE}}.
# {{doMiniBatchPut}} is done and returns to {{HRegion#put(Pair<Put,
Integer>[])}}, which returns to {{HRegionServer#put(byte[], List<Put>)}}.
# At this point, {{HRegionServer#put(byte[], List<Put>)}} does a {{for}} loop
and extracts the *first* failure out of the {{OperationStatusCode[]}}. In our
case, it'll take the {{OperationStatusCode.FAILURE}} that came from the second
{{Put}}.
# This error code is returned to {{HRegionServer#multiPut}}, which records in
the {{MultiPutResponse}} that there was a failure - but *we don't know for
which {{Put}}*
So the client has no way of knowing which {{Put}} failed. All it knows is that
at least one of them failed for a particular region. Its best bet is to retry
all the {{Put}} for this region. But this has an unintended consequence. The
{{Put}} that were successful during the first run will be *re-applied*. This
will unexpectedly create extra versions. Now I realize most people don't
really care about versions, so they won't notice. But whoever relies on the
versions for whatever reason will rightfully consider this to be data
corruption.
{{MultiPut}} makes proper error handling impossible. Since this RPC cannot
guarantee any atomicity other than at the individual {{Put}} level, it *must*
return to the client specific information about which {{Put}} failed in case of
a failure, so that the client can do proper error handling.
This requires us to change the {{MultiPutResponse}} so that it can indicate
which {{Put}} specifically failed. We could do this for instance by giving the
index of the {{Put}} along with its {{OperationStatusCode}}. So in the
scenario above, the {{MultiPutResponse}} would essentially return something
like: "for that particular region, put #0 succeeded, put #1 failed". If we
want to save a bit of space, we may want to omit the successes from the
response and only mention the failures - so a response that doesn't mention any
failure means that everything was successful. Not sure whether that's a good
idea though.
Since doing this requires an incompatible RPC change, I propose that we take the
opportunity to rewrite the {{MultiPut}} RPC too. Right now it's very
inefficient: it's just a hack on top of {{Put}}. When {{MultiPut}} is written
to the wire, a lot of unnecessary duplicate data is sent out. The timestamp,
the row key and the family are sent out to the wire N+1 times, where N is the
number of edits for a particular row, instead of just once (!).
I adjusted the description of the issue as it was inaccurate at some point, as
Ryan pointed out. The issue remains valid though.
> MultiPut makes proper error handling impossible and leads to corrupted data
> ---------------------------------------------------------------------------
>
> Key: HBASE-2898
> URL: https://issues.apache.org/jira/browse/HBASE-2898
> Project: HBase
> Issue Type: Bug
> Components: client, regionserver
> Affects Versions: 0.89.20100621
> Reporter: Benoit Sigoure
> Priority: Blocker
> Fix For: 0.90.0