brfrn169 commented on a change in pull request #2228:
URL: https://github.com/apache/hbase/pull/2228#discussion_r470435585
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void
prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
}
}
+ /**
+ * Do coprocessor pre-increment or pre-append call.
+ * @return Result returned out of the coprocessor, which means bypass all
further processing
+ * and return the proffered Result instead, or null which means proceed.
+ */
+ private Result doCoprocessorPreCall(Mutation mutation) throws IOException {
+ assert mutation instanceof Increment || mutation instanceof Append;
+ Result result = null;
+ if (region.coprocessorHost != null) {
+ if (mutation instanceof Increment) {
+ result = region.coprocessorHost.preIncrementAfterRowLock((Increment)
mutation);
+ } else {
+ result = region.coprocessorHost.preAppendAfterRowLock((Append)
mutation);
+ }
+ }
+ return result;
+ }
+
+ private Map<byte[], List<Cell>> reckonDeltas(Mutation mutation, List<Cell>
results)
+ throws IOException {
+ long now = EnvironmentEdgeManager.currentTime();
+ Map<byte[], List<Cell>> ret = new HashMap<>();
+ // Process a Store/family at a time.
+ for (Map.Entry<byte [], List<Cell>> entry:
mutation.getFamilyCellMap().entrySet()) {
+ final byte[] columnFamilyName = entry.getKey();
+ List<Cell> deltas = entry.getValue();
+ // Reckon for the Store what to apply to WAL and MemStore.
+ List<Cell> toApply =
reckonDeltasByStore(region.stores.get(columnFamilyName), mutation,
+ now, deltas, results);
+ if (!toApply.isEmpty()) {
+ for (Cell cell : toApply) {
+ HStore store = region.getStore(cell);
+ if (store == null) {
+ region.checkFamily(CellUtil.cloneFamily(cell));
+ } else {
+ ret.computeIfAbsent(store.getColumnFamilyDescriptor().getName(),
+ key -> new ArrayList<>()).add(cell);
+ }
+ }
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Reckon the Cells to apply to WAL, memstore, and to return to the Client
in passed
+ * column family/Store.
+ *
+ * Does Get of current value and then adds passed in deltas for this Store
returning the
+ * result.
+ *
+ * @param mutation The encompassing Mutation object
+ * @param deltas Changes to apply to this Store; either increment amount
or data to append
+ * @param results In here we accumulate all the Cells we are to return to
the client. If null,
+ * client doesn't want results returned.
+ * @return Resulting Cells after <code>deltas</code> have been applied to
current
+ * values. Side effect is our filling out of the <code>results</code>
List.
+ */
+ private List<Cell> reckonDeltasByStore(HStore store, Mutation mutation,
long now,
+ List<Cell> deltas, List<Cell> results) throws IOException {
+ assert mutation instanceof Increment || mutation instanceof Append;
+ byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
+ List<Pair<Cell, Cell>> cellPairs = new ArrayList<>(deltas.size());
+
+ // Get previous values for all columns in this family.
+ TimeRange tr;
+ if (mutation instanceof Increment) {
+ tr = ((Increment) mutation).getTimeRange();
+ } else {
+ tr = ((Append) mutation).getTimeRange();
+ }
+ List<Cell> currentValues = get(mutation, store, deltas, tr);
+
+ // Iterate the input columns and update existing values if they were
found, otherwise
+ // add new column initialized to the delta amount
+ int currentValuesIndex = 0;
+ for (int i = 0; i < deltas.size(); i++) {
+ Cell delta = deltas.get(i);
+ Cell currentValue = null;
+ if (currentValuesIndex < currentValues.size() &&
+ CellUtil.matchingQualifier(currentValues.get(currentValuesIndex),
delta)) {
+ currentValue = currentValues.get(currentValuesIndex);
+ if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta,
deltas.get(i + 1))) {
+ currentValuesIndex++;
+ }
+ }
+ // Switch on whether this an increment or an append building the new
Cell to apply.
+ Cell newCell;
+ if (mutation instanceof Increment) {
+ long deltaAmount = getLongValue(delta);
+ final long newValue = currentValue == null ?
+ deltaAmount : getLongValue(currentValue) + deltaAmount;
+ newCell = reckonDelta(delta, currentValue, columnFamily, now,
mutation,
+ (oldCell) -> Bytes.toBytes(newValue));
+ } else {
Review comment:
These were also moving existing code. Will keep this. Thanks.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void
prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
}
}
+ /**
+ * Do coprocessor pre-increment or pre-append call.
+ * @return Result returned out of the coprocessor, which means bypass all
further processing
+ * and return the proffered Result instead, or null which means proceed.
+ */
+ private Result doCoprocessorPreCall(Mutation mutation) throws IOException {
Review comment:
Will change the name. This method is only for Increment and Append
operations. It's called in the following if statement only:
https://github.com/apache/hbase/pull/2228/files#diff-6205e907851ed4f650499f7111cbd91cR3799
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
##########
@@ -742,36 +741,35 @@ private Result increment(final HRegion region, final
OperationQuota quota,
spaceQuota.getPolicyEnforcement(region).check(increment);
quota.addMutation(increment);
Result r = null;
- if (region.getCoprocessorHost() != null) {
- r = region.getCoprocessorHost().preIncrement(increment);
- }
- if (r == null) {
- boolean canProceed = startNonceOperation(mutation, nonceGroup);
- boolean success = false;
- try {
- long nonce = mutation.hasNonce() ? mutation.getNonce() :
HConstants.NO_NONCE;
- if (canProceed) {
- r = region.increment(increment, nonceGroup, nonce);
- } else {
+ boolean canProceed = startNonceOperation(mutation, nonceGroup);
Review comment:
Yes, I changed the semantics. This is because we call `preIncrement` in
`region.increment()` after this change. I don't think this change breaks
anything, but it just changes the order of `preIncrement` and the nonce
operation. What do you think?
##########
File path:
hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
##########
@@ -1282,6 +1438,80 @@ public void
testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable {
assertEquals("f", Bytes.toString(result.getValue(FAMILY,
Bytes.toBytes("F"))));
}
+ @Test
+ public void testCheckAndIncrementBatch() throws Throwable {
+ AsyncTable<?> table = getTable.get();
+ byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+
+ table.putAll(Arrays.asList(
+ new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+ .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes(0L)),
+ new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
+ .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes(0L)))).get();
+
+ // CheckAndIncrement with correct value
+ CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
+ .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+ .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1));
+
+ // CheckAndIncrement with wrong value
+ CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+ .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
+ .build(new Increment(row2).addColumn(FAMILY, Bytes.toBytes("D"), 1));
+
+ List<CheckAndMutateResult> results =
+ table.checkAndMutateAll(Arrays.asList(checkAndMutate1,
checkAndMutate2)).get();
Review comment:
No for now. I'm going to handle that case in
[HBASE-24210](https://issues.apache.org/jira/browse/HBASE-24210).
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
##########
@@ -742,36 +741,35 @@ private Result increment(final HRegion region, final
OperationQuota quota,
spaceQuota.getPolicyEnforcement(region).check(increment);
quota.addMutation(increment);
Result r = null;
- if (region.getCoprocessorHost() != null) {
- r = region.getCoprocessorHost().preIncrement(increment);
- }
- if (r == null) {
- boolean canProceed = startNonceOperation(mutation, nonceGroup);
- boolean success = false;
- try {
- long nonce = mutation.hasNonce() ? mutation.getNonce() :
HConstants.NO_NONCE;
- if (canProceed) {
- r = region.increment(increment, nonceGroup, nonce);
- } else {
+ boolean canProceed = startNonceOperation(mutation, nonceGroup);
Review comment:
Yes, I've changed the semantics. This is because we call `preIncrement`
in `region.increment()` after this change. I don't think this change breaks
anything, but it just changes the order of `preIncrement` and the nonce
operation. What do you think?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
##########
@@ -2910,23 +2908,35 @@ public MultiResponse multi(final RpcController rpcc,
final MultiRequest request)
}
try {
- CheckAndMutateResult result = checkAndMutate(region,
regionAction.getActionList(),
- cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
- regionActionResultBuilder.setProcessed(result.isSuccess());
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
ClientProtos.ResultOrException.newBuilder();
- for (int i = 0; i < regionAction.getActionCount(); i++) {
- if (i == 0 && result.getResult() != null) {
- resultOrExceptionOrBuilder.setIndex(i);
-
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
-
.setResult(ProtobufUtil.toResult(result.getResult())).build());
- continue;
+ if (regionAction.getActionCount() == 1) {
+ CheckAndMutateResult result = checkAndMutate(region, quota,
+ regionAction.getAction(0).getMutation(), cellScanner,
+ regionAction.getCondition(), spaceQuotaEnforcement);
+ regionActionResultBuilder.setProcessed(result.isSuccess());
+ resultOrExceptionOrBuilder.setIndex(0);
+ if (result.getResult() != null) {
+
resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
Review comment:
Actually, this change supports a CheckAndMutate operation only with a
single Increment/Append. That's why I needed the if statement to check whether
a single action or not. BTW, I'm going to handle CheckAndMutate operations with
multiple Increments/Appends in
[HBASE-24210](https://issues.apache.org/jira/browse/HBASE-24210).
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void
prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
}
}
+ /**
+ * Do coprocessor pre-increment or pre-append call.
+ * @return Result returned out of the coprocessor, which means bypass all
further processing
+ * and return the proffered Result instead, or null which means proceed.
+ */
+ private Result doCoprocessorPreCall(Mutation mutation) throws IOException {
+ assert mutation instanceof Increment || mutation instanceof Append;
+ Result result = null;
+ if (region.coprocessorHost != null) {
+ if (mutation instanceof Increment) {
+ result = region.coprocessorHost.preIncrementAfterRowLock((Increment)
mutation);
+ } else {
+ result = region.coprocessorHost.preAppendAfterRowLock((Append)
mutation);
+ }
+ }
+ return result;
+ }
+
+ private Map<byte[], List<Cell>> reckonDeltas(Mutation mutation, List<Cell>
results)
+ throws IOException {
+ long now = EnvironmentEdgeManager.currentTime();
+ Map<byte[], List<Cell>> ret = new HashMap<>();
+ // Process a Store/family at a time.
+ for (Map.Entry<byte [], List<Cell>> entry:
mutation.getFamilyCellMap().entrySet()) {
+ final byte[] columnFamilyName = entry.getKey();
+ List<Cell> deltas = entry.getValue();
+ // Reckon for the Store what to apply to WAL and MemStore.
+ List<Cell> toApply =
reckonDeltasByStore(region.stores.get(columnFamilyName), mutation,
+ now, deltas, results);
+ if (!toApply.isEmpty()) {
+ for (Cell cell : toApply) {
Review comment:
These were also moving existing code. Will keep this. Thanks.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]