/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.shaded.protobuf;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Any;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BytesValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Column;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.DeleteType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Round-trip and JSON-rendering tests for the shaded {@link ProtobufUtil}.
 *
 * <p>The conversion tests build a protobuf message by hand, convert it to the
 * client-side object and back, and compare the result with the original
 * message. The JSON tests compare the output of
 * {@code ProtobufUtil.toProcedureJson} / {@code ProtobufUtil.toLockJson}
 * against literal strings.
 */
@Category(SmallTests.class)
public class TestProtobufUtil {
  public TestProtobufUtil() {
  }

  /**
   * Checks that {@code ProtobufUtil.toException} yields a throwable carrying
   * the message stored in the pair's value, both for a plain
   * {@code java.io.IOException} and for a Hadoop {@code RemoteException}.
   *
   * @throws IOException if the conversion itself fails
   */
  @Test
  public void testException() throws IOException {
    NameBytesPair.Builder builder = NameBytesPair.newBuilder();
    final String omg = "OMG!!!";
    builder.setName("java.io.IOException");
    builder.setValue(ByteString.copyFrom(Bytes.toBytes(omg)));
    Throwable t = ProtobufUtil.toException(builder.build());
    assertEquals(omg, t.getMessage());

    builder.clear();
    builder.setName("org.apache.hadoop.ipc.RemoteException");
    builder.setValue(ByteString.copyFrom(Bytes.toBytes(omg)));
    t = ProtobufUtil.toException(builder.build());
    assertEquals(omg, t.getMessage());
  }

  /**
   * Test basic Get conversions.
   *
   * @throws IOException if converting between the protobuf and client Get fails
   */
  @Test
  public void testGet() throws IOException {
    ClientProtos.Get.Builder getBuilder = ClientProtos.Get.newBuilder();
    getBuilder.setRow(ByteString.copyFromUtf8("row"));
    Column.Builder columnBuilder = Column.newBuilder();
    columnBuilder.setFamily(ByteString.copyFromUtf8("f1"));
    columnBuilder.addQualifier(ByteString.copyFromUtf8("c1"));
    columnBuilder.addQualifier(ByteString.copyFromUtf8("c2"));
    getBuilder.addColumn(columnBuilder.build());

    columnBuilder.clear();
    columnBuilder.setFamily(ByteString.copyFromUtf8("f2"));
    getBuilder.addColumn(columnBuilder.build());
    getBuilder.setLoadColumnFamiliesOnDemand(true);
    ClientProtos.Get proto = getBuilder.build();
    // default fields
    assertEquals(1, proto.getMaxVersions());
    assertTrue(proto.getCacheBlocks());

    // Set the defaults explicitly so the expected message compares equal to
    // the one produced by the round trip.
    getBuilder = ClientProtos.Get.newBuilder(proto);
    getBuilder.setMaxVersions(1);
    getBuilder.setCacheBlocks(true);

    Get get = ProtobufUtil.toGet(proto);
    assertEquals(getBuilder.build(), ProtobufUtil.toGet(get));
  }

  /**
   * Test Delete Mutate conversions.
   *
   * @throws IOException if converting between the protobuf and client Delete fails
   */
  @Test
  public void testDelete() throws IOException {
    MutationProto.Builder mutateBuilder = MutationProto.newBuilder();
    mutateBuilder.setRow(ByteString.copyFromUtf8("row"));
    mutateBuilder.setMutateType(MutationType.DELETE);
    mutateBuilder.setTimestamp(111111);
    ColumnValue.Builder valueBuilder = ColumnValue.newBuilder();
    valueBuilder.setFamily(ByteString.copyFromUtf8("f1"));
    QualifierValue.Builder qualifierBuilder = QualifierValue.newBuilder();
    qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c1"));
    qualifierBuilder.setDeleteType(DeleteType.DELETE_ONE_VERSION);
    qualifierBuilder.setTimestamp(111222);
    valueBuilder.addQualifierValue(qualifierBuilder.build());
    qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2"));
    qualifierBuilder.setDeleteType(DeleteType.DELETE_MULTIPLE_VERSIONS);
    qualifierBuilder.setTimestamp(111333);
    valueBuilder.addQualifierValue(qualifierBuilder.build());
    mutateBuilder.addColumnValue(valueBuilder.build());

    MutationProto proto = mutateBuilder.build();
    // default fields
    assertEquals(MutationProto.Durability.USE_DEFAULT, proto.getDurability());

    // Set the default explicitly so the expected message compares equal.
    mutateBuilder = MutationProto.newBuilder(proto);
    mutateBuilder.setDurability(MutationProto.Durability.USE_DEFAULT);

    Delete delete = ProtobufUtil.toDelete(proto);

    // A delete always carries an empty value, so add an empty value to every
    // qualifier of the expected mutation.
    for (ColumnValue.Builder column : mutateBuilder.getColumnValueBuilderList()) {
      for (QualifierValue.Builder qualifier : column.getQualifierValueBuilderList()) {
        qualifier.setValue(ByteString.EMPTY);
      }
    }
    assertEquals(mutateBuilder.build(),
      ProtobufUtil.toMutation(MutationType.DELETE, delete));
  }

  /**
   * Test Put Mutate conversions.
   *
   * @throws IOException if converting between the protobuf and client Put fails
   */
  @Test
  public void testPut() throws IOException {
    MutationProto.Builder mutateBuilder = MutationProto.newBuilder();
    mutateBuilder.setRow(ByteString.copyFromUtf8("row"));
    mutateBuilder.setMutateType(MutationType.PUT);
    mutateBuilder.setTimestamp(111111);
    ColumnValue.Builder valueBuilder = ColumnValue.newBuilder();
    valueBuilder.setFamily(ByteString.copyFromUtf8("f1"));
    QualifierValue.Builder qualifierBuilder = QualifierValue.newBuilder();
    qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c1"));
    qualifierBuilder.setValue(ByteString.copyFromUtf8("v1"));
    valueBuilder.addQualifierValue(qualifierBuilder.build());
    qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2"));
    qualifierBuilder.setValue(ByteString.copyFromUtf8("v2"));
    qualifierBuilder.setTimestamp(222222);
    valueBuilder.addQualifierValue(qualifierBuilder.build());
    mutateBuilder.addColumnValue(valueBuilder.build());

    MutationProto proto = mutateBuilder.build();
    // default fields
    assertEquals(MutationProto.Durability.USE_DEFAULT, proto.getDurability());

    // Set the default explicitly so the expected message compares equal.
    mutateBuilder = MutationProto.newBuilder(proto);
    mutateBuilder.setDurability(MutationProto.Durability.USE_DEFAULT);

    Put put = ProtobufUtil.toPut(proto);

    // A put value uses the mutation-level timestamp when no value-level
    // timestamp is specified, so fill that timestamp into every qualifier of
    // the expected mutation that lacks one.
    long timestamp = put.getTimeStamp();
    for (ColumnValue.Builder column : mutateBuilder.getColumnValueBuilderList()) {
      for (QualifierValue.Builder qualifier : column.getQualifierValueBuilderList()) {
        if (!qualifier.hasTimestamp()) {
          qualifier.setTimestamp(timestamp);
        }
      }
    }
    assertEquals(mutateBuilder.build(),
      ProtobufUtil.toMutation(MutationType.PUT, put));
  }

