[ 
https://issues.apache.org/jira/browse/HBASE-2898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benoit Sigoure updated HBASE-2898:
----------------------------------

    Description: 
The {{MultiPut}} RPC needs to be completely rewritten.  Let's see why step by 
step.

# An HBase user calls any of the {{put}} methods on an {{HTable}} instance.
# Eventually, {{HTable#flushCommits}} is invoked to actually send the edits to 
the RegionServer(s).
# This takes us to {{HConnectionManager#processBatchOfPuts}} where all edits 
are sorted into one or more {{MultiPut}}.  Each {{MultiPut}} is aggregating all 
the edits that are going to a particular RegionServer.
# A thread pool is used to send all the {{MultiPut}} in parallel to their 
respective RegionServer.  Let's follow what happens for a single {{MultiPut}}.
# The {{MultiPut}} travels through the IPC code on the client and then through 
the network and then through the IPC code on the RegionServer.
# We're now in {{HRegionServer#multiPut}} where a new {{MultiPutResponse}} is 
created.
# Still in {{HRegionServer#multiPut}}.  Since a {{MultiPut}} is essentially a 
map from region name to a list of {{Put}} for that region, there's a {{for}} 
loop that executes each list of {{Put}} for each region sequentially.  Let's 
follow what happens for a single list of {{Put}} for a particular region.
# We're now in {{HRegionServer#put(byte[], List<Put>)}}.  Each {{Put}} is 
associated with the row lock that was specified by the client (if any).  Then 
the pairs of {{(Put, lock id)}} are handed to the right {{HRegion}}.
# Now we're in {{HRegion#put(Pair<Put, Integer>[])}}, which immediately takes 
us to {{HRegion#doMiniBatchPut}}.
# At this point, let's assume that we're doing just 2 edits.  So the 
{{BatchOperationInProgress}} that {{doMiniBatchPut}} contains just 2 {{Put}}.
# The {{while}} loop in {{doMiniBatchPut}} that's going to execute each {{Put}} 
starts.
# The first {{Put}} fails because an exception is thrown when appending the 
edit to the {{WAL}}.  Its {{batchOp.retCodes}} is marked as 
{{OperationStatusCode.FAILURE}}.
# Because there was an exception, we're back to {{HRegion#put(Pair<Put, 
Integer>[])}} where the {{while}} loop will test that {{batchOp.isDone}} is 
{{false}} and do another iteration.
# {{doMiniBatchPut}} is called again and handles the remaining {{Put}}.
# The second {{Put}} succeeds normally, so its {{batchOp.retCodes}} is marked 
as {{OperationStatusCode.SUCCESS}}.
# {{doMiniBatchPut}} is done and returns to {{HRegion#put(Pair<Put, 
Integer>[])}}, which returns to {{HRegionServer#put(byte[], List<Put>)}}.
# At this point, {{HRegionServer#put(byte[], List<Put>)}} does a {{for}} loop 
and extracts the index of the *first* {{Put}} that failed out of the 
{{OperationStatusCode[]}}.  In our case, it'll return 0 since the first {{Put}} 
failed.
# This index in the list of {{Put}} of the first that failed (0 in this case) 
is returned to {{HRegionServer#multiPut}}, which records in the 
{{MultiPutResponse}} - the client knows that the first {{Put}} failed but has 
no idea about the other one.

So the client has no reliable way of knowing which {{Put}} failed (if any) past 
the first failure.  All it knows is that for a particular region, they 
succeeded up to a particular {{Put}}, at which point there was a failure, and 
then the remaining may or may not have succeeded.  Its best bet is to retry all 
the {{Put}} past the index of the first failure for this region.  But this has 
an unintended consequence.  The {{Put}} that were successful during the first 
run will be *re-applied*.  This will unexpectedly create extra versions.  Now I 
realize most people don't really care about versions, so they won't notice.  
But whoever relies on the versions for whatever reason will rightfully consider 
this to be data corruption.

As it is now, {{MultiPut}} makes proper error handling impossible.  Since this 
RPC cannot guarantee any atomicity other than at the individual {{Put}} level, 
it should return to the client specific information about which {{Put}} failed 
in case of a failure, so that the client can do proper error handling.

This requires us to change the {{MultiPutResponse}} so that it can indicate 
which {{Put}} specifically failed.  We could do this for instance by giving the 
index of the {{Put}} along with its {{OperationStatusCode}}.  So in the 
scenario above, the {{MultiPutResponse}} would essentially return something 
like: "for that particular region, put #0 failed, put #1 succeeded".  If we 
want to save a bit of space, we may want to omit the successes from the 
response and only mention the failures - so a response that doesn't mention any 
failure means that everything was successful.  Not sure whether that's a good 
idea though.

Since doing this require an incompatible RPC change, I propose that we take the 
opportunity to rewrite the {{MultiPut}} RPC too.  Right now it's very 
inefficient, it's just a hack on top of {{Put}}.  When {{MultiPut}} is written 
to the wire, a lot of unnecessary duplicate data is sent out.  The timestamp, 
the row key and the family are sent out to the wire N+1 times, where N is the 
number of edits for a particular row, instead of just once (!).

Alternatively, if we don't want to change the RPCs, we can fix this issue in a 
backward compatible way by making the {{while}} loop in {{HRegion#put(Pair<Put, 
Integer>[])}} stop as soon as a failure is encountered.

  was:
The {{MultiPut}} RPC needs to be completely rewritten.  Let's see why step by 
step.

# An HBase user calls any of the {{put}} methods on an {{HTable}} instance.
# Eventually, {{HTable#flushCommits}} is invoked to actually send the edits to 
the RegionServer(s).
# This takes us to {{HConnectionManager#processBatchOfPuts}} where all edits 
are sorted into one or more {{MultiPut}}.  Each {{MultiPut}} is aggregating all 
the edits that are going to a particular RegionServer.
# A thread pool is used to send all the {{MultiPut}} in parallel to their 
respective RegionServer.  Let's follow what happens for a single {{MultiPut}}.
# The {{MultiPut}} travels through the IPC code on the client and then through 
the network and then through the IPC code on the RegionServer.
# We're now in {{HRegionServer#multiPut}} where a new {{MultiPutResponse}} is 
created.
# Still in {{HRegionServer#multiPut}}.  Since a {{MultiPut}} is essentially a 
map from region name to a list of {{Put}} for that region, there's a {{for}} 
loop that executes each list of {{Put}} for each region sequentially.  Let's 
follow what happens for a single list of {{Put}} for a particular region.
# We're now in {{HRegionServer#put(byte[], List<Put>)}}.  Each {{Put}} is 
associated with the row lock that was specified by the client (if any).  Then 
the pairs of {{(Put, lock id)}} are handed to the right {{HRegion}}.
# Now we're in {{HRegion#put(Pair<Put, Integer>[])}}, which immediately takes 
us to {{HRegion#doMiniBatchPut}}.
# At this point, let's assume that we're doing just 2 edits.  So the 
{{BatchOperationInProgress}} that {{doMiniBatchPut}} has contains just 2 
{{Put}}.
# The {{while}} loop in {{doMiniBatchPut}} that's going to execute each {{Put}} 
starts.
# The first {{Put}} succeeds normally, so at the end of the first iteration, 
its {{batchOp.retCodes}} is marked as {{OperationStatusCode.SUCCESS}}.
# The second {{Put}} fails because an exception is thrown when appending the 
edit to the {{WAL}}.  Its {{batchOp.retCodes}} is marked as 
{{OperationStatusCode.FAILURE}}.
# {{doMiniBatchPut}} is done and returns to {{HRegion#put(Pair<Put, 
Integer>[])}}, which returns to {{HRegionServer#put(byte[], List<Put>)}}.
# At this point, {{HRegionServer#put(byte[], List<Put>)}} does a {{for}} loop 
and extracts the *first* failure out of the {{OperationStatusCode[]}}.  In our 
case, it'll take the {{OperationStatusCode.FAILURE}} that came from the second 
{{Put}}.
# This error code is returned to {{HRegionServer#multiPut}}, which records in 
the {{MultiPutResponse}} that there was a failure - but *we don't know for 
which {{Put}}*

So the client has no way of knowing which {{Put}} failed.  All it knows is that 
at least one of them failed for a particular region.  Its best bet is to retry 
all the {{Put}} for this region.  But this has an unintended consequence.  The 
{{Put}} that were successful during the first run will be *re-applied*.  This 
will unexpectedly create extra versions.  Now I realize most people don't 
really care about versions, so they won't notice.  But whoever relies on the 
versions for whatever reason will rightfully consider this to be data 
corruption.

{{MultiPut}} makes proper error handling impossible.  Since this RPC cannot 
guarantee any atomicity other than at the individual {{Put}} level, it *must* 
return to the client specific information about which {{Put}} failed in case of 
a failure, so that the client can do proper error handling.

This requires us to change the {{MultiPutResponse}} so that it can indicate 
which {{Put}} specifically failed.  We could do this for instance by giving the 
index of the {{Put}} along with its {{OperationStatusCode}}.  So in the 
scenario above, the {{MultiPutResponse}} would essentially return something 
like: "for that particular region, put #0 succeeded, put #1 failed".  If we 
want to save a bit of space, we may want to omit the successes from the 
response and only mention the failures - so a response that doesn't mention any 
failure means that everything was successful.  Not sure whether that's a good 
idea though.

Since doing this require an incompatible RPC change, I propose that we take the 
opportunity to rewrite the {{MultiPut}} RPC too.  Right now it's very 
inefficient, it's just a hack on top of {{Put}}.  When {{MultiPut}} is written 
to the wire, a lot of unnecessary duplicate data is sent out.  The timestamp, 
the row key and the family are sent out to the wire N+1 times, where N is the 
number of edits for a particular row, instead of just once (!).


I adjusted the description of the issue as it was inaccurate at some point, as 
Ryan pointed out.  The issue remains valid though.

> MultiPut makes proper error handling impossible and leads to corrupted data
> ---------------------------------------------------------------------------
>
>                 Key: HBASE-2898
>                 URL: https://issues.apache.org/jira/browse/HBASE-2898
>             Project: HBase
>          Issue Type: Bug
>          Components: client, regionserver
>    Affects Versions: 0.89.20100621
>            Reporter: Benoit Sigoure
>            Priority: Blocker
>             Fix For: 0.90.0
>
>
> The {{MultiPut}} RPC needs to be completely rewritten.  Let's see why step by 
> step.
> # An HBase user calls any of the {{put}} methods on an {{HTable}} instance.
> # Eventually, {{HTable#flushCommits}} is invoked to actually send the edits 
> to the RegionServer(s).
> # This takes us to {{HConnectionManager#processBatchOfPuts}} where all edits 
> are sorted into one or more {{MultiPut}}.  Each {{MultiPut}} is aggregating 
> all the edits that are going to a particular RegionServer.
> # A thread pool is used to send all the {{MultiPut}} in parallel to their 
> respective RegionServer.  Let's follow what happens for a single {{MultiPut}}.
> # The {{MultiPut}} travels through the IPC code on the client and then 
> through the network and then through the IPC code on the RegionServer.
> # We're now in {{HRegionServer#multiPut}} where a new {{MultiPutResponse}} is 
> created.
> # Still in {{HRegionServer#multiPut}}.  Since a {{MultiPut}} is essentially a 
> map from region name to a list of {{Put}} for that region, there's a {{for}} 
> loop that executes each list of {{Put}} for each region sequentially.  Let's 
> follow what happens for a single list of {{Put}} for a particular region.
> # We're now in {{HRegionServer#put(byte[], List<Put>)}}.  Each {{Put}} is 
> associated with the row lock that was specified by the client (if any).  Then 
> the pairs of {{(Put, lock id)}} are handed to the right {{HRegion}}.
> # Now we're in {{HRegion#put(Pair<Put, Integer>[])}}, which immediately takes 
> us to {{HRegion#doMiniBatchPut}}.
> # At this point, let's assume that we're doing just 2 edits.  So the 
> {{BatchOperationInProgress}} that {{doMiniBatchPut}} contains just 2 {{Put}}.
> # The {{while}} loop in {{doMiniBatchPut}} that's going to execute each 
> {{Put}} starts.
> # The first {{Put}} fails because an exception is thrown when appending the 
> edit to the {{WAL}}.  Its {{batchOp.retCodes}} is marked as 
> {{OperationStatusCode.FAILURE}}.
> # Because there was an exception, we're back to {{HRegion#put(Pair<Put, 
> Integer>[])}} where the {{while}} loop will test that {{batchOp.isDone}} is 
> {{false}} and do another iteration.
> # {{doMiniBatchPut}} is called again and handles the remaining {{Put}}.
> # The second {{Put}} succeeds normally, so its {{batchOp.retCodes}} is marked 
> as {{OperationStatusCode.SUCCESS}}.
> # {{doMiniBatchPut}} is done and returns to {{HRegion#put(Pair<Put, 
> Integer>[])}}, which returns to {{HRegionServer#put(byte[], List<Put>)}}.
> # At this point, {{HRegionServer#put(byte[], List<Put>)}} does a {{for}} loop 
> and extracts the index of the *first* {{Put}} that failed out of the 
> {{OperationStatusCode[]}}.  In our case, it'll return 0 since the first 
> {{Put}} failed.
> # This index in the list of {{Put}} of the first that failed (0 in this case) 
> is returned to {{HRegionServer#multiPut}}, which records in the 
> {{MultiPutResponse}} - the client knows that the first {{Put}} failed but has 
> no idea about the other one.
> So the client has no reliable way of knowing which {{Put}} failed (if any) 
> past the first failure.  All it knows is that for a particular region, they 
> succeeded up to a particular {{Put}}, at which point there was a failure, and 
> then the remaining may or may not have succeeded.  Its best bet is to retry 
> all the {{Put}} past the index of the first failure for this region.  But 
> this has an unintended consequence.  The {{Put}} that were successful during 
> the first run will be *re-applied*.  This will unexpectedly create extra 
> versions.  Now I realize most people don't really care about versions, so 
> they won't notice.  But whoever relies on the versions for whatever reason 
> will rightfully consider this to be data corruption.
> As it is now, {{MultiPut}} makes proper error handling impossible.  Since 
> this RPC cannot guarantee any atomicity other than at the individual {{Put}} 
> level, it should return to the client specific information about which 
> {{Put}} failed in case of a failure, so that the client can do proper error 
> handling.
> This requires us to change the {{MultiPutResponse}} so that it can indicate 
> which {{Put}} specifically failed.  We could do this for instance by giving 
> the index of the {{Put}} along with its {{OperationStatusCode}}.  So in the 
> scenario above, the {{MultiPutResponse}} would essentially return something 
> like: "for that particular region, put #0 failed, put #1 succeeded".  If we 
> want to save a bit of space, we may want to omit the successes from the 
> response and only mention the failures - so a response that doesn't mention 
> any failure means that everything was successful.  Not sure whether that's a 
> good idea though.
> Since doing this require an incompatible RPC change, I propose that we take 
> the opportunity to rewrite the {{MultiPut}} RPC too.  Right now it's very 
> inefficient, it's just a hack on top of {{Put}}.  When {{MultiPut}} is 
> written to the wire, a lot of unnecessary duplicate data is sent out.  The 
> timestamp, the row key and the family are sent out to the wire N+1 times, 
> where N is the number of edits for a particular row, instead of just once (!).
> Alternatively, if we don't want to change the RPCs, we can fix this issue in 
> a backward compatible way by making the {{while}} loop in 
> {{HRegion#put(Pair<Put, Integer>[])}} stop as soon as a failure is 
> encountered.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to