Repository: hbase Updated Branches: refs/heads/master 3b444a066 -> 25ee5f7f8
HBASE-18546 Always overwrite the TS for Append/Increment unless no existing cells are found Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/25ee5f7f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/25ee5f7f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/25ee5f7f Branch: refs/heads/master Commit: 25ee5f7f8406b358aa0e7ac59ed661fef82183b8 Parents: 3b444a0 Author: Chia-Ping Tsai <[email protected]> Authored: Thu Aug 24 14:23:36 2017 +0800 Committer: Chia-Ping Tsai <[email protected]> Committed: Thu Aug 24 14:35:22 2017 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/protobuf/ProtobufUtil.java | 201 ++++++------------- .../hbase/shaded/protobuf/ProtobufUtil.java | 198 ++++++------------ .../hbase/shaded/protobuf/RequestConverter.java | 10 +- .../hadoop/hbase/regionserver/HRegion.java | 99 +++------ .../hbase/client/TestAppendFromClientSide.java | 85 ++++++++ .../client/TestIncrementsFromClientSide.java | 23 +++ .../hadoop/hbase/protobuf/TestProtobufUtil.java | 8 +- .../hbase/shaded/protobuf/TestProtobufUtil.java | 86 ++++++++ 8 files changed, 354 insertions(+), 356 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/25ee5f7f/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 01ba0e0..79a874e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.NavigableSet; +import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -616,83 +617,75 @@ public final class ProtobufUtil { return delete; } - /** - * Convert a protocol buffer Mutate to an Append - * @param cellScanner - * @param proto the protocol buffer Mutate to convert - * @return the converted client Append - * @throws IOException - */ - public static Append toAppend(final MutationProto proto, final CellScanner cellScanner) - throws IOException { - MutationType type = proto.getMutateType(); - assert type == MutationType.APPEND : type.name(); - byte [] row = proto.hasRow()? proto.getRow().toByteArray(): null; - Append append = null; - int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0; + @FunctionalInterface + private interface ConsumerWithException <T, U> { + void accept(T t, U u) throws IOException; + } + + private static <T extends Mutation> T toDelta(Function<Bytes, T> supplier, ConsumerWithException<T, Cell> consumer, + final MutationProto proto, final CellScanner cellScanner) throws IOException { + byte[] row = proto.hasRow() ? proto.getRow().toByteArray() : null; + T mutation = row == null ? null : supplier.apply(new Bytes(row)); + int cellCount = proto.hasAssociatedCellCount() ? proto.getAssociatedCellCount() : 0; if (cellCount > 0) { // The proto has metadata only and the data is separate to be found in the cellScanner. if (cellScanner == null) { throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " + - toShortString(proto)); + toShortString(proto)); } for (int i = 0; i < cellCount; i++) { if (!cellScanner.advance()) { throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i + - " no cell returned: " + toShortString(proto)); + " no cell returned: " + toShortString(proto)); } Cell cell = cellScanner.current(); - if (append == null) { - append = new Append(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); + if (mutation == null) { + mutation = supplier.apply(new Bytes(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); } - append.add(cell); + consumer.accept(mutation, cell); } } else { - append = new Append(row); - for (ColumnValue column: proto.getColumnValueList()) { + if (mutation == null) { + throw new IllegalArgumentException("row cannot be null"); + } + for (ColumnValue column : proto.getColumnValueList()) { byte[] family = column.getFamily().toByteArray(); - for (QualifierValue qv: column.getQualifierValueList()) { + for (QualifierValue qv : column.getQualifierValueList()) { byte[] qualifier = qv.getQualifier().toByteArray(); if (!qv.hasValue()) { throw new DoNotRetryIOException( - "Missing required field: qualifier value"); + "Missing required field: qualifier value"); } byte[] value = qv.getValue().toByteArray(); byte[] tags = null; if (qv.hasTags()) { tags = qv.getTags().toByteArray(); } - append.add(CellUtil.createCell(row, family, qualifier, qv.getTimestamp(), - KeyValue.Type.Put, value, tags)); + consumer.accept(mutation, CellUtil.createCell(mutation.getRow(), family, qualifier, qv.getTimestamp(), + KeyValue.Type.Put, value, tags)); } } } - append.setDurability(toDurability(proto.getDurability())); - for (NameBytesPair attribute: proto.getAttributeList()) { - append.setAttribute(attribute.getName(), attribute.getValue().toByteArray()); + mutation.setDurability(toDurability(proto.getDurability())); + for (NameBytesPair attribute : proto.getAttributeList()) { + mutation.setAttribute(attribute.getName(), attribute.getValue().toByteArray()); } - return append; + return mutation; } /** - * Convert a MutateRequest to Mutation - * + * Convert a protocol buffer Mutate to an Append + * @param cellScanner * @param proto the protocol buffer Mutate to convert - * @return the converted Mutation + * @return the converted client Append * @throws IOException */ - public static Mutation toMutation(final MutationProto proto) throws IOException { + public static Append toAppend(final MutationProto proto, final CellScanner cellScanner) + throws IOException { MutationType type = proto.getMutateType(); - if (type == MutationType.APPEND) { - return toAppend(proto, null); - } - if (type == MutationType.DELETE) { - return toDelete(proto, null); - } - if (type == MutationType.PUT) { - return toPut(proto, null); - } - throw new IOException("Unknown mutation type " + type); + assert type == MutationType.APPEND : type.name(); + return toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()), + Append::add, proto, cellScanner); } /** @@ -703,60 +696,40 @@ public final class ProtobufUtil { * @throws IOException */ public static Increment toIncrement(final MutationProto proto, final CellScanner cellScanner) - throws IOException { + throws IOException { MutationType type = proto.getMutateType(); assert type == MutationType.INCREMENT : type.name(); - byte [] row = proto.hasRow()? proto.getRow().toByteArray(): null; - Increment increment = null; - int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0; - if (cellCount > 0) { - // The proto has metadata only and the data is separate to be found in the cellScanner. - if (cellScanner == null) { - throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " + - TextFormat.shortDebugString(proto)); - } - for (int i = 0; i < cellCount; i++) { - if (!cellScanner.advance()) { - throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i + - " no cell returned: " + TextFormat.shortDebugString(proto)); - } - Cell cell = cellScanner.current(); - if (increment == null) { - increment = new Increment(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - } - increment.add(cell); - } - } else { - increment = new Increment(row); - for (ColumnValue column: proto.getColumnValueList()) { - byte[] family = column.getFamily().toByteArray(); - for (QualifierValue qv: column.getQualifierValueList()) { - byte[] qualifier = qv.getQualifier().toByteArray(); - if (!qv.hasValue()) { - throw new DoNotRetryIOException("Missing required field: qualifier value"); - } - byte[] value = qv.getValue().toByteArray(); - byte[] tags = null; - if (qv.hasTags()) { - tags = qv.getTags().toByteArray(); - } - increment.add(CellUtil.createCell(row, family, qualifier, qv.getTimestamp(), - KeyValue.Type.Put, value, tags)); - } - } - } + Increment increment = toDelta((Bytes row) -> new Increment(row.get(), row.getOffset(), row.getLength()), + Increment::add, proto, cellScanner); if (proto.hasTimeRange()) { TimeRange timeRange = protoToTimeRange(proto.getTimeRange()); increment.setTimeRange(timeRange.getMin(), timeRange.getMax()); } - increment.setDurability(toDurability(proto.getDurability())); - for (NameBytesPair attribute : proto.getAttributeList()) { - increment.setAttribute(attribute.getName(), attribute.getValue().toByteArray()); - } return increment; } /** + * Convert a MutateRequest to Mutation + * + * @param proto the protocol buffer Mutate to convert + * @return the converted Mutation + * @throws IOException + */ + public static Mutation toMutation(final MutationProto proto) throws IOException { + MutationType type = proto.getMutateType(); + if (type == MutationType.APPEND) { + return toAppend(proto, null); + } + if (type == MutationType.DELETE) { + return toDelete(proto, null); + } + if (type == MutationType.PUT) { + return toPut(proto, null); + } + throw new IOException("Unknown mutation type " + type); + } + + /** * Convert a protocol buffer Mutate to a Get. * @param proto the protocol buffer Mutate to convert. * @param cellScanner @@ -1137,56 +1110,6 @@ public final class ProtobufUtil { } } - /** - * Convert a client Increment to a protobuf Mutate. - * - * @param increment - * @return the converted mutate - */ - public static MutationProto toMutation( - final Increment increment, final MutationProto.Builder builder, long nonce) { - builder.setRow(ByteStringer.wrap(increment.getRow())); - builder.setMutateType(MutationType.INCREMENT); - builder.setDurability(toDurability(increment.getDurability())); - if (nonce != HConstants.NO_NONCE) { - builder.setNonce(nonce); - } - TimeRange timeRange = increment.getTimeRange(); - setTimeRange(builder, timeRange); - ColumnValue.Builder columnBuilder = ColumnValue.newBuilder(); - QualifierValue.Builder valueBuilder = QualifierValue.newBuilder(); - for (Map.Entry<byte[], List<Cell>> family: increment.getFamilyCellMap().entrySet()) { - columnBuilder.setFamily(ByteStringer.wrap(family.getKey())); - columnBuilder.clearQualifierValue(); - List<Cell> values = family.getValue(); - if (values != null && values.size() > 0) { - for (Cell cell: values) { - valueBuilder.clear(); - valueBuilder.setQualifier(ByteStringer.wrap( - cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())); - valueBuilder.setValue(ByteStringer.wrap( - cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); - if (cell.getTagsLength() > 0) { - valueBuilder.setTags(ByteStringer.wrap(cell.getTagsArray(), - cell.getTagsOffset(), cell.getTagsLength())); - } - columnBuilder.addQualifierValue(valueBuilder.build()); - } - } - builder.addColumnValue(columnBuilder.build()); - } - Map<String, byte[]> attributes = increment.getAttributesMap(); - if (!attributes.isEmpty()) { - NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder(); - for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) { - attributeBuilder.setName(attribute.getKey()); - attributeBuilder.setValue(ByteStringer.wrap(attribute.getValue())); - builder.addAttribute(attributeBuilder.build()); - } - } - return builder.build(); - } - public static MutationProto toMutation(final MutationType type, final Mutation mutation) throws IOException { return toMutation(type, mutation, HConstants.NO_NONCE); @@ -1217,6 +1140,10 @@ public final class ProtobufUtil { if (nonce != HConstants.NO_NONCE) { builder.setNonce(nonce); } + if (type == MutationType.INCREMENT) { + TimeRange timeRange = ((Increment) mutation).getTimeRange(); + setTimeRange(builder, timeRange); + } ColumnValue.Builder columnBuilder = ColumnValue.newBuilder(); QualifierValue.Builder valueBuilder = QualifierValue.newBuilder(); for (Map.Entry<byte[],List<Cell>> family: mutation.getFamilyCellMap().entrySet()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/25ee5f7f/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index abcc5e2..739526e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -34,6 +34,7 @@ import java.util.NavigableSet; import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; @@ -741,86 +742,75 @@ public final class ProtobufUtil { } return delete; } + @FunctionalInterface + private interface ConsumerWithException <T, U> { + void accept(T t, U u) throws IOException; + } - /** - * Convert a protocol buffer Mutate to an Append - * @param cellScanner - * @param proto the protocol buffer Mutate to convert - * @return the converted client Append - * @throws IOException - */ - public static Append toAppend(final MutationProto proto, final CellScanner cellScanner) - throws IOException { - MutationType type = proto.getMutateType(); - assert type == MutationType.APPEND : type.name(); + private static <T extends Mutation> T toDelta(Function<Bytes, T> supplier, ConsumerWithException<T, Cell> consumer, + final MutationProto proto, final CellScanner cellScanner) throws IOException { byte[] row = proto.hasRow() ? proto.getRow().toByteArray() : null; - Append append = row != null ? new Append(row) : null; - int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0; + T mutation = row == null ? null : supplier.apply(new Bytes(row)); + int cellCount = proto.hasAssociatedCellCount() ? proto.getAssociatedCellCount() : 0; if (cellCount > 0) { // The proto has metadata only and the data is separate to be found in the cellScanner. if (cellScanner == null) { throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " + - toShortString(proto)); + toShortString(proto)); } for (int i = 0; i < cellCount; i++) { if (!cellScanner.advance()) { throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i + - " no cell returned: " + toShortString(proto)); + " no cell returned: " + toShortString(proto)); } Cell cell = cellScanner.current(); - if (append == null) { - append = new Append(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); + if (mutation == null) { + mutation = supplier.apply(new Bytes(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); } - append.add(cell); + consumer.accept(mutation, cell); } } else { - if (append == null) { + if (mutation == null) { throw new IllegalArgumentException("row cannot be null"); } - for (ColumnValue column: proto.getColumnValueList()) { + for (ColumnValue column : proto.getColumnValueList()) { byte[] family = column.getFamily().toByteArray(); - for (QualifierValue qv: column.getQualifierValueList()) { + for (QualifierValue qv : column.getQualifierValueList()) { byte[] qualifier = qv.getQualifier().toByteArray(); if (!qv.hasValue()) { throw new DoNotRetryIOException( - "Missing required field: qualifier value"); + "Missing required field: qualifier value"); } byte[] value = qv.getValue().toByteArray(); byte[] tags = null; if (qv.hasTags()) { tags = qv.getTags().toByteArray(); } - append.add(CellUtil.createCell(row, family, qualifier, qv.getTimestamp(), - KeyValue.Type.Put, value, tags)); + consumer.accept(mutation, CellUtil.createCell(mutation.getRow(), family, qualifier, qv.getTimestamp(), + KeyValue.Type.Put, value, tags)); } } } - append.setDurability(toDurability(proto.getDurability())); - for (NameBytesPair attribute: proto.getAttributeList()) { - append.setAttribute(attribute.getName(), attribute.getValue().toByteArray()); + mutation.setDurability(toDurability(proto.getDurability())); + for (NameBytesPair attribute : proto.getAttributeList()) { + mutation.setAttribute(attribute.getName(), attribute.getValue().toByteArray()); } - return append; + return mutation; } /** - * Convert a MutateRequest to Mutation - * + * Convert a protocol buffer Mutate to an Append + * @param cellScanner * @param proto the protocol buffer Mutate to convert - * @return the converted Mutation + * @return the converted client Append * @throws IOException */ - public static Mutation toMutation(final MutationProto proto) throws IOException { + public static Append toAppend(final MutationProto proto, final CellScanner cellScanner) + throws IOException { MutationType type = proto.getMutateType(); - if (type == MutationType.APPEND) { - return toAppend(proto, null); - } - if (type == MutationType.DELETE) { - return toDelete(proto, null); - } - if (type == MutationType.PUT) { - return toPut(proto, null); - } - throw new IOException("Unknown mutation type " + type); + assert type == MutationType.APPEND : type.name(); + return toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()), + Append::add, proto, cellScanner); } /** @@ -831,62 +821,40 @@ public final class ProtobufUtil { * @throws IOException */ public static Increment toIncrement(final MutationProto proto, final CellScanner cellScanner) - throws IOException { + throws IOException { MutationType type = proto.getMutateType(); assert type == MutationType.INCREMENT : type.name(); - byte [] row = proto.hasRow()? proto.getRow().toByteArray(): null; - Increment increment = row != null ? new Increment(row) : null; - int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0; - if (cellCount > 0) { - // The proto has metadata only and the data is separate to be found in the cellScanner. - if (cellScanner == null) { - throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " + - TextFormat.shortDebugString(proto)); - } - for (int i = 0; i < cellCount; i++) { - if (!cellScanner.advance()) { - throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i + - " no cell returned: " + TextFormat.shortDebugString(proto)); - } - Cell cell = cellScanner.current(); - if (increment == null) { - increment = new Increment(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - } - increment.add(cell); - } - } else { - if (increment == null) { - throw new IllegalArgumentException("row cannot be null"); - } - for (ColumnValue column: proto.getColumnValueList()) { - byte[] family = column.getFamily().toByteArray(); - for (QualifierValue qv: column.getQualifierValueList()) { - byte[] qualifier = qv.getQualifier().toByteArray(); - if (!qv.hasValue()) { - throw new DoNotRetryIOException("Missing required field: qualifier value"); - } - byte[] value = qv.getValue().toByteArray(); - byte[] tags = null; - if (qv.hasTags()) { - tags = qv.getTags().toByteArray(); - } - increment.add(CellUtil.createCell(row, family, qualifier, qv.getTimestamp(), - KeyValue.Type.Put, value, tags)); - } - } - } + Increment increment = toDelta((Bytes row) -> new Increment(row.get(), row.getOffset(), row.getLength()), + Increment::add, proto, cellScanner); if (proto.hasTimeRange()) { TimeRange timeRange = protoToTimeRange(proto.getTimeRange()); increment.setTimeRange(timeRange.getMin(), timeRange.getMax()); } - increment.setDurability(toDurability(proto.getDurability())); - for (NameBytesPair attribute : proto.getAttributeList()) { - increment.setAttribute(attribute.getName(), attribute.getValue().toByteArray()); - } return increment; } /** + * Convert a MutateRequest to Mutation + * + * @param proto the protocol buffer Mutate to convert + * @return the converted Mutation + * @throws IOException + */ + public static Mutation toMutation(final MutationProto proto) throws IOException { + MutationType type = proto.getMutateType(); + if (type == MutationType.APPEND) { + return toAppend(proto, null); + } + if (type == MutationType.DELETE) { + return toDelete(proto, null); + } + if (type == MutationType.PUT) { + return toPut(proto, null); + } + throw new IOException("Unknown mutation type " + type); + } + + /** * Convert a protocol buffer Mutate to a Get. * @param proto the protocol buffer Mutate to convert. * @param cellScanner @@ -1290,56 +1258,6 @@ public final class ProtobufUtil { } } - /** - * Convert a client Increment to a protobuf Mutate. - * - * @param increment - * @return the converted mutate - */ - public static MutationProto toMutation( - final Increment increment, final MutationProto.Builder builder, long nonce) { - builder.setRow(UnsafeByteOperations.unsafeWrap(increment.getRow())); - builder.setMutateType(MutationType.INCREMENT); - builder.setDurability(toDurability(increment.getDurability())); - if (nonce != HConstants.NO_NONCE) { - builder.setNonce(nonce); - } - TimeRange timeRange = increment.getTimeRange(); - setTimeRange(builder, timeRange); - ColumnValue.Builder columnBuilder = ColumnValue.newBuilder(); - QualifierValue.Builder valueBuilder = QualifierValue.newBuilder(); - for (Map.Entry<byte[], List<Cell>> family: increment.getFamilyCellMap().entrySet()) { - columnBuilder.setFamily(UnsafeByteOperations.unsafeWrap(family.getKey())); - columnBuilder.clearQualifierValue(); - List<Cell> values = family.getValue(); - if (values != null && values.size() > 0) { - for (Cell cell: values) { - valueBuilder.clear(); - valueBuilder.setQualifier(UnsafeByteOperations.unsafeWrap( - cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())); - valueBuilder.setValue(UnsafeByteOperations.unsafeWrap( - cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); - if (cell.getTagsLength() > 0) { - valueBuilder.setTags(UnsafeByteOperations.unsafeWrap(cell.getTagsArray(), - cell.getTagsOffset(), cell.getTagsLength())); - } - columnBuilder.addQualifierValue(valueBuilder.build()); - } - } - builder.addColumnValue(columnBuilder.build()); - } - Map<String, byte[]> attributes = increment.getAttributesMap(); - if (!attributes.isEmpty()) { - NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder(); - for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) { - attributeBuilder.setName(attribute.getKey()); - attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue())); - builder.addAttribute(attributeBuilder.build()); - } - } - return builder.build(); - } - public static MutationProto toMutation(final MutationType type, final Mutation mutation) throws IOException { return toMutation(type, mutation, HConstants.NO_NONCE); @@ -1370,6 +1288,10 @@ public final class ProtobufUtil { if (nonce != HConstants.NO_NONCE) { builder.setNonce(nonce); } + if (type == MutationType.INCREMENT) { + TimeRange timeRange = ((Increment) mutation).getTimeRange(); + setTimeRange(builder, timeRange); + } ColumnValue.Builder columnBuilder = ColumnValue.newBuilder(); QualifierValue.Builder valueBuilder = QualifierValue.newBuilder(); for (Map.Entry<byte[],List<Cell>> family: mutation.getFamilyCellMap().entrySet()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/25ee5f7f/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index a8a56c7..e620a91 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -201,6 +201,7 @@ public final class RequestConverter { valueBuilder.setValue(UnsafeByteOperations.unsafeWrap(Bytes.toBytes(amount))); valueBuilder.setQualifier(UnsafeByteOperations .unsafeWrap(qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier)); + valueBuilder.setTimestamp(HConstants.LATEST_TIMESTAMP); columnBuilder.addQualifierValue(valueBuilder.build()); mutateBuilder.addColumnValue(columnBuilder.build()); if (nonce != HConstants.NO_NONCE) { @@ -364,7 +365,7 @@ public final class RequestConverter { * @return a mutate request */ public static MutateRequest buildMutateRequest(final byte[] regionName, - final Increment increment, final long nonceGroup, final long nonce) { + final Increment increment, final long nonceGroup, final long nonce) throws IOException { MutateRequest.Builder builder = MutateRequest.newBuilder(); RegionSpecifier region = buildRegionSpecifier( RegionSpecifierType.REGION_NAME, regionName); @@ -372,7 +373,8 @@ public final class RequestConverter { if (nonce != HConstants.NO_NONCE && nonceGroup != HConstants.NO_NONCE) { builder.setNonceGroup(nonceGroup); } - builder.setMutation(ProtobufUtil.toMutation(increment, MutationProto.newBuilder(), nonce)); + builder.setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, increment, + MutationProto.newBuilder(), nonce)); return builder.build(); } @@ -649,8 +651,8 @@ public final class RequestConverter { regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation( MutationType.APPEND, (Append)row, mutationBuilder, action.getNonce()))); } else if (row instanceof Increment) { - regionActionBuilder.addAction(actionBuilder.setMutation( - ProtobufUtil.toMutation((Increment)row, mutationBuilder, action.getNonce()))); + regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation( + MutationType.INCREMENT, (Increment)row, mutationBuilder, action.getNonce()))); } else if (row instanceof RegionCoprocessorServiceExec) { RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row; // DUMB COPY!!! FIX!!! Done to copy from c.g.p.ByteString to shaded ByteString. http://git-wip-us.apache.org/repos/asf/hbase/blob/25ee5f7f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index b9cafd9..bc4baaf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -32,6 +32,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; import java.lang.reflect.Constructor; +import java.nio.ByteBuffer; import java.text.ParseException; import java.util.AbstractList; import java.util.ArrayList; @@ -73,6 +74,7 @@ import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -173,6 +175,7 @@ import org.apache.hadoop.hbase.snapshot.SnapshotManifest; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.EncryptionTest; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -7499,14 +7502,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi mutationType = MutationType.INCREMENT; // If delta amount to apply is 0, don't write WAL or MemStore. long deltaAmount = getLongValue(delta); + // TODO: Does zero value mean reset Cell? For example, the ttl. apply = deltaAmount != 0; - newCell = reckonIncrement(delta, deltaAmount, currentValue, columnFamily, now, - (Increment)mutation); + final long newValue = currentValue == null ? deltaAmount : getLongValue(currentValue) + deltaAmount; + newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> Bytes.toBytes(newValue)); break; case APPEND: mutationType = MutationType.APPEND; // Always apply Append. TODO: Does empty delta value mean reset Cell? It seems to. - newCell = reckonAppend(delta, currentValue, now, (Append)mutation); + newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> + ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()]) + .put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength()) + .put(delta.getValueArray(), delta.getValueOffset(), delta.getValueLength()) + .array() + ); break; default: throw new UnsupportedOperationException(op.toString()); } @@ -7528,82 +7537,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return toApply; } - /** - * Calculate new Increment Cell. - * @return New Increment Cell with delta applied to currentValue if currentValue is not null; - * otherwise, a new Cell with the delta set as its value. - */ - private Cell reckonIncrement(final Cell delta, final long deltaAmount, final Cell currentValue, - byte [] columnFamily, final long now, Mutation mutation) - throws IOException { + private static Cell reckonDelta(final Cell delta, final Cell currentCell, + final byte[] columnFamily, final long now, + Mutation mutation, Function<Cell, byte[]> supplier) throws IOException { // Forward any tags found on the delta. List<Tag> tags = TagUtil.carryForwardTags(delta); - long newValue = deltaAmount; - long ts = now; - if (currentValue != null) { - tags = TagUtil.carryForwardTags(tags, currentValue); - ts = Math.max(now, currentValue.getTimestamp() + 1); - newValue += getLongValue(currentValue); - } - // Now make up the new Cell. TODO: FIX. This is carnel knowledge of how KeyValues are made... - // doesn't work well with offheaping or if we are doing a different Cell type. - byte [] incrementAmountInBytes = Bytes.toBytes(newValue); tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL()); - byte [] row = mutation.getRow(); - return new KeyValue(row, 0, row.length, - columnFamily, 0, columnFamily.length, - delta.getQualifierArray(), delta.getQualifierOffset(), delta.getQualifierLength(), - ts, KeyValue.Type.Put, - incrementAmountInBytes, 0, incrementAmountInBytes.length, - tags); - } - - private Cell reckonAppend(final Cell delta, final Cell currentValue, final long now, - Append mutation) - throws IOException { - // Forward any tags found on the delta. - List<Tag> tags = TagUtil.carryForwardTags(delta); - long ts = now; - Cell newCell = null; - byte [] row = mutation.getRow(); - if (currentValue != null) { - tags = TagUtil.carryForwardTags(tags, currentValue); - ts = Math.max(now, currentValue.getTimestamp() + 1); - tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL()); - byte[] tagBytes = TagUtil.fromList(tags); - // Allocate an empty cell and copy in all parts. - // TODO: This is intimate knowledge of how a KeyValue is made. Undo!!! Prevents our doing - // other Cell types. Copying on-heap too if an off-heap Cell. - newCell = new KeyValue(row.length, delta.getFamilyLength(), - delta.getQualifierLength(), ts, KeyValue.Type.Put, - delta.getValueLength() + currentValue.getValueLength(), - tagBytes == null? 0: tagBytes.length); - // Copy in row, family, and qualifier - System.arraycopy(row, 0, newCell.getRowArray(), newCell.getRowOffset(), row.length); - System.arraycopy(delta.getFamilyArray(), delta.getFamilyOffset(), - newCell.getFamilyArray(), newCell.getFamilyOffset(), delta.getFamilyLength()); - System.arraycopy(delta.getQualifierArray(), delta.getQualifierOffset(), - newCell.getQualifierArray(), newCell.getQualifierOffset(), delta.getQualifierLength()); - // Copy in the value - CellUtil.copyValueTo(currentValue, newCell.getValueArray(), newCell.getValueOffset()); - System.arraycopy(delta.getValueArray(), delta.getValueOffset(), - newCell.getValueArray(), newCell.getValueOffset() + currentValue.getValueLength(), - delta.getValueLength()); - // Copy in tag data - if (tagBytes != null) { - System.arraycopy(tagBytes, 0, - newCell.getTagsArray(), newCell.getTagsOffset(), tagBytes.length); - } + if (currentCell != null) { + tags = TagUtil.carryForwardTags(tags, currentCell); + byte[] newValue = supplier.apply(currentCell); + // TODO: FIX. This is carnel knowledge of how KeyValues are made... + // This will be fixed by HBASE-18519 + return new KeyValue(mutation.getRow(), 0, mutation.getRow().length, + columnFamily, 0, columnFamily.length, + delta.getQualifierArray(), delta.getQualifierOffset(), delta.getQualifierLength(), + Math.max(currentCell.getTimestamp() + 1, now), + KeyValue.Type.Put, newValue, 0, newValue.length, tags); } else { - // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP CellUtil.updateLatestStamp(delta, now); - newCell = delta; - tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL()); - if (tags != null) { - newCell = CellUtil.createCell(delta, tags); - } + return CollectionUtils.isEmpty(tags) ? delta : CellUtil.createCell(delta, tags); } - return newCell; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/25ee5f7f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAppendFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAppendFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAppendFromClientSide.java new file mode 100644 index 0000000..ac1ac03 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAppendFromClientSide.java @@ -0,0 +1,85 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +/** + * Run Append tests that use the HBase clients; + */ +@Category(LargeTests.class) +public class TestAppendFromClientSide { + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static byte [] ROW = Bytes.toBytes("testRow"); + private static byte [] FAMILY = Bytes.toBytes("testFamily"); + private static byte [] QUALIFIER = Bytes.toBytes("testQualifier"); + @Rule + public TestName name = new TestName(); + @BeforeClass + public static void beforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + TEST_UTIL.startMiniCluster(3); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void afterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testAppendWithCustomTimestamp() throws IOException { + TableName TABLENAME = TableName.valueOf(name.getMethodName()); + Table table = TEST_UTIL.createTable(TABLENAME, FAMILY); + long timestamp = 999; + Append append = new Append(ROW); + append.add(CellUtil.createCell(ROW, FAMILY, QUALIFIER, timestamp, KeyValue.Type.Put.getCode(), Bytes.toBytes(100L))); + Result r = table.append(append); + assertEquals(1, r.size()); + assertEquals(timestamp, r.rawCells()[0].getTimestamp()); + r = table.get(new Get(ROW)); + assertEquals(1, r.size()); + assertEquals(timestamp, r.rawCells()[0].getTimestamp()); + r = table.append(append); + assertEquals(1, r.size()); + assertNotEquals(timestamp, r.rawCells()[0].getTimestamp()); + r = table.get(new Get(ROW)); + assertEquals(1, r.size()); + assertNotEquals(timestamp, r.rawCells()[0].getTimestamp()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/25ee5f7f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java index 767cace..14f5d67 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -33,10 +34,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; @@ -462,6 +465,26 @@ public class TestIncrementsFromClientSide { } } + @Test + public void testIncrementWithCustomTimestamp() throws IOException { + TableName TABLENAME = TableName.valueOf(name.getMethodName()); + Table table = TEST_UTIL.createTable(TABLENAME, FAMILY); + long timestamp = 999; + Increment increment = new Increment(ROW); + increment.add(CellUtil.createCell(ROW, FAMILY, QUALIFIER, timestamp, KeyValue.Type.Put.getCode(), Bytes.toBytes(100L))); + Result r = table.increment(increment); + assertEquals(1, r.size()); + assertEquals(timestamp, r.rawCells()[0].getTimestamp()); + r = table.get(new Get(ROW)); + assertEquals(1, r.size()); + assertEquals(timestamp, r.rawCells()[0].getTimestamp()); + r = table.increment(increment); + assertEquals(1, r.size()); + assertNotEquals(timestamp, r.rawCells()[0].getTimestamp()); + r = table.get(new Get(ROW)); + assertEquals(1, r.size()); + assertNotEquals(timestamp, r.rawCells()[0].getTimestamp()); + } /** * Call over to the adjacent class's method of same name. http://git-wip-us.apache.org/repos/asf/hbase/blob/25ee5f7f/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java index c88c370..7c7e9dc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java @@ -27,7 +27,6 @@ import java.nio.ByteBuffer; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.ProcedureInfo; import org.apache.hadoop.hbase.ProcedureState; @@ -130,7 +129,6 @@ public class TestProtobufUtil { qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2")); qualifierBuilder.setValue(ByteString.copyFromUtf8("v2")); valueBuilder.addQualifierValue(qualifierBuilder.build()); - qualifierBuilder.setTimestamp(timeStamp); mutateBuilder.addColumnValue(valueBuilder.build()); MutationProto proto = mutateBuilder.build(); @@ -203,6 +201,7 @@ public class TestProtobufUtil { */ @Test public void testIncrement() throws IOException { + long timeStamp = 111111; MutationProto.Builder mutateBuilder = MutationProto.newBuilder(); mutateBuilder.setRow(ByteString.copyFromUtf8("row")); mutateBuilder.setMutateType(MutationType.INCREMENT); @@ -211,6 +210,7 @@ public class TestProtobufUtil { QualifierValue.Builder qualifierBuilder = QualifierValue.newBuilder(); qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c1")); qualifierBuilder.setValue(ByteString.copyFrom(Bytes.toBytes(11L))); + qualifierBuilder.setTimestamp(timeStamp); valueBuilder.addQualifierValue(qualifierBuilder.build()); qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2")); qualifierBuilder.setValue(ByteString.copyFrom(Bytes.toBytes(22L))); @@ -226,8 +226,8 @@ public class TestProtobufUtil { mutateBuilder.setDurability(MutationProto.Durability.USE_DEFAULT); Increment increment = ProtobufUtil.toIncrement(proto, null); - assertEquals(mutateBuilder.build(), - ProtobufUtil.toMutation(increment, MutationProto.newBuilder(), HConstants.NO_NONCE)); + mutateBuilder.setTimestamp(increment.getTimeStamp()); + assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.INCREMENT, increment)); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/25ee5f7f/hbase-server/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java index da7c7c4..6b1b5c6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java @@ -22,12 +22,22 @@ import static org.junit.Assert.fail; import org.apache.hadoop.hbase.ProcedureInfo; import org.apache.hadoop.hbase.ProcedureState; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.procedure2.LockInfo; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.IOException; + @Category(SmallTests.class) public class TestProtobufUtil { public TestProtobufUtil() { @@ -148,4 +158,80 @@ public class TestProtobufUtil { assertWaitingProcedureEquals(waitingProcedure, waitingProcedure2); } + + /** + * Test Increment Mutate conversions. + * + * @throws IOException + */ + @Test + public void testIncrement() throws IOException { + long timeStamp = 111111; + MutationProto.Builder mutateBuilder = MutationProto.newBuilder(); + mutateBuilder.setRow(ByteString.copyFromUtf8("row")); + mutateBuilder.setMutateType(MutationProto.MutationType.INCREMENT); + ColumnValue.Builder valueBuilder = ColumnValue.newBuilder(); + valueBuilder.setFamily(ByteString.copyFromUtf8("f1")); + QualifierValue.Builder qualifierBuilder = QualifierValue.newBuilder(); + qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c1")); + qualifierBuilder.setValue(ByteString.copyFrom(Bytes.toBytes(11L))); + qualifierBuilder.setTimestamp(timeStamp); + valueBuilder.addQualifierValue(qualifierBuilder.build()); + qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2")); + qualifierBuilder.setValue(ByteString.copyFrom(Bytes.toBytes(22L))); + valueBuilder.addQualifierValue(qualifierBuilder.build()); + mutateBuilder.addColumnValue(valueBuilder.build()); + + MutationProto proto = mutateBuilder.build(); + // default fields + assertEquals(MutationProto.Durability.USE_DEFAULT, proto.getDurability()); + + // set the default value for equal comparison + mutateBuilder = MutationProto.newBuilder(proto); + mutateBuilder.setDurability(MutationProto.Durability.USE_DEFAULT); + + Increment increment = ProtobufUtil.toIncrement(proto, null); + mutateBuilder.setTimestamp(increment.getTimeStamp()); + assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.INCREMENT, increment)); + } + + /** + * Test Append Mutate conversions. + * + * @throws IOException + */ + @Test + public void testAppend() throws IOException { + long timeStamp = 111111; + MutationProto.Builder mutateBuilder = MutationProto.newBuilder(); + mutateBuilder.setRow(ByteString.copyFromUtf8("row")); + mutateBuilder.setMutateType(MutationType.APPEND); + mutateBuilder.setTimestamp(timeStamp); + ColumnValue.Builder valueBuilder = ColumnValue.newBuilder(); + valueBuilder.setFamily(ByteString.copyFromUtf8("f1")); + QualifierValue.Builder qualifierBuilder = QualifierValue.newBuilder(); + qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c1")); + qualifierBuilder.setValue(ByteString.copyFromUtf8("v1")); + qualifierBuilder.setTimestamp(timeStamp); + valueBuilder.addQualifierValue(qualifierBuilder.build()); + qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2")); + qualifierBuilder.setValue(ByteString.copyFromUtf8("v2")); + valueBuilder.addQualifierValue(qualifierBuilder.build()); + mutateBuilder.addColumnValue(valueBuilder.build()); + + MutationProto proto = mutateBuilder.build(); + // default fields + assertEquals(MutationProto.Durability.USE_DEFAULT, proto.getDurability()); + + // set the default value for equal comparison + mutateBuilder = MutationProto.newBuilder(proto); + mutateBuilder.setDurability(MutationProto.Durability.USE_DEFAULT); + + Append append = ProtobufUtil.toAppend(proto, null); + + // append always use the latest timestamp, + // reset the timestamp to the original mutate + mutateBuilder.setTimestamp(append.getTimeStamp()); + assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.APPEND, append)); + } }