  /**
   * Test basic Scan conversions.
   *
   * @throws IOException if converting between the protobuf and client Scan fails
   */
  @Test
  public void testScan() throws IOException {
    ClientProtos.Scan.Builder scanBuilder = ClientProtos.Scan.newBuilder();
    scanBuilder.setStartRow(ByteString.copyFromUtf8("row1"));
    scanBuilder.setStopRow(ByteString.copyFromUtf8("row2"));
    Column.Builder columnBuilder = Column.newBuilder();
    columnBuilder.setFamily(ByteString.copyFromUtf8("f1"));
    columnBuilder.addQualifier(ByteString.copyFromUtf8("c1"));
    columnBuilder.addQualifier(ByteString.copyFromUtf8("c2"));
    scanBuilder.addColumn(columnBuilder.build());

    columnBuilder.clear();
    columnBuilder.setFamily(ByteString.copyFromUtf8("f2"));
    scanBuilder.addColumn(columnBuilder.build());

    ClientProtos.Scan proto = scanBuilder.build();

    // Verify default values
    assertEquals(1, proto.getMaxVersions());
    assertTrue(proto.getCacheBlocks());

    // Verify fields survive ClientProtos.Scan -> Scan -> ClientProtos.Scan
    // conversion
    scanBuilder = ClientProtos.Scan.newBuilder(proto);
    scanBuilder.setMaxVersions(2);
    scanBuilder.setCacheBlocks(false);
    scanBuilder.setCaching(1024);
    ClientProtos.Scan expectedProto = scanBuilder.build();

    ClientProtos.Scan actualProto = ProtobufUtil.toScan(
      ProtobufUtil.toScan(expectedProto));
    assertEquals(expectedProto, actualProto);
  }

