http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java new file mode 100644 index 0000000..1849d90 --- /dev/null +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.security.access; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.coprocessor.CoprocessorService; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; +import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesRequest; +import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesResponse; +import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService; +import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; + +/** + * Coprocessor service for bulk loads in secure mode. 
+ * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0 + */ [email protected] +@Deprecated +public class SecureBulkLoadEndpoint extends SecureBulkLoadService + implements CoprocessorService, Coprocessor { + + public static final long VERSION = 0L; + + private static final Log LOG = LogFactory.getLog(SecureBulkLoadEndpoint.class); + + private RegionCoprocessorEnvironment env; + + @Override + public void start(CoprocessorEnvironment env) { + this.env = (RegionCoprocessorEnvironment)env; + LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases."); + LOG.warn("Secure bulk load has been integrated into HBase core."); + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException { + } + + @Override + public void prepareBulkLoad(RpcController controller, PrepareBulkLoadRequest request, + RpcCallback<PrepareBulkLoadResponse> done) { + try { + SecureBulkLoadManager secureBulkLoadManager = + this.env.getRegionServerServices().getSecureBulkLoadManager(); + String bulkToken = secureBulkLoadManager.prepareBulkLoad(this.env.getRegion(), + convert(request)); + done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build()); + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + } + done.run(null); + } + + org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest + convert(PrepareBulkLoadRequest request) + throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException { + byte [] bytes = request.toByteArray(); + org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest.Builder + builder = + org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest. 
+ newBuilder(); + builder.mergeFrom(bytes); + return builder.build(); + } + + @Override + public void cleanupBulkLoad(RpcController controller, CleanupBulkLoadRequest request, + RpcCallback<CleanupBulkLoadResponse> done) { + try { + SecureBulkLoadManager secureBulkLoadManager = + this.env.getRegionServerServices().getSecureBulkLoadManager(); + secureBulkLoadManager.cleanupBulkLoad(this.env.getRegion(), convert(request)); + done.run(CleanupBulkLoadResponse.newBuilder().build()); + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + } + done.run(null); + } + + org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest + convert(CleanupBulkLoadRequest request) + throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException { + byte [] bytes = request.toByteArray(); + org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest.Builder + builder = + org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest. 
+ newBuilder(); + builder.mergeFrom(bytes); + return builder.build(); + } + + @Override + public void secureBulkLoadHFiles(RpcController controller, SecureBulkLoadHFilesRequest request, + RpcCallback<SecureBulkLoadHFilesResponse> done) { + boolean loaded = false; + try { + SecureBulkLoadManager secureBulkLoadManager = + this.env.getRegionServerServices().getSecureBulkLoadManager(); + BulkLoadHFileRequest bulkLoadHFileRequest = ConvertSecureBulkLoadHFilesRequest(request); + loaded = secureBulkLoadManager.secureBulkLoadHFiles(this.env.getRegion(), + convert(bulkLoadHFileRequest)); + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + } + done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build()); + } + + org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest + convert(BulkLoadHFileRequest request) + throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException { + byte [] bytes = request.toByteArray(); + org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.Builder + builder = + org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest. + newBuilder(); + builder.mergeFrom(bytes); + return builder.build(); + } + + private BulkLoadHFileRequest ConvertSecureBulkLoadHFilesRequest( + SecureBulkLoadHFilesRequest request) { + BulkLoadHFileRequest.Builder bulkLoadHFileRequest = BulkLoadHFileRequest.newBuilder(); + RegionSpecifier region = + ProtobufUtil.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, this.env + .getRegionInfo().getRegionName()); + bulkLoadHFileRequest.setRegion(region).setFsToken(request.getFsToken()) + .setBulkToken(request.getBulkToken()).setAssignSeqNum(request.getAssignSeqNum()) + .addAllFamilyPath(request.getFamilyPathList()); + return bulkLoadHFileRequest.build(); + } + + @Override + public Service getService() { + return this; + } +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/Aggregate.proto ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/main/protobuf/Aggregate.proto b/hbase-endpoint/src/main/protobuf/Aggregate.proto new file mode 100644 index 0000000..4d32e70 --- /dev/null +++ b/hbase-endpoint/src/main/protobuf/Aggregate.proto @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "AggregateProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +import "Client.proto"; + +message AggregateRequest { + /** The request passed to the AggregateService consists of three parts + * (1) the (canonical) classname of the ColumnInterpreter implementation + * (2) the Scan query + * (3) any bytes required to construct the ColumnInterpreter object + * properly + */ + required string interpreter_class_name = 1; + required Scan scan = 2; + optional bytes interpreter_specific_bytes = 3; +} + +message AggregateResponse { + /** + * The AggregateService methods all have a response that either is a Pair + * or a simple object. When it is a Pair both first_part and second_part + * have defined values (and the second_part is not present in the response + * when the response is not a pair). Refer to the AggregateImplementation + * class for an overview of the AggregateResponse object constructions. + */ + repeated bytes first_part = 1; + optional bytes second_part = 2; +} + +/** Refer to the AggregateImplementation class for an overview of the + * AggregateService method implementations and their functionality. 
+ */ +service AggregateService { + rpc GetMax (AggregateRequest) returns (AggregateResponse); + rpc GetMin (AggregateRequest) returns (AggregateResponse); + rpc GetSum (AggregateRequest) returns (AggregateResponse); + rpc GetRowNum (AggregateRequest) returns (AggregateResponse); + rpc GetAvg (AggregateRequest) returns (AggregateResponse); + rpc GetStd (AggregateRequest) returns (AggregateResponse); + rpc GetMedian (AggregateRequest) returns (AggregateResponse); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/BulkDelete.proto ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/main/protobuf/BulkDelete.proto b/hbase-endpoint/src/main/protobuf/BulkDelete.proto new file mode 100644 index 0000000..c2ec8ca --- /dev/null +++ b/hbase-endpoint/src/main/protobuf/BulkDelete.proto @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.coprocessor.example.generated"; +option java_outer_classname = "BulkDeleteProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +import "Client.proto"; + +message BulkDeleteRequest { + required Scan scan = 1; + required DeleteType deleteType = 2; + optional uint64 timestamp = 3; + required uint32 rowBatchSize = 4; + + enum DeleteType { + ROW = 0; + FAMILY = 1; + COLUMN = 2; + VERSION = 3; + } +} + +message BulkDeleteResponse { + required uint64 rowsDeleted = 1; + optional uint64 versionsDeleted = 2; +} + +service BulkDeleteService { + rpc delete(BulkDeleteRequest) + returns (BulkDeleteResponse); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/ColumnAggregationNullResponseProtocol.proto ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/main/protobuf/ColumnAggregationNullResponseProtocol.proto b/hbase-endpoint/src/main/protobuf/ColumnAggregationNullResponseProtocol.proto new file mode 100644 index 0000000..b4dc01e --- /dev/null +++ b/hbase-endpoint/src/main/protobuf/ColumnAggregationNullResponseProtocol.proto @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// Coprocessor test +option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated"; +option java_outer_classname = "ColumnAggregationWithNullResponseProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; + +// use unique names for messages in ColumnAggregationXXX.protos due to a bug in +// protoc or hadoop's protoc compiler. +message ColumnAggregationNullResponseSumRequest { + required bytes family = 1; + optional bytes qualifier = 2; +} + +message ColumnAggregationNullResponseSumResponse { + optional int64 sum = 1; +} + +service ColumnAggregationServiceNullResponse { + rpc sum(ColumnAggregationNullResponseSumRequest) + returns(ColumnAggregationNullResponseSumResponse); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/ColumnAggregationProtocol.proto ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/main/protobuf/ColumnAggregationProtocol.proto b/hbase-endpoint/src/main/protobuf/ColumnAggregationProtocol.proto new file mode 100644 index 0000000..ad1acda --- /dev/null +++ b/hbase-endpoint/src/main/protobuf/ColumnAggregationProtocol.proto @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// Coprocessor test +option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated"; +option java_outer_classname = "ColumnAggregationProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; + +message SumRequest { + required bytes family = 1; + optional bytes qualifier = 2; +} + +message SumResponse { + required int64 sum = 1; +} + +service ColumnAggregationService { + rpc sum(SumRequest) returns(SumResponse); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/ColumnAggregationWithErrorsProtocol.proto ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/main/protobuf/ColumnAggregationWithErrorsProtocol.proto b/hbase-endpoint/src/main/protobuf/ColumnAggregationWithErrorsProtocol.proto new file mode 100644 index 0000000..7808949 --- /dev/null +++ b/hbase-endpoint/src/main/protobuf/ColumnAggregationWithErrorsProtocol.proto @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// Coprocessor test +option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated"; +option java_outer_classname = "ColumnAggregationWithErrorsProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; + +// use unique names for messages in ColumnAggregationXXX.protos due to a bug in +// protoc or hadoop's protoc compiler. +message ColumnAggregationWithErrorsSumRequest { + required bytes family = 1; + optional bytes qualifier = 2; +} + +message ColumnAggregationWithErrorsSumResponse { + required int64 sum = 1; +} + +service ColumnAggregationServiceWithErrors { + rpc sum(ColumnAggregationWithErrorsSumRequest) + returns(ColumnAggregationWithErrorsSumResponse); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/DummyRegionServerEndpoint.proto ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/main/protobuf/DummyRegionServerEndpoint.proto b/hbase-endpoint/src/main/protobuf/DummyRegionServerEndpoint.proto new file mode 100644 index 0000000..539f7da --- /dev/null +++ b/hbase-endpoint/src/main/protobuf/DummyRegionServerEndpoint.proto @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hbase.test.pb; + +// Coprocessor test +option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated"; +option java_outer_classname = "DummyRegionServerEndpointProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; + +message DummyRequest { +} + +message DummyResponse { + required string value = 1; +} + +service DummyService { + rpc dummyCall(DummyRequest) returns(DummyResponse); + rpc dummyThrow(DummyRequest) returns(DummyResponse); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/IncrementCounterProcessor.proto ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/main/protobuf/IncrementCounterProcessor.proto b/hbase-endpoint/src/main/protobuf/IncrementCounterProcessor.proto new file mode 100644 index 0000000..b8c77ca --- /dev/null +++ b/hbase-endpoint/src/main/protobuf/IncrementCounterProcessor.proto @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated"; +option java_outer_classname = "IncrementCounterProcessorTestProtos"; +option java_generate_equals_and_hash = true; + +message IncCounterProcessorRequest { + required bytes row = 1; + required int32 counter = 2; +} + +message IncCounterProcessorResponse { + required int32 response = 1; +} + +message FriendsOfFriendsProcessorRequest { + required bytes person = 1; + required bytes row = 2; + repeated string result = 3; +} + +message FriendsOfFriendsProcessorResponse { + repeated string result = 1; +} + +message RowSwapProcessorRequest { + required bytes row1 = 1; + required bytes row2 = 2; +} + +message RowSwapProcessorResponse { +} + +message TimeoutProcessorRequest { + required bytes row = 1; +} + +message TimeoutProcessorResponse { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto b/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto new file mode 100644 index 0000000..d86d162 --- /dev/null +++ b/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "SecureBulkLoadProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +import 'Client.proto'; + +message SecureBulkLoadHFilesRequest { + repeated BulkLoadHFileRequest.FamilyPath family_path = 1; + optional bool assign_seq_num = 2; + required DelegationToken fs_token = 3; + required string bulk_token = 4; +} + +message SecureBulkLoadHFilesResponse { + required bool loaded = 1; +} + +service SecureBulkLoadService { + rpc PrepareBulkLoad(PrepareBulkLoadRequest) + returns (PrepareBulkLoadResponse); + + rpc SecureBulkLoadHFiles(SecureBulkLoadHFilesRequest) + returns (SecureBulkLoadHFilesResponse); + + rpc CleanupBulkLoad(CleanupBulkLoadRequest) + returns (CleanupBulkLoadResponse); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java new file mode 100644 index 0000000..aac020d --- /dev/null +++ 
b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.Lists; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScannable; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService; +import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import 
org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({MediumTests.class, ClientTests.class}) +public class TestRpcControllerFactory { + + public static class StaticRpcControllerFactory extends RpcControllerFactory { + + public StaticRpcControllerFactory(Configuration conf) { + super(conf); + } + + @Override + public HBaseRpcController newController() { + return new CountingRpcController(super.newController()); + } + + @Override + public HBaseRpcController newController(final CellScanner cellScanner) { + return new CountingRpcController(super.newController(cellScanner)); + } + + @Override + public HBaseRpcController newController(final List<CellScannable> cellIterables) { + return new CountingRpcController(super.newController(cellIterables)); + } + } + + public static class CountingRpcController extends DelegatingHBaseRpcController { + + private static AtomicInteger INT_PRIORITY = new AtomicInteger(); + private static AtomicInteger TABLE_PRIORITY = new AtomicInteger(); + + public CountingRpcController(HBaseRpcController delegate) { + super(delegate); + } + + @Override + public void setPriority(int priority) { + super.setPriority(priority); + INT_PRIORITY.incrementAndGet(); + } + + @Override + public void setPriority(TableName tn) { + super.setPriority(tn); + // ignore counts for system tables - it could change and we really only want to check on what + // the client should change + if (!tn.isSystemTable()) { + TABLE_PRIORITY.incrementAndGet(); + } + + } + } + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setup() throws Exception { + // load an endpoint so we have an endpoint to test - it doesn't matter which one, but + // this is already in tests, so we can just use it. 
+ Configuration conf = UTIL.getConfiguration(); + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + ProtobufCoprocessorService.class.getName()); + + UTIL.startMiniCluster(); + } + + @AfterClass + public static void teardown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + /** + * check some of the methods and make sure we are incrementing each time. It's a bit tedious to + * cover all methods here and really is a bit brittle since we can always add new methods but + * won't be sure to add them here. So we just can cover the major ones. + * @throws Exception on failure + */ + @Test + public void testCountController() throws Exception { + Configuration conf = new Configuration(UTIL.getConfiguration()); + // setup our custom controller + conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, + StaticRpcControllerFactory.class.getName()); + + TableName name = TableName.valueOf("testcustomcontroller"); + UTIL.createTable(name, fam1).close(); + + // change one of the connection properties so we get a new Connection with our configuration + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1); + + Connection connection = ConnectionFactory.createConnection(conf); + Table table = connection.getTable(name); + byte[] row = Bytes.toBytes("row"); + Put p = new Put(row); + p.addColumn(fam1, fam1, Bytes.toBytes("val0")); + table.put(p); + + Integer counter = 1; + counter = verifyCount(counter); + + Delete d = new Delete(row); + d.addColumn(fam1, fam1); + table.delete(d); + counter = verifyCount(counter); + + Put p2 = new Put(row); + p2.addColumn(fam1, Bytes.toBytes("qual"), Bytes.toBytes("val1")); + table.batch(Lists.newArrayList(p, p2), null); + // this only goes to a single server, so we don't need to change the count here + counter = verifyCount(counter); + + Append append = new Append(row); + append.add(fam1, fam1, Bytes.toBytes("val2")); + table.append(append); + counter = verifyCount(counter); + + // and check the major 
lookup calls as well + Get g = new Get(row); + table.get(g); + counter = verifyCount(counter); + + ResultScanner scan = table.getScanner(fam1); + scan.next(); + scan.close(); + counter = verifyCount(counter + 2); + + Get g2 = new Get(row); + table.get(Lists.newArrayList(g, g2)); + // same server, so same as above for not changing count + counter = verifyCount(counter); + + // make sure all the scanner types are covered + Scan scanInfo = new Scan(row); + // regular small + scanInfo.setSmall(true); + counter = doScan(table, scanInfo, counter); + + // reversed, small + scanInfo.setReversed(true); + counter = doScan(table, scanInfo, counter); + + // reversed, regular + scanInfo.setSmall(false); + counter = doScan(table, scanInfo, counter + 2); + + table.close(); + connection.close(); + } + + int doScan(Table table, Scan scan, int expectedCount) throws IOException { + ResultScanner results = table.getScanner(scan); + results.next(); + results.close(); + return verifyCount(expectedCount); + } + + int verifyCount(Integer counter) { + assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter.intValue()); + assertEquals(0, CountingRpcController.INT_PRIORITY.get()); + return CountingRpcController.TABLE_PRIORITY.get() + 1; + } + + @Test + public void testFallbackToDefaultRpcControllerFactory() { + Configuration conf = new Configuration(UTIL.getConfiguration()); + conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, "foo.bar.Baz"); + + // Should not fail + RpcControllerFactory factory = RpcControllerFactory.instantiate(conf); + assertNotNull(factory); + assertEquals(factory.getClass(), RpcControllerFactory.class); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java ---------------------------------------------------------------------- diff --git 
a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java new file mode 100644 index 0000000..a9d10e8 --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java @@ -0,0 +1,117 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.ColumnAggregationService; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.util.Bytes; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; + + +/** + * The aggregation implementation at a region. + */ +public class ColumnAggregationEndpoint extends ColumnAggregationService +implements Coprocessor, CoprocessorService { + private static final Log LOG = LogFactory.getLog(ColumnAggregationEndpoint.class); + private RegionCoprocessorEnvironment env = null; + + @Override + public Service getService() { + return this; + } + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + if (env instanceof RegionCoprocessorEnvironment) { + this.env = (RegionCoprocessorEnvironment)env; + return; + } + throw new CoprocessorException("Must be loaded on a table region!"); + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException { + // Nothing to do. 
+ } + + @Override + public void sum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) { + // aggregate at each region + Scan scan = new Scan(); + // Family is required in pb. Qualifier is not. + byte [] family = request.getFamily().toByteArray(); + byte [] qualifier = request.hasQualifier()? request.getQualifier().toByteArray(): null; + if (request.hasQualifier()) { + scan.addColumn(family, qualifier); + } else { + scan.addFamily(family); + } + int sumResult = 0; + InternalScanner scanner = null; + try { + scanner = this.env.getRegion().getScanner(scan); + List<Cell> curVals = new ArrayList<Cell>(); + boolean hasMore = false; + do { + curVals.clear(); + hasMore = scanner.next(curVals); + for (Cell kv : curVals) { + if (CellUtil.matchingQualifier(kv, qualifier)) { + sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset()); + } + } + } while (hasMore); + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + // Set result to -1 to indicate error. 
+ sumResult = -1; + LOG.info("Setting sum result to -1 to indicate error", e); + } finally { + if (scanner != null) { + try { + scanner.close(); + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + sumResult = -1; + LOG.info("Setting sum result to -1 to indicate error", e); + } + } + } + LOG.info("Returning result " + sumResult); + done.run(SumResponse.newBuilder().setSum(sumResult).build()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java new file mode 100644 index 0000000..22dac6d --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumResponse; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.util.Bytes; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; + +/** + * Test coprocessor endpoint that always returns {@code null} for requests to the last region + * in the table. This allows tests to provide assurance of correct {@code null} handling for + * response values. 
+ */ +public class ColumnAggregationEndpointNullResponse + extends + ColumnAggregationServiceNullResponse +implements Coprocessor, CoprocessorService { + private static final Log LOG = LogFactory.getLog(ColumnAggregationEndpointNullResponse.class); + private RegionCoprocessorEnvironment env = null; + @Override + public Service getService() { + return this; + } + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + if (env instanceof RegionCoprocessorEnvironment) { + this.env = (RegionCoprocessorEnvironment)env; + return; + } + throw new CoprocessorException("Must be loaded on a table region!"); + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException { + // Nothing to do. + } + + @Override + public void sum(RpcController controller, ColumnAggregationNullResponseSumRequest request, + RpcCallback<ColumnAggregationNullResponseSumResponse> done) { + // aggregate at each region + Scan scan = new Scan(); + // Family is required in pb. Qualifier is not. + byte[] family = request.getFamily().toByteArray(); + byte[] qualifier = request.hasQualifier() ? 
request.getQualifier().toByteArray() : null; + if (request.hasQualifier()) { + scan.addColumn(family, qualifier); + } else { + scan.addFamily(family); + } + int sumResult = 0; + InternalScanner scanner = null; + try { + Region region = this.env.getRegion(); + // for the last region in the table, return null to test null handling + if (Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)) { + done.run(null); + return; + } + scanner = region.getScanner(scan); + List<Cell> curVals = new ArrayList<Cell>(); + boolean hasMore = false; + do { + curVals.clear(); + hasMore = scanner.next(curVals); + for (Cell kv : curVals) { + if (CellUtil.matchingQualifier(kv, qualifier)) { + sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset()); + } + } + } while (hasMore); + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + // Set result to -1 to indicate error. + sumResult = -1; + LOG.info("Setting sum result to -1 to indicate error", e); + } finally { + if (scanner != null) { + try { + scanner.close(); + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + sumResult = -1; + LOG.info("Setting sum result to -1 to indicate error", e); + } + } + } + done.run(ColumnAggregationNullResponseSumResponse.newBuilder().setSum(sumResult) + .build()); + LOG.info("Returning sum " + sumResult + " for region " + + Bytes.toStringBinary(env.getRegion().getRegionInfo().getRegionName())); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java new file mode 100644 index 0000000..c75fb31 --- 
/dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumResponse; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import 
org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.util.Bytes; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; + +/** + * Test coprocessor endpoint that always throws a {@link DoNotRetryIOException} for requests on + * the last region in the table. This allows tests to ensure correct error handling of + * coprocessor endpoints throwing exceptions. + */ +public class ColumnAggregationEndpointWithErrors + extends + ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors +implements Coprocessor, CoprocessorService { + private static final Log LOG = LogFactory.getLog(ColumnAggregationEndpointWithErrors.class); + private RegionCoprocessorEnvironment env = null; + @Override + public Service getService() { + return this; + } + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + if (env instanceof RegionCoprocessorEnvironment) { + this.env = (RegionCoprocessorEnvironment)env; + return; + } + throw new CoprocessorException("Must be loaded on a table region!"); + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException { + // Nothing to do. + } + + @Override + public void sum(RpcController controller, ColumnAggregationWithErrorsSumRequest request, + RpcCallback<ColumnAggregationWithErrorsSumResponse> done) { + // aggregate at each region + Scan scan = new Scan(); + // Family is required in pb. Qualifier is not. + byte[] family = request.getFamily().toByteArray(); + byte[] qualifier = request.hasQualifier() ? 
request.getQualifier().toByteArray() : null; + if (request.hasQualifier()) { + scan.addColumn(family, qualifier); + } else { + scan.addFamily(family); + } + int sumResult = 0; + InternalScanner scanner = null; + try { + Region region = this.env.getRegion(); + // throw an exception for requests to the last region in the table, to test error handling + if (Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)) { + throw new DoNotRetryIOException("An expected exception"); + } + scanner = region.getScanner(scan); + List<Cell> curVals = new ArrayList<Cell>(); + boolean hasMore = false; + do { + curVals.clear(); + hasMore = scanner.next(curVals); + for (Cell kv : curVals) { + if (CellUtil.matchingQualifier(kv, qualifier)) { + sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset()); + } + } + } while (hasMore); + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + // Set result to -1 to indicate error. + sumResult = -1; + LOG.info("Setting sum result to -1 to indicate error", e); + } finally { + if (scanner != null) { + try { + scanner.close(); + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + sumResult = -1; + LOG.info("Setting sum result to -1 to indicate error", e); + } + } + } + done.run(ColumnAggregationWithErrorsSumResponse.newBuilder().setSum(sumResult).build()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java new file mode 100644 index 0000000..5b7c1e9 --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java @@ 
-0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; +import org.apache.hadoop.hbase.util.Threads; + +import java.io.IOException; + +/** + * Test implementation of a coprocessor endpoint exposing the + * {@link TestRpcServiceProtos.TestProtobufRpcProto} service methods. For internal use by unit tests + * only. 
+ */ +public class ProtobufCoprocessorService extends TestRpcServiceProtos.TestProtobufRpcProto + implements CoprocessorService, Coprocessor { + public ProtobufCoprocessorService() { + } + + @Override + public Service getService() { + return this; + } + + @Override + public void ping(RpcController controller, TestProtos.EmptyRequestProto request, + RpcCallback<TestProtos.EmptyResponseProto> done) { + done.run(TestProtos.EmptyResponseProto.getDefaultInstance()); + } + + @Override + public void echo(RpcController controller, TestProtos.EchoRequestProto request, + RpcCallback<TestProtos.EchoResponseProto> done) { + String message = request.getMessage(); + done.run(TestProtos.EchoResponseProto.newBuilder().setMessage(message).build()); + } + + @Override + public void error(RpcController controller, TestProtos.EmptyRequestProto request, + RpcCallback<TestProtos.EmptyResponseProto> done) { + CoprocessorRpcUtils.setControllerException(controller, new IOException("Test exception")); + done.run(null); + } + + @Override + public void pause(RpcController controller, PauseRequestProto request, + RpcCallback<EmptyResponseProto> done) { + Threads.sleepWithoutInterrupt(request.getMs()); + done.run(EmptyResponseProto.getDefaultInstance()); + } + + @Override + public void addr(RpcController controller, EmptyRequestProto request, + RpcCallback<AddrResponseProto> done) { + done.run(AddrResponseProto.newBuilder().setAddr(RpcServer.getRemoteAddress().getHostAddress()) + .build()); + } + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + // To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException { + // To change body of implemented methods use File | Settings | File Templates. 
+ } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestBatchCoprocessorEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestBatchCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestBatchCoprocessorEndpoint.java new file mode 100644 index 0000000..c023437 --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestBatchCoprocessorEndpoint.java @@ -0,0 +1,283 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumResponse; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumResponse; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import 
org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.protobuf.ByteString; +import com.google.protobuf.ServiceException; + +/** + * TestEndpoint: test cases to verify the batch execution of coprocessor Endpoint + */ +@Category({CoprocessorTests.class, MediumTests.class}) +public class TestBatchCoprocessorEndpoint { + private static final Log LOG = LogFactory.getLog(TestBatchCoprocessorEndpoint.class); + + private static final TableName TEST_TABLE = + TableName.valueOf("TestTable"); + private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily"); + private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier"); + private static byte[] ROW = Bytes.toBytes("testRow"); + + private static final int ROWSIZE = 20; + private static final int rowSeperator1 = 5; + private static final int rowSeperator2 = 12; + private static byte[][] ROWS = makeN(ROW, ROWSIZE); + + private static HBaseTestingUtility util = new HBaseTestingUtility(); + + @BeforeClass + public static void setupBeforeClass() throws Exception { + // set configure to indicate which cp should be loaded + Configuration conf = util.getConfiguration(); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(), + ProtobufCoprocessorService.class.getName(), + ColumnAggregationEndpointWithErrors.class.getName(), + ColumnAggregationEndpointNullResponse.class.getName()); + conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + ProtobufCoprocessorService.class.getName()); + util.startMiniCluster(2); + Admin admin = util.getHBaseAdmin(); + HTableDescriptor desc = new HTableDescriptor(TEST_TABLE); + desc.addFamily(new HColumnDescriptor(TEST_FAMILY)); + admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]}); + util.waitUntilAllRegionsAssigned(TEST_TABLE); + admin.close(); + + Table table = util.getConnection().getTable(TEST_TABLE); + for (int i = 0; i < 
ROWSIZE; i++) { + Put put = new Put(ROWS[i]); + put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i)); + table.put(put); + } + table.close(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + + @Test + public void testAggregationNullResponse() throws Throwable { + Table table = util.getConnection().getTable(TEST_TABLE); + ColumnAggregationNullResponseSumRequest.Builder builder = + ColumnAggregationNullResponseSumRequest + .newBuilder(); + builder.setFamily(ByteString.copyFrom(TEST_FAMILY)); + if (TEST_QUALIFIER != null && TEST_QUALIFIER.length > 0) { + builder.setQualifier(ByteString.copyFrom(TEST_QUALIFIER)); + } + Map<byte[], ColumnAggregationNullResponseSumResponse> results = + table.batchCoprocessorService( + ColumnAggregationServiceNullResponse.getDescriptor().findMethodByName("sum"), + builder.build(), ROWS[0], ROWS[ROWS.length - 1], + ColumnAggregationNullResponseSumResponse.getDefaultInstance()); + + int sumResult = 0; + int expectedResult = 0; + for (Map.Entry<byte[], ColumnAggregationNullResponseSumResponse> e : + results.entrySet()) { + LOG.info("Got value " + e.getValue().getSum() + " for region " + + Bytes.toStringBinary(e.getKey())); + sumResult += e.getValue().getSum(); + } + for (int i = 0; i < rowSeperator2; i++) { + expectedResult += i; + } + assertEquals("Invalid result", expectedResult, sumResult); + table.close(); + } + + private static byte[][] makeN(byte[] base, int n) { + byte[][] ret = new byte[n][]; + for (int i = 0; i < n; i++) { + ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i))); + } + return ret; + } + + private Map<byte[], SumResponse> sum(final Table table, final byte[] family, + final byte[] qualifier, final byte[] start, final byte[] end) throws ServiceException, + Throwable { + ColumnAggregationProtos.SumRequest.Builder builder = ColumnAggregationProtos.SumRequest + .newBuilder(); + builder.setFamily(ByteString.copyFrom(family)); + if 
(qualifier != null && qualifier.length > 0) { + builder.setQualifier(ByteString.copyFrom(qualifier)); + } + return table.batchCoprocessorService( + ColumnAggregationProtos.ColumnAggregationService.getDescriptor().findMethodByName("sum"), + builder.build(), start, end, ColumnAggregationProtos.SumResponse.getDefaultInstance()); + } + + @Test + public void testAggregationWithReturnValue() throws Throwable { + Table table = util.getConnection().getTable(TEST_TABLE); + Map<byte[], SumResponse> results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0], + ROWS[ROWS.length - 1]); + int sumResult = 0; + int expectedResult = 0; + for (Map.Entry<byte[], SumResponse> e : results.entrySet()) { + LOG.info("Got value " + e.getValue().getSum() + " for region " + + Bytes.toStringBinary(e.getKey())); + sumResult += e.getValue().getSum(); + } + for (int i = 0; i < ROWSIZE; i++) { + expectedResult += i; + } + assertEquals("Invalid result", expectedResult, sumResult); + + results.clear(); + + // scan: for region 2 and region 3 + results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1], + ROWS[ROWS.length - 1]); + sumResult = 0; + expectedResult = 0; + for (Map.Entry<byte[], SumResponse> e : results.entrySet()) { + LOG.info("Got value " + e.getValue().getSum() + " for region " + + Bytes.toStringBinary(e.getKey())); + sumResult += e.getValue().getSum(); + } + for (int i = rowSeperator1; i < ROWSIZE; i++) { + expectedResult += i; + } + assertEquals("Invalid result", expectedResult, sumResult); + table.close(); + } + + @Test + public void testAggregation() throws Throwable { + Table table = util.getConnection().getTable(TEST_TABLE); + Map<byte[], SumResponse> results = sum(table, TEST_FAMILY, TEST_QUALIFIER, + ROWS[0], ROWS[ROWS.length - 1]); + int sumResult = 0; + int expectedResult = 0; + for (Map.Entry<byte[], SumResponse> e : results.entrySet()) { + LOG.info("Got value " + e.getValue().getSum() + " for region " + + Bytes.toStringBinary(e.getKey())); + sumResult += 
e.getValue().getSum(); + } + for (int i = 0; i < ROWSIZE; i++) { + expectedResult += i; + } + assertEquals("Invalid result", expectedResult, sumResult); + + // scan: for region 2 and region 3 + results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1], ROWS[ROWS.length - 1]); + sumResult = 0; + expectedResult = 0; + for (Map.Entry<byte[], SumResponse> e : results.entrySet()) { + LOG.info("Got value " + e.getValue().getSum() + " for region " + + Bytes.toStringBinary(e.getKey())); + sumResult += e.getValue().getSum(); + } + for (int i = rowSeperator1; i < ROWSIZE; i++) { + expectedResult += i; + } + assertEquals("Invalid result", expectedResult, sumResult); + table.close(); + } + + @Test + public void testAggregationWithErrors() throws Throwable { + Table table = util.getConnection().getTable(TEST_TABLE); + final Map<byte[], ColumnAggregationWithErrorsSumResponse> results = + Collections.synchronizedMap( + new TreeMap<byte[], ColumnAggregationWithErrorsSumResponse>( + Bytes.BYTES_COMPARATOR + )); + ColumnAggregationWithErrorsSumRequest.Builder builder = + ColumnAggregationWithErrorsSumRequest + .newBuilder(); + builder.setFamily(ByteString.copyFrom(TEST_FAMILY)); + if (TEST_QUALIFIER != null && TEST_QUALIFIER.length > 0) { + builder.setQualifier(ByteString.copyFrom(TEST_QUALIFIER)); + } + + boolean hasError = false; + try { + table.batchCoprocessorService( + ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors.getDescriptor() + .findMethodByName("sum"), + builder.build(), ROWS[0], ROWS[ROWS.length - 1], + ColumnAggregationWithErrorsSumResponse.getDefaultInstance(), + new Batch.Callback<ColumnAggregationWithErrorsSumResponse>() { + + @Override + public void update(byte[] region, byte[] row, + ColumnAggregationWithErrorsSumResponse result) { + results.put(region, result); + } + }); + } catch (Throwable t) { + LOG.info("Exceptions in coprocessor service", t); + hasError = true; + } + + int sumResult = 0; + int expectedResult = 0; + for 
(Map.Entry<byte[], ColumnAggregationWithErrorsSumResponse> e : results.entrySet()) { + LOG.info("Got value " + e.getValue().getSum() + " for region " + + Bytes.toStringBinary(e.getKey())); + sumResult += e.getValue().getSum(); + } + for (int i = 0; i < rowSeperator2; i++) { + expectedResult += i; + } + assertEquals("Invalid result", expectedResult, sumResult); + assertTrue(hasError); + table.close(); + } +}