  /**
   * Checks that a cell backed by a direct {@link ByteBuffer} survives the
   * Cell -> CellProtos.Cell -> Cell conversion unchanged.
   *
   * @throws Exception if building or converting the cell fails
   */
  @Test
  public void testToCell() throws Exception {
    KeyValue kv1 =
        new KeyValue(Bytes.toBytes("aaa"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), new byte[30]);
    KeyValue kv2 =
        new KeyValue(Bytes.toBytes("bbb"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), new byte[30]);
    KeyValue kv3 =
        new KeyValue(Bytes.toBytes("ccc"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), new byte[30]);

    // Lay the three KeyValues out back to back in a single array.
    byte[] arr = new byte[kv1.getLength() + kv2.getLength() + kv3.getLength()];
    System.arraycopy(kv1.getBuffer(), kv1.getOffset(), arr, 0, kv1.getLength());
    System.arraycopy(kv2.getBuffer(), kv2.getOffset(), arr, kv1.getLength(), kv2.getLength());
    System.arraycopy(kv3.getBuffer(), kv3.getOffset(), arr, kv1.getLength() + kv2.getLength(),
      kv3.getLength());
    ByteBuffer dbb = ByteBuffer.allocateDirect(arr.length);
    dbb.put(arr);

    // The arguments select the region that holds kv2, i.e. a cell that starts
    // at a non-zero position inside the direct buffer.
    // NOTE(review): assumes the constructor takes (buffer, offset, length) - confirm.
    ByteBufferKeyValue offheapKV = new ByteBufferKeyValue(dbb, kv1.getLength(), kv2.getLength());
    CellProtos.Cell cell = ProtobufUtil.toCell(offheapKV);
    Cell newOffheapKV =
        ProtobufUtil.toCell(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY), cell);
    // assertEquals reports the actual comparison result on failure.
    assertEquals(0, CellComparator.COMPARATOR.compare(offheapKV, newOffheapKV));
  }

  /**
   * Test Increment Mutate conversions.
   *
   * @throws IOException if converting between the protobuf and client Increment fails
   */
  @Test
  public void testIncrement() throws IOException {
    long timeStamp = 111111;
    MutationProto.Builder mutateBuilder = MutationProto.newBuilder();
    mutateBuilder.setRow(ByteString.copyFromUtf8("row"));
    mutateBuilder.setMutateType(MutationType.INCREMENT);
    ColumnValue.Builder valueBuilder = ColumnValue.newBuilder();
    valueBuilder.setFamily(ByteString.copyFromUtf8("f1"));
    QualifierValue.Builder qualifierBuilder = QualifierValue.newBuilder();
    qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c1"));
    qualifierBuilder.setValue(ByteString.copyFrom(Bytes.toBytes(11L)));
    qualifierBuilder.setTimestamp(timeStamp);
    valueBuilder.addQualifierValue(qualifierBuilder.build());
    qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2"));
    qualifierBuilder.setValue(ByteString.copyFrom(Bytes.toBytes(22L)));
    valueBuilder.addQualifierValue(qualifierBuilder.build());
    mutateBuilder.addColumnValue(valueBuilder.build());

    MutationProto proto = mutateBuilder.build();
    // default fields
    assertEquals(MutationProto.Durability.USE_DEFAULT, proto.getDurability());

    // Set the default explicitly so the expected message compares equal.
    mutateBuilder = MutationProto.newBuilder(proto);
    mutateBuilder.setDurability(MutationProto.Durability.USE_DEFAULT);

    Increment increment = ProtobufUtil.toIncrement(proto, null);
    // The input proto has no mutation-level timestamp; copy the one the
    // Increment reports into the expected mutation before comparing.
    mutateBuilder.setTimestamp(increment.getTimeStamp());
    assertEquals(mutateBuilder.build(),
      ProtobufUtil.toMutation(MutationType.INCREMENT, increment));
  }

  /**
   * Test Append Mutate conversions.
   *
   * @throws IOException if converting between the protobuf and client Append fails
   */
  @Test
  public void testAppend() throws IOException {
    long timeStamp = 111111;
    MutationProto.Builder mutateBuilder = MutationProto.newBuilder();
    mutateBuilder.setRow(ByteString.copyFromUtf8("row"));
    mutateBuilder.setMutateType(MutationType.APPEND);
    mutateBuilder.setTimestamp(timeStamp);
    ColumnValue.Builder valueBuilder = ColumnValue.newBuilder();
    valueBuilder.setFamily(ByteString.copyFromUtf8("f1"));
    QualifierValue.Builder qualifierBuilder = QualifierValue.newBuilder();
    qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c1"));
    qualifierBuilder.setValue(ByteString.copyFromUtf8("v1"));
    qualifierBuilder.setTimestamp(timeStamp);
    valueBuilder.addQualifierValue(qualifierBuilder.build());
    qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2"));
    qualifierBuilder.setValue(ByteString.copyFromUtf8("v2"));
    valueBuilder.addQualifierValue(qualifierBuilder.build());
    mutateBuilder.addColumnValue(valueBuilder.build());

    MutationProto proto = mutateBuilder.build();
    // default fields
    assertEquals(MutationProto.Durability.USE_DEFAULT, proto.getDurability());

    // Set the default explicitly so the expected message compares equal.
    mutateBuilder = MutationProto.newBuilder(proto);
    mutateBuilder.setDurability(MutationProto.Durability.USE_DEFAULT);

    Append append = ProtobufUtil.toAppend(proto, null);

    // Use the timestamp reported by the converted Append in the expected
    // mutation before comparing.
    mutateBuilder.setTimestamp(append.getTimeStamp());
    assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.APPEND, append));
  }

  /**
   * Creates a builder for a minimal RUNNABLE procedure with the given id,
   * class name {@code java.lang.Object} and zeroed submitted/last-update times.
   *
   * @param procId the procedure id to set
   * @return a builder that callers may extend before building
   */
  private static ProcedureProtos.Procedure.Builder createProcedureBuilder(long procId) {
    ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder();
    builder.setProcId(procId);
    builder.setClassName("java.lang.Object");
    builder.setSubmittedTime(0);
    builder.setState(ProcedureProtos.ProcedureState.RUNNABLE);
    builder.setLastUpdate(0);

    return builder;
  }

  /**
   * Builds the minimal procedure described by {@link #createProcedureBuilder(long)}.
   *
   * @param procId the procedure id to set
   * @return the built procedure message
   */
  private static ProcedureProtos.Procedure createProcedure(long procId) {
    return createProcedureBuilder(procId).build();
  }

  /**
   * Builds a locked-resource message from the given fields.
   *
   * @param resourceType the kind of resource that is locked
   * @param resourceName the name of the locked resource
   * @param lockType the lock mode
   * @param exclusiveLockOwnerProcedure the owning procedure, or {@code null}
   *   to leave the field unset
   * @param sharedLockCount the number of shared locks to report
   * @return the built locked-resource message
   */
  private static LockServiceProtos.LockedResource createLockedResource(
      LockServiceProtos.LockedResourceType resourceType, String resourceName,
      LockServiceProtos.LockType lockType,
      ProcedureProtos.Procedure exclusiveLockOwnerProcedure, int sharedLockCount) {
    LockServiceProtos.LockedResource.Builder build = LockServiceProtos.LockedResource.newBuilder();
    build.setResourceType(resourceType);
    build.setResourceName(resourceName);
    build.setLockType(lockType);
    if (exclusiveLockOwnerProcedure != null) {
      build.setExclusiveLockOwnerProcedure(exclusiveLockOwnerProcedure);
    }
    build.setSharedLockCount(sharedLockCount);

    return build.build();
  }

  /**
   * Checks the JSON rendering of a procedure that carries one state message.
   */
  @Test
  public void testProcedureInfo() {
    ProcedureProtos.Procedure.Builder builder = createProcedureBuilder(1);
    ByteString stateBytes = ByteString.copyFrom(new byte[] { 65 });
    BytesValue state = BytesValue.newBuilder().setValue(stateBytes).build();
    builder.addStateMessage(Any.pack(state));
    ProcedureProtos.Procedure procedure = builder.build();

    String procJson = ProtobufUtil.toProcedureJson(Lists.newArrayList(procedure));
    assertEquals("[{"
        + "\"className\":\"java.lang.Object\","
        + "\"procId\":\"1\","
        + "\"submittedTime\":\"0\","
        + "\"state\":\"RUNNABLE\","
        + "\"lastUpdate\":\"0\","
        + "\"stateMessage\":[{\"value\":\"QQ==\"}]"
        + "}]", procJson);
  }

  /**
   * Checks the JSON rendering of a shared lock on a server.
   */
  @Test
  public void testServerLockInfo() {
    LockServiceProtos.LockedResource resource = createLockedResource(
        LockServiceProtos.LockedResourceType.SERVER, "server",
        LockServiceProtos.LockType.SHARED, null, 2);

    String lockJson = ProtobufUtil.toLockJson(Lists.newArrayList(resource));
    assertEquals("[{"
        + "\"resourceType\":\"SERVER\","
        + "\"resourceName\":\"server\","
        + "\"lockType\":\"SHARED\","
        + "\"sharedLockCount\":2"
        + "}]", lockJson);
  }

  /**
   * Checks the JSON rendering of an exclusive lock on a namespace, including
   * the owning procedure.
   */
  @Test
  public void testNamespaceLockInfo() {
    LockServiceProtos.LockedResource resource = createLockedResource(
        LockServiceProtos.LockedResourceType.NAMESPACE, "ns",
        LockServiceProtos.LockType.EXCLUSIVE, createProcedure(2), 0);

    String lockJson = ProtobufUtil.toLockJson(Lists.newArrayList(resource));
    assertEquals("[{"
        + "\"resourceType\":\"NAMESPACE\","
        + "\"resourceName\":\"ns\","
        + "\"lockType\":\"EXCLUSIVE\","
        + "\"exclusiveLockOwnerProcedure\":{"
        + "\"className\":\"java.lang.Object\","
        + "\"procId\":\"2\","
        + "\"submittedTime\":\"0\","
        + "\"state\":\"RUNNABLE\","
        + "\"lastUpdate\":\"0\""
        + "},"
        + "\"sharedLockCount\":0"
        + "}]", lockJson);
  }

  /**
   * Checks the JSON rendering of a shared lock on a table.
   */
  @Test
  public void testTableLockInfo() {
    LockServiceProtos.LockedResource resource = createLockedResource(
        LockServiceProtos.LockedResourceType.TABLE, "table",
        LockServiceProtos.LockType.SHARED, null, 2);

    String lockJson = ProtobufUtil.toLockJson(Lists.newArrayList(resource));
    assertEquals("[{"
        + "\"resourceType\":\"TABLE\","
        + "\"resourceName\":\"table\","
        + "\"lockType\":\"SHARED\","
        + "\"sharedLockCount\":2"
        + "}]", lockJson);
  }

  /**
   * Checks the JSON rendering of an exclusive lock on a region, including the
   * owning procedure.
   */
  @Test
  public void testRegionLockInfo() {
    LockServiceProtos.LockedResource resource = createLockedResource(
        LockServiceProtos.LockedResourceType.REGION, "region",
        LockServiceProtos.LockType.EXCLUSIVE, createProcedure(3), 0);

    String lockJson = ProtobufUtil.toLockJson(Lists.newArrayList(resource));
    assertEquals("[{"
        + "\"resourceType\":\"REGION\","
        + "\"resourceName\":\"region\","
        + "\"lockType\":\"EXCLUSIVE\","
        + "\"exclusiveLockOwnerProcedure\":{"
        + "\"className\":\"java.lang.Object\","
        + "\"procId\":\"3\","
        + "\"submittedTime\":\"0\","
        + "\"state\":\"RUNNABLE\","
        + "\"lastUpdate\":\"0\""
        + "},"
        + "\"sharedLockCount\":0"
        + "}]", lockJson);
  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java deleted file mode 100644 index 36dabdd..0000000 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java +++ /dev/null @@ -1,215 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase; - -import java.io.IOException; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.NonceKey; -import org.apache.hadoop.util.StringUtils; - -/** - * Procedure information - */ -@InterfaceAudience.Public -public class ProcedureInfo implements Cloneable { - private final long procId; - private final String procName; - private final String procOwner; - private final ProcedureState procState; - private final long parentId; - private final NonceKey nonceKey; - private final IOException exception; - private final long lastUpdate; - private final long submittedTime; - private final byte[] result; - - private long clientAckTime = -1; - - @InterfaceAudience.Private - public ProcedureInfo( - final long procId, - final String procName, - final String procOwner, - final ProcedureState procState, - final long parentId, - final NonceKey nonceKey, - final IOException exception, - final long lastUpdate, - final long submittedTime, - final byte[] result) { - this.procId = procId; - this.procName = procName; - this.procOwner = procOwner; - this.procState = procState; - this.parentId = parentId; - this.nonceKey = nonceKey; - this.lastUpdate = lastUpdate; - this.submittedTime = submittedTime; - - // If the procedure is completed, we should treat exception and result differently - this.exception = exception; - this.result = result; - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="CN_IDIOM_NO_SUPER_CALL", - justification="Intentional; calling super class clone doesn't make sense here.") - public ProcedureInfo clone() { - return new ProcedureInfo(procId, procName, procOwner, procState, parentId, nonceKey, - exception, lastUpdate, submittedTime, result); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(procName); - 
sb.append(" pid="); - sb.append(procId); - if (hasParentId()) { - sb.append(", ppid="); - sb.append(parentId); - } - if (hasOwner()) { - sb.append(", owner="); - sb.append(procOwner); - } - sb.append(", state="); - sb.append(procState); - - long now = EnvironmentEdgeManager.currentTime(); - sb.append(", submittedTime="); - sb.append(StringUtils.formatTime(now - submittedTime)); - sb.append(" ago, lastUpdate="); - sb.append(StringUtils.formatTime(now - submittedTime)); - sb.append(" ago"); - - if (isFailed()) { - sb.append(", exception=\""); - sb.append(this.exception.getMessage()); - sb.append("\""); - } - return sb.toString(); - } - - public long getProcId() { - return procId; - } - - public String getProcName() { - return procName; - } - - public boolean hasOwner() { - return procOwner != null; - } - - public String getProcOwner() { - return procOwner; - } - - public ProcedureState getProcState() { - return procState; - } - - public boolean hasParentId() { - return (parentId != -1); - } - - public long getParentId() { - return parentId; - } - - public NonceKey getNonceKey() { - return nonceKey; - } - - public boolean isFailed() { - return exception != null; - } - - public IOException getException() { - if (isFailed()) { - return this.exception; - } - return null; - } - - public String getExceptionFullMessage() { - assert isFailed(); - final IOException e = getException(); - return e.getCause() + " - " + e.getMessage(); - } - - public boolean hasResultData() { - return result != null; - } - - public byte[] getResult() { - return result; - } - - public long getSubmittedTime() { - return submittedTime; - } - - public long getLastUpdate() { - return lastUpdate; - } - - public long executionTime() { - return lastUpdate - submittedTime; - } - - @InterfaceAudience.Private - public boolean hasClientAckTime() { - return clientAckTime != -1; - } - - @InterfaceAudience.Private - public long getClientAckTime() { - return clientAckTime; - } - - @InterfaceAudience.Private - 
public void setClientAckTime(final long timestamp) { - this.clientAckTime = timestamp; - } - - /** - * Check if the user is this procedure's owner - * @param procInfo the procedure to check - * @param user the user - * @return true if the user is the owner of the procedure, - * false otherwise or the owner is unknown. - */ - @InterfaceAudience.Private - public static boolean isProcedureOwner(final ProcedureInfo procInfo, final User user) { - if (user == null) { - return false; - } - String procOwner = procInfo.getProcOwner(); - if (procOwner == null) { - return false; - } - return procOwner.equals(user.getShortName()); - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-common/src/main/java/org/apache/hadoop/hbase/procedure2/LockInfo.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/procedure2/LockInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/procedure2/LockInfo.java deleted file mode 100644 index 30ecee8..0000000 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/procedure2/LockInfo.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.procedure2; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hbase.ProcedureInfo; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -@InterfaceAudience.Public -public class LockInfo { - @InterfaceAudience.Public - public enum ResourceType { - SERVER, NAMESPACE, TABLE, REGION - } - - @InterfaceAudience.Public - public enum LockType { - EXCLUSIVE, SHARED - } - - @InterfaceAudience.Public - public static class WaitingProcedure { - private LockType lockType; - private ProcedureInfo procedure; - - public WaitingProcedure() { - } - - public LockType getLockType() { - return lockType; - } - - public void setLockType(LockType lockType) { - this.lockType = lockType; - } - - public ProcedureInfo getProcedure() { - return procedure; - } - - public void setProcedure(ProcedureInfo procedure) { - this.procedure = procedure; - } - } - - private ResourceType resourceType; - private String resourceName; - private LockType lockType; - private ProcedureInfo exclusiveLockOwnerProcedure; - private int sharedLockCount; - private final List<WaitingProcedure> waitingProcedures; - - public LockInfo() { - waitingProcedures = new ArrayList<>(); - } - - public ResourceType getResourceType() { - return resourceType; - } - - public void setResourceType(ResourceType resourceType) { - this.resourceType = resourceType; - } - - public String getResourceName() { - return resourceName; - } - - public void setResourceName(String resourceName) { - this.resourceName = resourceName; - } - - public LockType getLockType() { - return lockType; - } - - public void setLockType(LockType lockType) { - this.lockType = lockType; - } - - public ProcedureInfo getExclusiveLockOwnerProcedure() { - return exclusiveLockOwnerProcedure; - } - - public void setExclusiveLockOwnerProcedure( - ProcedureInfo exclusiveLockOwnerProcedure) { - this.exclusiveLockOwnerProcedure = exclusiveLockOwnerProcedure; - } - - public int 
getSharedLockCount() { - return sharedLockCount; - } - - public void setSharedLockCount(int sharedLockCount) { - this.sharedLockCount = sharedLockCount; - } - - public List<WaitingProcedure> getWaitingProcedures() { - return waitingProcedures; - } - - public void setWaitingProcedures(List<WaitingProcedure> waitingProcedures) { - this.waitingProcedures.clear(); - this.waitingProcedures.addAll(waitingProcedures); - } - - public void addWaitingProcedure(WaitingProcedure waitingProcedure) { - waitingProcedures.add(waitingProcedure); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/JRubyFormat.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/JRubyFormat.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/JRubyFormat.java new file mode 100644 index 0000000..8d85b9d --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/JRubyFormat.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.util;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.escape.Escaper;
import org.apache.hadoop.hbase.shaded.com.google.common.escape.Escapers;

/**
 * Utility class for rendering objects as JRuby literals.
 *
 * <p>It handles {@code null}, {@code Boolean}, {@code Number}, {@code String},
 * {@code byte[]}, {@code List} and {@code Map} structures, where lists and
 * maps may be nested. Any value that is not a list, map or byte array is
 * rendered through its {@code toString()}.
 *
 * <p>E.g.
 * <pre>{@code
 * Map<String, Object> map = new LinkedHashMap<>();
 * map.put("null", null);
 * map.put("boolean", true);
 * map.put("number", 1);
 * map.put("string", "str");
 * map.put("binary", new byte[] { 1, 2, 3 });
 * map.put("list", Lists.newArrayList(1, "2", true));
 * }</pre>
 *
 * <p>Calling {@link #print(Object)} on that map returns:
 * <pre>{@code
 * { null => '', boolean => 'true', number => '1', string => 'str', binary => '010203', list => [ '1', '2', 'true' ] }
 * }</pre>
 *
 * <p>Map keys are written bare when escaping leaves them unchanged; keys that
 * are {@code null}, empty, or contain a character that needs escaping are
 * written as single-quoted strings.
 */
@InterfaceAudience.Private
public final class JRubyFormat {
  /** Escapes the characters that are special inside a single-quoted literal. */
  private static final Escaper ESCAPER = Escapers.builder()
      .addEscape('\\', "\\\\")
      .addEscape('\'', "\\'")
      .addEscape('\n', "\\n")
      .addEscape('\r', "\\r")
      .addEscape('\t', "\\t")
      .addEscape('\f', "\\f")
      .build();

  private JRubyFormat() {
  }

  /**
   * Returns the escaped string form of {@code object}, or the empty string
   * when {@code object} is {@code null}.
   */
  private static String escape(Object object) {
    return object == null ? "" : ESCAPER.escape(object.toString());
  }

  /** Appends {@code object} as an escaped, single-quoted string. */
  private static void appendQuoted(StringBuilder builder, Object object) {
    builder.append('\'').append(escape(object)).append('\'');
  }

  /**
   * Appends a map key. A key is written bare only when it is non-empty and
   * escaping does not change it; otherwise it is quoted. A {@code null} key is
   * rendered as {@code ''}, matching how {@code null} values are rendered,
   * instead of failing with a {@code NullPointerException}.
   */
  private static void appendKey(StringBuilder builder, Object key) {
    String raw = key == null ? "" : key.toString();
    String escaped = ESCAPER.escape(raw);

    if (!raw.isEmpty() && raw.equals(escaped)) {
      builder.append(raw);
    } else {
      builder.append('\'').append(escaped).append('\'');
    }
  }

  /** Appends a list as {@code [ a, b ]}, or {@code []} when it is empty. */
  private static void appendList(StringBuilder builder, List<?> list) {
    builder.append('[');

    String separator = " ";
    for (Object element : list) {
      builder.append(separator);
      appendJRuby(builder, element);
      separator = ", ";
    }

    if (!list.isEmpty()) {
      builder.append(' ');
    }

    builder.append(']');
  }

  /** Appends a map as {@code { k => v, ... }}, or {@code {}} when it is empty. */
  private static void appendMap(StringBuilder builder, Map<?, ?> map) {
    builder.append('{');

    String separator = " ";
    for (Entry<?, ?> entry : map.entrySet()) {
      builder.append(separator);
      appendKey(builder, entry.getKey());
      builder.append(" => ");
      appendJRuby(builder, entry.getValue());
      separator = ", ";
    }

    if (!map.isEmpty()) {
      builder.append(' ');
    }

    builder.append('}');
  }

  /** Appends the JRuby form of {@code object}, recursing into lists and maps. */
  private static void appendJRuby(StringBuilder builder, Object object) {
    if (object == null) {
      builder.append("''");
    } else if (object instanceof List) {
      appendList(builder, (List<?>) object);
    } else if (object instanceof Map) {
      appendMap(builder, (Map<?, ?>) object);
    } else if (object instanceof byte[]) {
      // Binary data is rendered as the string produced by Bytes.toHex.
      appendQuoted(builder, Bytes.toHex((byte[]) object));
    } else {
      appendQuoted(builder, object);
    }
  }

  /**
   * Renders {@code object} in JRuby literal form.
   *
   * @param object the value to render; may be {@code null}
   * @return the JRuby representation; {@code ''} for {@code null}
   */
  public static String print(Object object) {
    StringBuilder builder = new StringBuilder();

    appendJRuby(builder, object);

    return builder.toString();
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertEquals; + +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestJRubyFormat { + @Test + public void testPrint() { + Map<String, Object> map = new LinkedHashMap<>(); + map.put("null", null); + map.put("boolean", true); + map.put("number", 1); + map.put("string", "str"); + map.put("binary", new byte[] { 1, 2, 3 }); + map.put("list", Lists.newArrayList(1, "2", true)); + + String jrubyString = JRubyFormat.print(map); + assertEquals("{ null => '', boolean => 'true', number => '1', " + + "string => 'str', binary => '010203', " + + "list => [ '1', '2', 'true' ] }", jrubyString); + } + + @Test + public void testEscape() { + String jrubyString = JRubyFormat.print("\\\'\n\r\t\f"); + assertEquals("'\\\\\\'\\n\\r\\t\\f'", jrubyString); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockType.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockType.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockType.java new file mode 100644 index 0000000..e4d867d --- /dev/null +++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private +public enum LockType { + EXCLUSIVE, SHARED +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResource.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResource.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResource.java new file mode 100644 index 0000000..e3320ab --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResource.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.procedure2.LockedResourceType; + +@InterfaceAudience.Private +public class LockedResource { + private final LockedResourceType resourceType; + private final String resourceName; + private final LockType lockType; + private final Procedure<?> exclusiveLockOwnerProcedure; + private final int sharedLockCount; + private final List<Procedure<?>> waitingProcedures; + + public LockedResource(LockedResourceType resourceType, String resourceName, + LockType lockType, Procedure<?> exclusiveLockOwnerProcedure, + int sharedLockCount, List<Procedure<?>> waitingProcedures) { + this.resourceType = resourceType; + this.resourceName = resourceName; + this.lockType = lockType; + this.exclusiveLockOwnerProcedure = exclusiveLockOwnerProcedure; + this.sharedLockCount = sharedLockCount; + this.waitingProcedures = waitingProcedures; + } + + public LockedResourceType getResourceType() { + return resourceType; + } + + public String getResourceName() { + return resourceName; + } + + public LockType getLockType() { + return lockType; + } + + public Procedure<?> getExclusiveLockOwnerProcedure() { + return exclusiveLockOwnerProcedure; + } + + public int getSharedLockCount() { + return sharedLockCount; + } + + public List<Procedure<?>> 
getWaitingProcedures() { + return waitingProcedures; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java new file mode 100644 index 0000000..29820f1 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.procedure2; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private +public enum LockedResourceType { + SERVER, NAMESPACE, TABLE, REGION +} http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 335e83c..db488c9 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.procedure2; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -165,17 +163,17 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * The user-level code of the procedure may have some state to * persist (e.g. input arguments or current position in the processing state) to * be able to resume on failure. - * @param stream the stream that will contain the user serialized data + * @param serializer stores the serializable state */ - protected abstract void serializeStateData(final OutputStream stream) + protected abstract void serializeStateData(final ProcedureStateSerializer serializer) throws IOException; /** * Called on store load to allow the user to decode the previously serialized * state. 
- * @param stream the stream that contains the user serialized data + * @param serializer contains the serialized state */ - protected abstract void deserializeStateData(final InputStream stream) + protected abstract void deserializeStateData(final ProcedureStateSerializer serializer) throws IOException; /** @@ -184,7 +182,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * Framework will call this method just before it invokes {@link #execute(Object)}. * It calls {@link #releaseLock(Object)} after the call to execute. * - * <p>If you need to hold the lock for the life of the Procdure -- i.e. you do not + * <p>If you need to hold the lock for the life of the Procedure -- i.e. you do not * want any other Procedure interfering while this Procedure is running, see * {@link #holdLock(Object)}. * http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index d0052f6..9337530 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -22,8 +22,6 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -241,7 +239,7 @@ public class ProcedureExecutor<TEnvironment> { } /** - * Map the the procId returned by submitProcedure(), the Root-ProcID, to the ProcedureInfo. 
+ * Map the procId returned by submitProcedure(), the Root-ProcID, to the Procedure. * Once a Root-Procedure completes (success or failure), the result will be added to this map. * The user of ProcedureExecutor should call getResult(procId) to get the result. */ @@ -750,14 +748,22 @@ public class ProcedureExecutor<TEnvironment> { } } - private static class FailedProcedure<TEnvironment> extends Procedure<TEnvironment> { + public static class FailedProcedure<TEnvironment> extends Procedure<TEnvironment> { private String procName; - public FailedProcedure(NonceKey nonceKey, String procName, User owner, - IOException exception) { + public FailedProcedure() { + } + + public FailedProcedure(long procId, String procName, User owner, + NonceKey nonceKey, IOException exception) { this.procName = procName; - setNonceKey(nonceKey); + setProcId(procId); + setState(ProcedureState.ROLLEDBACK); setOwner(owner); + setNonceKey(nonceKey); + long currentTime = EnvironmentEdgeManager.currentTime(); + setSubmittedTime(currentTime); + setLastUpdate(currentTime); setFailure(Objects.toString(exception.getMessage(), ""), exception); } @@ -785,11 +791,13 @@ public class ProcedureExecutor<TEnvironment> { } @Override - protected void serializeStateData(OutputStream stream) throws IOException { + protected void serializeStateData(ProcedureStateSerializer serializer) + throws IOException { } @Override - protected void deserializeStateData(InputStream stream) throws IOException { + protected void deserializeStateData(ProcedureStateSerializer serializer) + throws IOException { } } @@ -809,7 +817,9 @@ public class ProcedureExecutor<TEnvironment> { final Long procId = nonceKeysToProcIdsMap.get(nonceKey); if (procId == null || completed.containsKey(procId)) return; - Procedure proc = new FailedProcedure(nonceKey, procName, procOwner, exception); + Procedure<?> proc = new FailedProcedure(procId.longValue(), + procName, procOwner, nonceKey, exception); + completed.putIfAbsent(procId, new
CompletedProcedureRetainer(proc)); } @@ -1045,15 +1055,17 @@ public class ProcedureExecutor<TEnvironment> { } /** - * List procedures. + * Get procedures. * @return the procedures in a list */ - public List<Procedure> listProcedures() { - final List<Procedure> procedureLists = new ArrayList<>(procedures.size() + completed.size()); - procedureLists.addAll(procedures.values()); + public List<Procedure<?>> getProcedures() { + final List<Procedure<?>> procedureLists = new ArrayList<>(procedures.size() + completed.size()); + for (Procedure<?> procedure : procedures.values()) { + procedureLists.add(procedure); + } // Note: The procedure could show up twice in the list with different state, as // it could complete after we walk through procedures list and insert into - // procedureList - it is ok, as we will use the information in the ProcedureInfo + // procedureList - it is ok, as we will use the information in the Procedure // to figure it out; to prevent this would increase the complexity of the logic. 
for (CompletedProcedureRetainer retainer: completed.values()) { procedureLists.add(retainer.getProcedure()); http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java index b148dae..596ff21 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java @@ -18,9 +18,7 @@ package org.apache.hadoop.hbase.procedure2; -import java.io.InputStream; -import java.io.OutputStream; - +import java.io.IOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -58,12 +56,12 @@ public abstract class ProcedureInMemoryChore<TEnvironment> extends Procedure<TEn } @Override - public void serializeStateData(final OutputStream stream) { - throw new UnsupportedOperationException(); + protected void serializeStateData(ProcedureStateSerializer serializer) + throws IOException { } @Override - public void deserializeStateData(final InputStream stream) { - throw new UnsupportedOperationException(); + protected void deserializeStateData(ProcedureStateSerializer serializer) + throws IOException { } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java index a5a126d..1e4240a 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java @@ -125,13 +125,13 @@ public interface ProcedureScheduler { * List lock queues. * @return the locks */ - // TODO: This seems to be the wrong place to hang this method. - List<LockInfo> listLocks(); + List<LockedResource> getLocks(); /** - * @return {@link LockInfo} for resource of specified type & name. null if resource is not locked. + * @return {@link LockedResource} for resource of specified type & name. null if resource is not locked. */ - LockInfo getLockInfoForResource(LockInfo.ResourceType resourceType, String resourceName); + LockedResource getLockResource(LockedResourceType resourceType, String resourceName); + /** * Returns the number of elements in this queue. * @return the number of elements in this queue. http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureStateSerializer.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureStateSerializer.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureStateSerializer.java new file mode 100644 index 0000000..03842d9 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureStateSerializer.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; + +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; + +public interface ProcedureStateSerializer { + void serialize(Message message) throws IOException; + + <M extends Message> M deserialize(Class<M> clazz) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java index 3232f2b..2381abd 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java @@ -18,21 +18,22 @@ package org.apache.hadoop.hbase.procedure2; import java.io.IOException; +import java.io.InputStream; import java.lang.reflect.Constructor; import java.lang.reflect.Modifier; - import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ProcedureInfo; -import org.apache.hadoop.hbase.ProcedureState; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; 
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Any; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Parser; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; +import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; -import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.NonceKey; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; - /** * Helper to convert to/from ProcedureProtos */ @@ -85,6 +86,69 @@ public final class ProcedureUtil { // ========================================================================== /** + * A serializer for our Procedures. Instead of the previous serializer, it + * uses the stateMessage list to store the internal state of the Procedures. 
+ */ + private static class StateSerializer implements ProcedureStateSerializer { + private final ProcedureProtos.Procedure.Builder builder; + private int deserializeIndex; + + public StateSerializer(ProcedureProtos.Procedure.Builder builder) { + this.builder = builder; + } + + @Override + public void serialize(Message message) throws IOException { + Any packedMessage = Any.pack(message); + builder.addStateMessage(packedMessage); + } + + @Override + public <M extends Message> M deserialize(Class<M> clazz) + throws IOException { + if (deserializeIndex >= builder.getStateMessageCount()) { + throw new IOException("Invalid state message index: " + deserializeIndex); + } + + try { + Any packedMessage = builder.getStateMessage(deserializeIndex++); + return packedMessage.unpack(clazz); + } catch (InvalidProtocolBufferException e) { + throw e.unwrapIOException(); + } + } + } + + /** + * A serializer (deserializer) for those Procedures which were serialized + * before this patch. It deserializes the old, binary stateData field. + */ + private static class CompatStateSerializer implements ProcedureStateSerializer { + private InputStream inputStream; + + public CompatStateSerializer(InputStream inputStream) { + this.inputStream = inputStream; + } + + @Override + public void serialize(Message message) throws IOException { + throw new UnsupportedOperationException(); + } + + @SuppressWarnings("unchecked") + @Override + public <M extends Message> M deserialize(Class<M> clazz) + throws IOException { + Parser<M> parser = (Parser<M>) Internal.getDefaultInstance(clazz).getParserForType(); + try { + return parser.parseDelimitedFrom(inputStream); + } catch (InvalidProtocolBufferException e) { + throw e.unwrapIOException(); + } + } + } + + /** * Helper to convert the procedure to protobuf. * Used by ProcedureStore implementations. 
*/ @@ -130,15 +194,8 @@ public final class ProcedureUtil { builder.setResult(UnsafeByteOperations.unsafeWrap(result)); } - final ByteString.Output stateStream = ByteString.newOutput(); - try { - proc.serializeStateData(stateStream); - if (stateStream.size() > 0) { - builder.setStateData(stateStream.toByteString()); - } - } finally { - stateStream.close(); - } + ProcedureStateSerializer serializer = new StateSerializer(builder); + proc.serializeStateData(serializer); if (proc.getNonceKey() != null) { builder.setNonceGroup(proc.getNonceKey().getNonceGroup()); @@ -198,87 +255,62 @@ public final class ProcedureUtil { proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce())); } - // we want to call deserialize even when the stream is empty, mainly for testing. - proc.deserializeStateData(proto.getStateData().newInput()); + ProcedureStateSerializer serializer = null; + + if (proto.getStateMessageCount() > 0) { + serializer = new StateSerializer(proto.toBuilder()); + } else if (proto.hasStateData()) { + InputStream inputStream = proto.getStateData().newInput(); + serializer = new CompatStateSerializer(inputStream); + } + + if (serializer != null) { + proc.deserializeStateData(serializer); + } return proc; } // ========================================================================== - // convert to and from ProcedureInfo object + // convert from LockedResource object // ========================================================================== - /** - * @return Convert the current {@link ProcedureInfo} into a Protocol Buffers Procedure - * instance. 
- */ - public static ProcedureProtos.Procedure convertToProtoProcedure(final ProcedureInfo procInfo) { - final ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder(); + public static LockServiceProtos.LockedResourceType convertToProtoResourceType( + LockedResourceType resourceType) { + return LockServiceProtos.LockedResourceType.valueOf(resourceType.name()); + } - builder.setClassName(procInfo.getProcName()); - builder.setProcId(procInfo.getProcId()); - builder.setSubmittedTime(procInfo.getSubmittedTime()); - builder.setState(ProcedureProtos.ProcedureState.valueOf(procInfo.getProcState().name())); - builder.setLastUpdate(procInfo.getLastUpdate()); + public static LockServiceProtos.LockType convertToProtoLockType(LockType lockType) { + return LockServiceProtos.LockType.valueOf(lockType.name()); + } - if (procInfo.hasParentId()) { - builder.setParentId(procInfo.getParentId()); - } + public static LockServiceProtos.LockedResource convertToProtoLockedResource( + LockedResource lockedResource) throws IOException + { + LockServiceProtos.LockedResource.Builder builder = + LockServiceProtos.LockedResource.newBuilder(); - if (procInfo.hasOwner()) { - builder.setOwner(procInfo.getProcOwner()); - } + builder + .setResourceType(convertToProtoResourceType(lockedResource.getResourceType())) + .setResourceName(lockedResource.getResourceName()) + .setLockType(convertToProtoLockType(lockedResource.getLockType())); - if (procInfo.isFailed()) { - builder.setException(ForeignExceptionUtil.toProtoForeignException(procInfo.getException())); - } + Procedure<?> exclusiveLockOwnerProcedure = lockedResource.getExclusiveLockOwnerProcedure(); - if (procInfo.hasResultData()) { - builder.setResult(UnsafeByteOperations.unsafeWrap(procInfo.getResult())); + if (exclusiveLockOwnerProcedure != null) { + ProcedureProtos.Procedure exclusiveLockOwnerProcedureProto = + convertToProtoProcedure(exclusiveLockOwnerProcedure); + 
builder.setExclusiveLockOwnerProcedure(exclusiveLockOwnerProcedureProto); } - return builder.build(); - } + builder.setSharedLockCount(lockedResource.getSharedLockCount()); - /** - * Helper to convert the protobuf object. - * @return Convert the current Protocol Buffers Procedure to {@link ProcedureInfo} - * instance. - */ - public static ProcedureInfo convertToProcedureInfo(final ProcedureProtos.Procedure procProto) { - NonceKey nonceKey = null; - if (procProto.getNonce() != HConstants.NO_NONCE) { - nonceKey = new NonceKey(procProto.getNonceGroup(), procProto.getNonce()); + for (Procedure<?> waitingProcedure : lockedResource.getWaitingProcedures()) { + ProcedureProtos.Procedure waitingProcedureProto = + convertToProtoProcedure(waitingProcedure); + builder.addWaitingProcedures(waitingProcedureProto); } - return new ProcedureInfo(procProto.getProcId(), procProto.getClassName(), - procProto.hasOwner() ? procProto.getOwner() : null, - convertToProcedureState(procProto.getState()), - procProto.hasParentId() ? procProto.getParentId() : -1, nonceKey, - procProto.hasException() ? - ForeignExceptionUtil.toIOException(procProto.getException()) : null, - procProto.getLastUpdate(), procProto.getSubmittedTime(), - procProto.hasResult() ? procProto.getResult().toByteArray() : null); - } - - public static ProcedureState convertToProcedureState(ProcedureProtos.ProcedureState state) { - return ProcedureState.valueOf(state.name()); - } - - public static ProcedureInfo convertToProcedureInfo(final Procedure proc) { - return convertToProcedureInfo(proc, null); - } - - /** - * Helper to create the ProcedureInfo from Procedure. - */ - public static ProcedureInfo convertToProcedureInfo(final Procedure proc, - final NonceKey nonceKey) { - final RemoteProcedureException exception = proc.hasException() ? proc.getException() : null; - return new ProcedureInfo(proc.getProcId(), proc.toStringClass(), proc.getOwner(), - convertToProcedureState(proc.getState()), - proc.hasParent() ? 
proc.getParentProcId() : -1, nonceKey, - exception != null ? exception.unwrapRemoteIOException() : null, - proc.getLastUpdate(), proc.getSubmittedTime(), proc.getResult()); + return builder.build(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java index 64bb278..f03653f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java @@ -19,9 +19,6 @@ package org.apache.hadoop.hbase.procedure2; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.SequentialProcedureData; @@ -69,15 +66,17 @@ public abstract class SequentialProcedure<TEnvironment> extends Procedure<TEnvir } @Override - protected void serializeStateData(final OutputStream stream) throws IOException { + protected void serializeStateData(ProcedureStateSerializer serializer) + throws IOException { SequentialProcedureData.Builder data = SequentialProcedureData.newBuilder(); data.setExecuted(executed); - data.build().writeDelimitedTo(stream); + serializer.serialize(data.build()); } @Override - protected void deserializeStateData(final InputStream stream) throws IOException { - SequentialProcedureData data = SequentialProcedureData.parseDelimitedFrom(stream); + protected void deserializeStateData(ProcedureStateSerializer serializer) + throws 
IOException { + SequentialProcedureData data = serializer.deserialize(SequentialProcedureData.class); executed = data.getExecuted(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java index 69c59c8..3c2445c 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java @@ -78,12 +78,13 @@ public class SimpleProcedureScheduler extends AbstractProcedureScheduler { } @Override - public List<LockInfo> listLocks() { + public List<LockedResource> getLocks() { return Collections.emptyList(); } @Override - public LockInfo getLockInfoForResource(LockInfo.ResourceType resourceType, String resourceName) { + public LockedResource getLockResource(LockedResourceType resourceType, + String resourceName) { return null; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java index 5de5066..25dfe8b 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.procedure2; import 
java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -285,17 +283,19 @@ public abstract class StateMachineProcedure<TEnvironment, TState> } @Override - protected void serializeStateData(final OutputStream stream) throws IOException { + protected void serializeStateData(ProcedureStateSerializer serializer) + throws IOException { StateMachineProcedureData.Builder data = StateMachineProcedureData.newBuilder(); for (int i = 0; i < stateCount; ++i) { data.addState(states[i]); } - data.build().writeDelimitedTo(stream); + serializer.serialize(data.build()); } @Override - protected void deserializeStateData(final InputStream stream) throws IOException { - StateMachineProcedureData data = StateMachineProcedureData.parseDelimitedFrom(stream); + protected void deserializeStateData(ProcedureStateSerializer serializer) + throws IOException { + StateMachineProcedureData data = serializer.deserialize(StateMachineProcedureData.class); stateCount = data.getStateCount(); if (stateCount > 0) { states = new int[stateCount]; http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index 5cdbc35..99d3c28 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -23,8 +23,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import 
java.util.ArrayList; import java.util.Set; import java.util.concurrent.Callable; @@ -37,11 +35,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; -import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.BytesValue; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.hbase.util.Threads; @@ -367,11 +366,13 @@ public class ProcedureTestingUtility { protected boolean abort(TEnv env) { return false; } @Override - protected void serializeStateData(final OutputStream stream) throws IOException { + protected void serializeStateData(ProcedureStateSerializer serializer) + throws IOException { } @Override - protected void deserializeStateData(final InputStream stream) throws IOException { + protected void deserializeStateData(ProcedureStateSerializer serializer) + throws IOException { } } @@ -416,19 +417,23 @@ public class ProcedureTestingUtility { } @Override - protected void serializeStateData(final OutputStream stream) throws IOException { - StreamUtils.writeRawVInt32(stream, data != null ? data.length : 0); - if (data != null) stream.write(data); + protected void serializeStateData(ProcedureStateSerializer serializer) + throws IOException { + ByteString dataString = ByteString.copyFrom((data == null) ? 
new byte[0] : data); + BytesValue.Builder builder = BytesValue.newBuilder().setValue(dataString); + serializer.serialize(builder.build()); } @Override - protected void deserializeStateData(final InputStream stream) throws IOException { - int len = StreamUtils.readRawVarint32(stream); - if (len > 0) { - data = new byte[len]; - stream.read(data); - } else { + protected void deserializeStateData(ProcedureStateSerializer serializer) + throws IOException { + BytesValue bytesValue = serializer.deserialize(BytesValue.class); + ByteString dataString = bytesValue.getValue(); + + if (dataString.isEmpty()) { data = null; + } else { + data = dataString.toByteArray(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java index b81e0f9..ce9795f 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.procedure2; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -28,10 +26,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseCommonTestingUtility; -import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; import 
org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Int32Value; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -42,8 +40,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; @Category({MasterTests.class, SmallTests.class}) public class TestProcedureEvents { @@ -163,15 +159,23 @@ public class TestProcedureEvents { } @Override - protected void serializeStateData(final OutputStream stream) throws IOException { - StreamUtils.writeRawVInt32(stream, ntimeouts.get()); - StreamUtils.writeRawVInt32(stream, maxTimeouts); + protected void serializeStateData(ProcedureStateSerializer serializer) + throws IOException { + Int32Value.Builder ntimeoutsBuilder = Int32Value.newBuilder().setValue(ntimeouts.get()); + serializer.serialize(ntimeoutsBuilder.build()); + + Int32Value.Builder maxTimeoutsBuilder = Int32Value.newBuilder().setValue(maxTimeouts); + serializer.serialize(maxTimeoutsBuilder.build()); } @Override - protected void deserializeStateData(final InputStream stream) throws IOException { - ntimeouts.set(StreamUtils.readRawVarint32(stream)); - maxTimeouts = StreamUtils.readRawVarint32(stream); + protected void deserializeStateData(ProcedureStateSerializer serializer) + throws IOException { + Int32Value ntimeoutsValue = serializer.deserialize(Int32Value.class); + ntimeouts.set(ntimeoutsValue.getValue()); + + Int32Value maxTimeoutsValue = serializer.deserialize(Int32Value.class); + maxTimeouts = maxTimeoutsValue.getValue(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java 
---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java index 9681bfb..f1dadb9 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.procedure2; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.CountDownLatch; @@ -31,6 +29,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Int32Value; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.util.Bytes; @@ -382,17 +381,19 @@ public class TestProcedureRecovery { } @Override - protected void serializeStateData(final OutputStream stream) throws IOException { - super.serializeStateData(stream); - stream.write(Bytes.toBytes(iResult)); + protected void serializeStateData(ProcedureStateSerializer serializer) + throws IOException { + super.serializeStateData(serializer); + Int32Value.Builder builder = Int32Value.newBuilder().setValue(iResult); + serializer.serialize(builder.build()); } @Override - protected void deserializeStateData(final InputStream stream) throws IOException { - super.deserializeStateData(stream); - byte[] data = new byte[4]; - stream.read(data); - iResult = Bytes.toInt(data); + protected void deserializeStateData(ProcedureStateSerializer serializer) + throws IOException { + 
super.deserializeStateData(serializer); + Int32Value value = serializer.deserialize(Int32Value.class); + iResult = value.getValue(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java index bd614e3..80264f5 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.procedure2; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.ArrayList; import java.util.concurrent.atomic.AtomicLong; @@ -29,9 +27,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseCommonTestingUtility; -import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Int64Value; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -195,13 +193,17 @@ public class TestProcedureReplayOrder { protected boolean abort(TestProcedureEnv env) { return true; } @Override - protected void serializeStateData(final OutputStream stream) throws IOException { - StreamUtils.writeLong(stream, execId); + protected void serializeStateData(ProcedureStateSerializer serializer) + throws IOException { + Int64Value.Builder builder = 
Int64Value.newBuilder().setValue(execId); + serializer.serialize(builder.build()); } @Override - protected void deserializeStateData(final InputStream stream) throws IOException { - execId = StreamUtils.readLong(stream); + protected void deserializeStateData(ProcedureStateSerializer serializer) + throws IOException { + Int64Value value = serializer.deserialize(Int64Value.class); + execId = value.getValue(); step = 2; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java index 0146bc7..f86df2d 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java @@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.procedure2; import static org.junit.Assert.assertEquals; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.ArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -251,11 +249,13 @@ public class TestProcedureSuspended { protected boolean abort(TestProcEnv env) { return false; } @Override - protected void serializeStateData(final OutputStream stream) throws IOException { + protected void serializeStateData(ProcedureStateSerializer serializer) + throws IOException { } @Override - protected void deserializeStateData(final InputStream stream) throws IOException { + protected void deserializeStateData(ProcedureStateSerializer serializer) + throws IOException { } } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java index 78daf5a..af25108 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java @@ -20,9 +20,6 @@ package org.apache.hadoop.hbase.procedure2; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -57,11 +54,13 @@ public class TestProcedureToString { } @Override - protected void serializeStateData(OutputStream stream) throws IOException { + protected void serializeStateData(ProcedureStateSerializer serializer) + throws IOException { } @Override - protected void deserializeStateData(InputStream stream) throws IOException { + protected void deserializeStateData(ProcedureStateSerializer serializer) + throws IOException { } } http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java index 7f98b80..dec5854 100644 --- 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java @@ -18,11 +18,7 @@ package org.apache.hadoop.hbase.procedure2; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.ProcedureInfo; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.util.JsonFormat; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -34,8 +30,6 @@ import static org.junit.Assert.assertEquals; @Category({MasterTests.class, SmallTests.class}) public class TestProcedureUtil { - private static final Log LOG = LogFactory.getLog(TestProcedureUtil.class); - @Test public void testValidation() throws Exception { ProcedureUtil.validateClass(new TestProcedure(10)); @@ -49,34 +43,15 @@ public class TestProcedureUtil { @Test public void testConvert() throws Exception { // check Procedure to protobuf conversion - final TestProcedure proc1 = new TestProcedure(10); + final TestProcedure proc1 = new TestProcedure(10, 1, new byte[] { 65 }); final ProcedureProtos.Procedure proto1 = ProcedureUtil.convertToProtoProcedure(proc1); final TestProcedure proc2 = (TestProcedure)ProcedureUtil.convertToProcedure(proto1); final ProcedureProtos.Procedure proto2 = ProcedureUtil.convertToProtoProcedure(proc2); assertEquals(false, proto2.hasResult()); assertEquals("Procedure protobuf does not match", proto1, proto2); - - // remove the state-data from the procedure protobuf to compare it to the gen ProcedureInfo - final ProcedureProtos.Procedure pbproc = proto2.toBuilder().clearStateData().build(); - - // check ProcedureInfo to protobuf conversion - final ProcedureInfo protoInfo1 = 
ProcedureUtil.convertToProcedureInfo(proc1); - final ProcedureProtos.Procedure proto3 = ProcedureUtil.convertToProtoProcedure(protoInfo1); - final ProcedureInfo protoInfo2 = ProcedureUtil.convertToProcedureInfo(proto3); - final ProcedureProtos.Procedure proto4 = ProcedureUtil.convertToProtoProcedure(protoInfo2); - assertEquals("ProcedureInfo protobuf does not match", proto3, proto4); - assertEquals("ProcedureInfo/Procedure protobuf does not match", pbproc, proto3); - assertEquals("ProcedureInfo/Procedure protobuf does not match", pbproc, proto4); } public static class TestProcedureNoDefaultConstructor extends TestProcedure { public TestProcedureNoDefaultConstructor(int x) {} } - - public static void main(final String [] args) throws Exception { - final TestProcedure proc1 = new TestProcedure(10); - final ProcedureProtos.Procedure proto1 = ProcedureUtil.convertToProtoProcedure(proc1); - JsonFormat.Printer printer = JsonFormat.printer().omittingInsignificantWhitespace(); - System.out.println(printer.print(proto1)); - } }