http://git-wip-us.apache.org/repos/asf/hbase/blob/af36bfb2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java new file mode 100644 index 0000000..fcfdf93 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -0,0 +1,2278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; + +import io.netty.util.Timeout; +import io.netty.util.TimerTask; + +import java.util.stream.Stream; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.MetaTableAccessor.QueryType; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ProcedureInfo; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.AsyncMetaTableAccessor; +import org.apache.hadoop.hbase.TableNotDisabledException; +import org.apache.hadoop.hbase.TableNotEnabledException; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.UnknownRegionException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; +import 
org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; +import org.apache.hadoop.hbase.client.Scan.ReadType; +import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; +import org.apache.hadoop.hbase.client.replication.TableCFs; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.quotas.QuotaFilter; +import org.apache.hadoop.hbase.quotas.QuotaSettings; +import org.apache.hadoop.hbase.quotas.QuotaTableUtil; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; +import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; +import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; +import org.apache.hadoop.hbase.util.Pair; + +/** + * The implementation of AsyncAdmin. 
+ */ [email protected] +public class RawAsyncHBaseAdmin implements AsyncAdmin { + public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; + + private static final Log LOG = LogFactory.getLog(AsyncHBaseAdmin.class); + + private final AsyncConnectionImpl connection; + + private final RawAsyncTable metaTable; + + private final long rpcTimeoutNs; + + private final long operationTimeoutNs; + + private final long pauseNs; + + private final int maxAttempts; + + private final int startLogErrorsCnt; + + private final NonceGenerator ng; + + RawAsyncHBaseAdmin(AsyncConnectionImpl connection) { + this.connection = connection; + this.metaTable = connection.getRawTable(META_TABLE_NAME); + this.rpcTimeoutNs = connection.connConf.getRpcTimeoutNs(); + this.operationTimeoutNs = connection.connConf.getOperationTimeoutNs(); + this.pauseNs = connection.connConf.getPauseNs(); + this.maxAttempts = connection.connConf.getMaxRetries(); + this.startLogErrorsCnt = connection.connConf.getStartLogErrorsCnt(); + this.ng = connection.getNonceGenerator(); + } + + private <T> MasterRequestCallerBuilder<T> newMasterCaller() { + return this.connection.callerFactory.<T> masterRequest() + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + .startLogErrorsCnt(startLogErrorsCnt); + } + + private <T> AdminRequestCallerBuilder<T> newAdminCaller() { + return this.connection.callerFactory.<T> adminRequest() + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + .startLogErrorsCnt(startLogErrorsCnt); + } + + @FunctionalInterface + private interface MasterRpcCall<RESP, REQ> { + void call(MasterService.Interface stub, HBaseRpcController controller, REQ req, + RpcCallback<RESP> done); + } + + @FunctionalInterface + private interface 
AdminRpcCall<RESP, REQ> { + void call(AdminService.Interface stub, HBaseRpcController controller, REQ req, + RpcCallback<RESP> done); + } + + @FunctionalInterface + private interface Converter<D, S> { + D convert(S src) throws IOException; + } + + private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, + MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall, + Converter<RESP, PRESP> respConverter) { + CompletableFuture<RESP> future = new CompletableFuture<>(); + rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { + + @Override + public void run(PRESP resp) { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + try { + future.complete(respConverter.convert(resp)); + } catch (IOException e) { + future.completeExceptionally(e); + } + } + } + }); + return future; + } + + private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller, + AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall, + Converter<RESP, PRESP> respConverter) { + + CompletableFuture<RESP> future = new CompletableFuture<>(); + rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { + + @Override + public void run(PRESP resp) { + if (controller.failed()) { + future.completeExceptionally(new IOException(controller.errorText())); + } else { + try { + future.complete(respConverter.convert(resp)); + } catch (IOException e) { + future.completeExceptionally(e); + } + } + } + }); + return future; + } + + private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq, + MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, + ProcedureBiConsumer consumer) { + CompletableFuture<Long> procFuture = this + .<Long> newMasterCaller() + .action( + (controller, stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, + respConverter)).call(); + return waitProcedureResult(procFuture).whenComplete(consumer); + } + 
+ @FunctionalInterface + private interface TableOperator { + CompletableFuture<Void> operate(TableName table); + } + + private CompletableFuture<List<TableDescriptor>> batchTableOperations(Pattern pattern, + TableOperator operator, String operationType) { + CompletableFuture<List<TableDescriptor>> future = new CompletableFuture<>(); + List<TableDescriptor> failed = new LinkedList<>(); + listTables(Optional.ofNullable(pattern), false).whenComplete( + (tables, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + CompletableFuture[] futures = + tables.stream() + .map((table) -> operator.operate(table.getTableName()).whenComplete((v, ex) -> { + if (ex != null) { + LOG.info("Failed to " + operationType + " table " + table.getTableName(), ex); + failed.add(table); + } + })).<CompletableFuture> toArray(size -> new CompletableFuture[size]); + CompletableFuture.allOf(futures).thenAccept((v) -> { + future.complete(failed); + }); + }); + return future; + } + + @Override + public CompletableFuture<Boolean> tableExists(TableName tableName) { + return AsyncMetaTableAccessor.tableExists(metaTable, tableName); + } + + @Override + public CompletableFuture<List<TableDescriptor>> listTables(Optional<Pattern> pattern, + boolean includeSysTables) { + return this.<List<TableDescriptor>> newMasterCaller() + .action((controller, stub) -> this + .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call( + controller, stub, + RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables), + (s, c, req, done) -> s.getTableDescriptors(c, req, done), + (resp) -> ProtobufUtil.toTableDescriptorList(resp))) + .call(); + } + + @Override + public CompletableFuture<List<TableName>> listTableNames(Optional<Pattern> pattern, + boolean includeSysTables) { + return this.<List<TableName>> newMasterCaller() + .action((controller, stub) -> this + .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller, 
stub, + RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables), + (s, c, req, done) -> s.getTableNames(c, req, done), + (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) + .call(); + } + + @Override + public CompletableFuture<TableDescriptor> getTableDescriptor(TableName tableName) { + CompletableFuture<TableDescriptor> future = new CompletableFuture<>(); + this.<List<TableSchema>> newMasterCaller() + .action( + (controller, stub) -> this + .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call( + controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), (s, + c, req, done) -> s.getTableDescriptors(c, req, done), (resp) -> resp + .getTableSchemaList())).call().whenComplete((tableSchemas, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + if (!tableSchemas.isEmpty()) { + future.complete(ProtobufUtil.convertToTableDesc(tableSchemas.get(0))); + } else { + future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString())); + } + }); + return future; + } + + @Override + public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, + int numRegions) { + try { + return createTable(desc, Optional.of(getSplitKeys(startKey, endKey, numRegions))); + } catch (IllegalArgumentException e) { + return failedFuture(e); + } + } + + @Override + public CompletableFuture<Void> createTable(TableDescriptor desc, Optional<byte[][]> splitKeys) { + if (desc.getTableName() == null) { + return failedFuture(new IllegalArgumentException("TableName cannot be null")); + } + try { + splitKeys.ifPresent(keys -> verifySplitKeys(keys)); + return this.<CreateTableRequest, CreateTableResponse> procedureCall(RequestConverter + .buildCreateTableRequest(desc, splitKeys, ng.getNonceGroup(), ng.newNonce()), (s, c, req, + done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(), + new CreateTableProcedureBiConsumer(this, 
desc.getTableName())); + } catch (IllegalArgumentException e) { + return failedFuture(e); + } + } + + @Override + public CompletableFuture<Void> deleteTable(TableName tableName) { + return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter + .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), + (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(), + new DeleteTableProcedureBiConsumer(this, tableName)); + } + + @Override + public CompletableFuture<List<TableDescriptor>> deleteTables(Pattern pattern) { + return batchTableOperations(pattern, (table) -> deleteTable(table), "DELETE"); + } + + @Override + public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) { + return this.<TruncateTableRequest, TruncateTableResponse> procedureCall( + RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(), + ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done), + (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(this, tableName)); + } + + @Override + public CompletableFuture<Void> enableTable(TableName tableName) { + return this.<EnableTableRequest, EnableTableResponse> procedureCall(RequestConverter + .buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), + (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(), + new EnableTableProcedureBiConsumer(this, tableName)); + } + + @Override + public CompletableFuture<List<TableDescriptor>> enableTables(Pattern pattern) { + return batchTableOperations(pattern, (table) -> enableTable(table), "ENABLE"); + } + + @Override + public CompletableFuture<Void> disableTable(TableName tableName) { + return this.<DisableTableRequest, DisableTableResponse> procedureCall(RequestConverter + .buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), + (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(), + new 
DisableTableProcedureBiConsumer(this, tableName)); + } + + @Override + public CompletableFuture<List<TableDescriptor>> disableTables(Pattern pattern) { + return batchTableOperations(pattern, (table) -> disableTable(table), "DISABLE"); + } + + @Override + public CompletableFuture<Boolean> isTableEnabled(TableName tableName) { + CompletableFuture<Boolean> future = new CompletableFuture<>(); + AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + if (state.isPresent()) { + future.complete(state.get().inStates(TableState.State.ENABLED)); + } else { + future.completeExceptionally(new TableNotFoundException(tableName)); + } + }); + return future; + } + + @Override + public CompletableFuture<Boolean> isTableDisabled(TableName tableName) { + CompletableFuture<Boolean> future = new CompletableFuture<>(); + AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + if (state.isPresent()) { + future.complete(state.get().inStates(TableState.State.DISABLED)); + } else { + future.completeExceptionally(new TableNotFoundException(tableName)); + } + }); + return future; + } + + @Override + public CompletableFuture<Boolean> isTableAvailable(TableName tableName) { + return isTableAvailable(tableName, null); + } + + @Override + public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) { + CompletableFuture<Boolean> future = new CompletableFuture<>(); + isTableEnabled(tableName).whenComplete( + (enabled, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + if (!enabled) { + future.complete(false); + } else { + AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)) + .whenComplete( + (locations, error1) -> { + if (error1 != null) { + 
future.completeExceptionally(error1); + return; + } + int notDeployed = 0; + int regionCount = 0; + for (HRegionLocation location : locations) { + HRegionInfo info = location.getRegionInfo(); + if (location.getServerName() == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Table " + tableName + " has not deployed region " + + info.getEncodedName()); + } + notDeployed++; + } else if (splitKeys != null + && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { + for (byte[] splitKey : splitKeys) { + // Just check if the splitkey is available + if (Bytes.equals(info.getStartKey(), splitKey)) { + regionCount++; + break; + } + } + } else { + // Always empty start row should be counted + regionCount++; + } + } + if (notDeployed > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Table " + tableName + " has " + notDeployed + " regions"); + } + future.complete(false); + } else if (splitKeys != null && regionCount != splitKeys.length + 1) { + if (LOG.isDebugEnabled()) { + LOG.debug("Table " + tableName + " expected to have " + + (splitKeys.length + 1) + " regions, but only " + regionCount + + " available"); + } + future.complete(false); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Table " + tableName + " should be available"); + } + future.complete(true); + } + }); + } + }); + return future; + } + + @Override + public CompletableFuture<Pair<Integer, Integer>> getAlterStatus(TableName tableName) { + return this + .<Pair<Integer, Integer>>newMasterCaller() + .action( + (controller, stub) -> this + .<GetSchemaAlterStatusRequest, GetSchemaAlterStatusResponse, Pair<Integer, Integer>> call( + controller, stub, RequestConverter.buildGetSchemaAlterStatusRequest(tableName), (s, + c, req, done) -> s.getSchemaAlterStatus(c, req, done), (resp) -> new Pair<>( + resp.getYetToUpdateRegions(), resp.getTotalRegions()))).call(); + } + + @Override + public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) { + return 
this.<AddColumnRequest, AddColumnResponse> procedureCall( + RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), + ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), + new AddColumnFamilyProcedureBiConsumer(this, tableName)); + } + + @Override + public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) { + return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall( + RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), + ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done), + (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(this, tableName)); + } + + @Override + public CompletableFuture<Void> modifyColumnFamily(TableName tableName, + ColumnFamilyDescriptor columnFamily) { + return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall( + RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), + ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done), + (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(this, tableName)); + } + + @Override + public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) { + return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall( + RequestConverter.buildCreateNamespaceRequest(descriptor), + (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(), + new CreateNamespaceProcedureBiConsumer(this, descriptor.getName())); + } + + @Override + public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) { + return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall( + RequestConverter.buildModifyNamespaceRequest(descriptor), + (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(), + new ModifyNamespaceProcedureBiConsumer(this, descriptor.getName())); + } + + @Override + 
  public CompletableFuture<Void> deleteNamespace(String name) {
    // Deletes a namespace through a master DeleteNamespace procedure; the
    // returned future completes when the procedure finishes.
    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
      RequestConverter.buildDeleteNamespaceRequest(name),
      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
      new DeleteNamespaceProcedureBiConsumer(this, name));
  }

  // Fetches a single namespace descriptor from the master and converts the
  // protobuf response back to a NamespaceDescriptor.
  @Override
  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
    return this
      .<NamespaceDescriptor> newMasterCaller()
      .action(
        (controller, stub) -> this
          .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call(
            controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name),
            (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done),
            (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
  }

  // Lists all namespace descriptors known to the master.
  @Override
  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
    return this
      .<List<NamespaceDescriptor>> newMasterCaller()
      .action(
        (controller, stub) -> this
          .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
            controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(),
            (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done),
            (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))).call();
  }

  // Turns the balancer on or off; the future resolves to the balancer's
  // PREVIOUS state (resp.getPrevBalanceValue()). The second argument to
  // buildSetBalancerRunningRequest is hard-coded to true here — see
  // RequestConverter for its exact meaning.
  @Override
  public CompletableFuture<Boolean> setBalancerOn(final boolean on) {
    return this
      .<Boolean> newMasterCaller()
      .action(
        (controller, stub) -> this
          .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller,
            stub, RequestConverter.buildSetBalancerRunningRequest(on, true),
            (s, c, req, done) -> s.setBalancerRunning(c, req, done),
            (resp) -> resp.getPrevBalanceValue())).call();
  }

  // Asks the master to run the balancer; resolves to whether the balancer ran.
  @Override
  public CompletableFuture<Boolean> balance(boolean forcible) {
    return this
      .<Boolean> newMasterCaller()
      .action(
        (controller, stub) -> this.<BalanceRequest,
BalanceResponse, Boolean> call(controller, + stub, RequestConverter.buildBalanceRequest(forcible), + (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call(); + } + + @Override + public CompletableFuture<Boolean> isBalancerOn() { + return this + .<Boolean> newMasterCaller() + .action( + (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call( + controller, stub, RequestConverter.buildIsBalancerEnabledRequest(), + (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())) + .call(); + } + + @Override + public CompletableFuture<Boolean> closeRegion(byte[] regionName, Optional<ServerName> serverName) { + CompletableFuture<Boolean> future = new CompletableFuture<>(); + getRegionLocation(regionName).whenComplete((location, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + ServerName server = serverName.isPresent() ? serverName.get() : location.getServerName(); + if (server == null) { + future.completeExceptionally(new NotServingRegionException(regionName)); + } else { + closeRegion(location.getRegionInfo(), server).whenComplete((result, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(result); + } + }); + } + }); + return future; + } + + private CompletableFuture<Boolean> closeRegion(HRegionInfo hri, ServerName serverName) { + return this + .<Boolean> newAdminCaller() + .action( + (controller, stub) -> this.<CloseRegionRequest, CloseRegionResponse, Boolean> adminCall( + controller, stub, + ProtobufUtil.buildCloseRegionRequest(serverName, hri.getRegionName()), + (s, c, req, done) -> s.closeRegion(controller, req, done), resp -> resp.getClosed())) + .serverName(serverName).call(); + } + + @Override + public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn) { + return this.<List<HRegionInfo>> newAdminCaller() + .action((controller, stub) -> this + .<GetOnlineRegionRequest, 
GetOnlineRegionResponse, List<HRegionInfo>> adminCall( + controller, stub, RequestConverter.buildGetOnlineRegionRequest(), + (s, c, req, done) -> s.getOnlineRegion(c, req, done), + resp -> ProtobufUtil.getRegionInfos(resp))) + .serverName(sn).call(); + } + + @Override + public CompletableFuture<Void> flush(TableName tableName) { + CompletableFuture<Void> future = new CompletableFuture<>(); + tableExists(tableName).whenComplete((exists, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else if (!exists) { + future.completeExceptionally(new TableNotFoundException(tableName)); + } else { + isTableEnabled(tableName).whenComplete((tableEnabled, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else if (!tableEnabled) { + future.completeExceptionally(new TableNotEnabledException(tableName)); + } else { + execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), + new HashMap<>()).whenComplete((ret, err3) -> { + if (err3 != null) { + future.completeExceptionally(err3); + } else { + future.complete(ret); + } + }); + } + }); + } + }); + return future; + } + + @Override + public CompletableFuture<Void> flushRegion(byte[] regionName) { + CompletableFuture<Void> future = new CompletableFuture<>(); + getRegionLocation(regionName).whenComplete( + (location, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + ServerName serverName = location.getServerName(); + if (serverName == null) { + future.completeExceptionally(new NoServerForRegionException(Bytes + .toStringBinary(regionName))); + return; + } + + HRegionInfo regionInfo = location.getRegionInfo(); + this.<Void> newAdminCaller() + .serverName(serverName) + .action( + (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall( + controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo + .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done), + resp -> 
null)).call().whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + return future; + } + + @Override + public CompletableFuture<Void> compact(TableName tableName, Optional<byte[]> columnFamily) { + return compact(tableName, columnFamily, false, CompactType.NORMAL); + } + + @Override + public CompletableFuture<Void> compactRegion(byte[] regionName, Optional<byte[]> columnFamily) { + return compactRegion(regionName, columnFamily, false); + } + + @Override + public CompletableFuture<Void> majorCompact(TableName tableName, Optional<byte[]> columnFamily) { + return compact(tableName, columnFamily, true, CompactType.NORMAL); + } + + @Override + public CompletableFuture<Void> majorCompactRegion(byte[] regionName, Optional<byte[]> columnFamily) { + return compactRegion(regionName, columnFamily, true); + } + + @Override + public CompletableFuture<Void> compactRegionServer(ServerName sn) { + return compactRegionServer(sn, false); + } + + @Override + public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) { + return compactRegionServer(sn, true); + } + + private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) { + CompletableFuture<Void> future = new CompletableFuture<>(); + getOnlineRegions(sn).whenComplete((hRegionInfos, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); + if (hRegionInfos != null) { + hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, Optional.empty()))); + } + CompletableFuture + .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])) + .whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + return future; + } + + private CompletableFuture<Void> compactRegion(byte[] regionName, Optional<byte[]> 
columnFamily, + boolean major) { + CompletableFuture<Void> future = new CompletableFuture<>(); + getRegionLocation(regionName).whenComplete( + (location, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + ServerName serverName = location.getServerName(); + if (serverName == null) { + future.completeExceptionally(new NoServerForRegionException(Bytes + .toStringBinary(regionName))); + return; + } + compact(location.getServerName(), location.getRegionInfo(), major, columnFamily) + .whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + return future; + } + + /** + * List all region locations for the specific table. + */ + private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) { + if (TableName.META_TABLE_NAME.equals(tableName)) { + CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); + // For meta table, we use zk to fetch all locations. + AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration()); + registry.getMetaRegionLocation().whenComplete( + (metaRegions, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else if (metaRegions == null || metaRegions.isEmpty() + || metaRegions.getDefaultRegionLocation() == null) { + future.completeExceptionally(new IOException("meta region does not found")); + } else { + future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); + } + // close the registry. 
+ IOUtils.closeQuietly(registry); + }); + return future; + } else { + // For non-meta table, we fetch all locations by scanning hbase:meta table + return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)); + } + } + + /** + * Compact column family of a table, Asynchronous operation even if CompletableFuture.get() + */ + private CompletableFuture<Void> compact(final TableName tableName, Optional<byte[]> columnFamily, + final boolean major, CompactType compactType) { + if (CompactType.MOB.equals(compactType)) { + // TODO support MOB compact. + return failedFuture(new UnsupportedOperationException("MOB compact does not support")); + } + CompletableFuture<Void> future = new CompletableFuture<>(); + getTableHRegionLocations(tableName).whenComplete((locations, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); + for (HRegionLocation location : locations) { + if (location.getRegionInfo() == null || location.getRegionInfo().isOffline()) continue; + if (location.getServerName() == null) continue; + compactFutures + .add(compact(location.getServerName(), location.getRegionInfo(), major, columnFamily)); + } + // future complete unless all of the compact futures are completed. + CompletableFuture + .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])) + .whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + return future; + } + + /** + * Compact the region at specific region server. 
   */
  private CompletableFuture<Void> compact(final ServerName sn, final HRegionInfo hri,
      final boolean major, Optional<byte[]> columnFamily) {
    // Sends a CompactRegion RPC directly to the region server `sn` hosting
    // `hri`; the response payload is discarded (resp -> null), so the future
    // only signals that the request was accepted.
    return this
      .<Void> newAdminCaller()
      .serverName(sn)
      .action(
        (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
          controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
            major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
          resp -> null)).call();
  }

  // Best-effort conversion of a full region name to its encoded form. Names
  // that are already encoded pass through unchanged; if encoding fails the
  // input is returned as-is (deliberate best-effort — no exception escapes).
  private byte[] toEncodeRegionName(byte[] regionName) {
    try {
      return HRegionInfo.isEncodedRegionName(regionName) ? regionName
          : Bytes.toBytes(HRegionInfo.encodeRegionName(regionName));
    } catch (IOException e) {
      return regionName;
    }
  }

  // Looks up the region, rejects non-default replicas, and publishes the
  // region's table into the shared AtomicReference. Invoked pairwise (see
  // checkRegionsAndGetTableName): the FIRST successful lookup only records the
  // table name and does NOT complete `result`; the SECOND either completes
  // `result` with the common table name or fails it on a table mismatch.
  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
      CompletableFuture<TableName> result) {
    getRegionLocation(encodeRegionName).whenComplete(
      (location, err) -> {
        if (err != null) {
          result.completeExceptionally(err);
          return;
        }
        HRegionInfo regionInfo = location.getRegionInfo();
        if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
          result.completeExceptionally(new IllegalArgumentException(
            "Can't invoke merge on non-default regions directly"));
          return;
        }
        if (!tableName.compareAndSet(null, regionInfo.getTable())) {
          if (!tableName.get().equals(regionInfo.getTable())) {
            // The two regions being merged must belong to the same table.
+ result.completeExceptionally(new IllegalArgumentException( + "Cannot merge regions from two different tables " + tableName.get() + " and " + + regionInfo.getTable())); + } else { + result.complete(tableName.get()); + } + } + }); + } + + private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA, + byte[] encodeRegionNameB) { + AtomicReference<TableName> tableNameRef = new AtomicReference<>(); + CompletableFuture<TableName> future = new CompletableFuture<>(); + + checkAndGetTableName(encodeRegionNameA, tableNameRef, future); + checkAndGetTableName(encodeRegionNameB, tableNameRef, future); + return future; + } + + @Override + public CompletableFuture<Void> mergeRegions(byte[] nameOfRegionA, byte[] nameOfRegionB, + boolean forcible) { + CompletableFuture<Void> future = new CompletableFuture<>(); + final byte[] encodeRegionNameA = toEncodeRegionName(nameOfRegionA); + final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB); + + checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB) + .whenComplete((tableName, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + + MergeTableRegionsRequest request = null; + try { + request = RequestConverter.buildMergeTableRegionsRequest( + new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(), + ng.newNonce()); + } catch (DeserializationException e) { + future.completeExceptionally(e); + return; + } + + this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request, + (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(), + new MergeTableRegionProcedureBiConsumer(this, tableName)).whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + + }); + return future; + } + + @Override + public CompletableFuture<Void> split(TableName tableName) { + CompletableFuture<Void> future = new CompletableFuture<>(); 
+ tableExists(tableName).whenComplete((exist, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + if (!exist) { + future.completeExceptionally(new TableNotFoundException(tableName)); + return; + } + metaTable + .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY) + .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION)) + .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))) + .whenComplete((results, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + return; + } + if (results != null && !results.isEmpty()) { + List<CompletableFuture<Void>> splitFutures = new ArrayList<>(); + for (Result r : results) { + if (r.isEmpty() || MetaTableAccessor.getHRegionInfo(r) == null) continue; + RegionLocations rl = MetaTableAccessor.getRegionLocations(r); + if (rl != null) { + for (HRegionLocation h : rl.getRegionLocations()) { + if (h != null && h.getServerName() != null) { + HRegionInfo hri = h.getRegionInfo(); + if (hri == null || hri.isSplitParent() + || hri.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) + continue; + splitFutures.add(split(h.getServerName(), hri, Optional.empty())); + } + } + } + } + CompletableFuture + .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])) + .whenComplete((ret, exception) -> { + if (exception != null) { + future.completeExceptionally(exception); + return; + } + future.complete(ret); + }); + } else { + future.complete(null); + } + }); + }); + return future; + } + + @Override + public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) { + CompletableFuture<Void> result = new CompletableFuture<>(); + if (splitPoint == null) { + return failedFuture(new IllegalArgumentException("splitPoint can not be null.")); + } + connection.getRegionLocator(tableName).getRegionLocation(splitPoint) + .whenComplete((loc, err) -> { + if (err != null) { + 
result.completeExceptionally(err); + } else if (loc == null || loc.getRegionInfo() == null) { + result.completeExceptionally(new IllegalArgumentException( + "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); + } else { + splitRegion(loc.getRegionInfo().getRegionName(), Optional.of(splitPoint)) + .whenComplete((ret, err2) -> { + if (err2 != null) { + result.completeExceptionally(err2); + } else { + result.complete(ret); + } + + }); + } + }); + return result; + } + + @Override + public CompletableFuture<Void> splitRegion(byte[] regionName, Optional<byte[]> splitPoint) { + CompletableFuture<Void> future = new CompletableFuture<>(); + getRegionLocation(regionName).whenComplete( + (location, err) -> { + HRegionInfo regionInfo = location.getRegionInfo(); + if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { + future.completeExceptionally(new IllegalArgumentException( + "Can't split replicas directly. " + + "Replicas are auto-split when their primary is split.")); + return; + } + ServerName serverName = location.getServerName(); + if (serverName == null) { + future.completeExceptionally(new NoServerForRegionException(Bytes + .toStringBinary(regionName))); + return; + } + split(serverName, regionInfo, splitPoint).whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + return future; + } + + private CompletableFuture<Void> split(final ServerName sn, final HRegionInfo hri, + Optional<byte[]> splitPoint) { + if (hri.getStartKey() != null && splitPoint.isPresent() + && Bytes.compareTo(hri.getStartKey(), splitPoint.get()) == 0) { + return failedFuture(new IllegalArgumentException( + "should not give a splitkey which equals to startkey!")); + } + return this + .<Void> newAdminCaller() + .action( + (controller, stub) -> this.<SplitRegionRequest, SplitRegionResponse, Void> adminCall( + controller, stub, + 
ProtobufUtil.buildSplitRegionRequest(hri.getRegionName(), splitPoint), + (s, c, req, done) -> s.splitRegion(controller, req, done), resp -> null)) + .serverName(sn).call(); + } + + @Override + public CompletableFuture<Void> assign(byte[] regionName) { + CompletableFuture<Void> future = new CompletableFuture<>(); + getRegionInfo(regionName).whenComplete( + (regionInfo, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + this.<Void> newMasterCaller() + .action( + ((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call( + controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo + .getRegionName()), (s, c, req, done) -> s.assignRegion(c, req, done), + resp -> null))).call().whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + return future; + } + + @Override + public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) { + CompletableFuture<Void> future = new CompletableFuture<>(); + getRegionInfo(regionName).whenComplete( + (regionInfo, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + this.<Void> newMasterCaller() + .action( + ((controller, stub) -> this + .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub, + RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible), + (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))).call() + .whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + return future; + } + + @Override + public CompletableFuture<Void> offline(byte[] regionName) { + CompletableFuture<Void> future = new CompletableFuture<>(); + getRegionInfo(regionName).whenComplete( + (regionInfo, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + this.<Void> newMasterCaller() 
+ .action( + ((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call( + controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo + .getRegionName()), (s, c, req, done) -> s.offlineRegion(c, req, done), + resp -> null))).call().whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + return future; + } + + @Override + public CompletableFuture<Void> move(byte[] regionName, Optional<ServerName> destServerName) { + CompletableFuture<Void> future = new CompletableFuture<>(); + getRegionInfo(regionName).whenComplete( + (regionInfo, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + this.<Void> newMasterCaller() + .action( + (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call( + controller, stub, RequestConverter.buildMoveRegionRequest( + regionInfo.getEncodedNameAsBytes(), destServerName), (s, c, req, done) -> s + .moveRegion(c, req, done), resp -> null)).call().whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + return future; + } + + @Override + public CompletableFuture<Void> setQuota(QuotaSettings quota) { + return this + .<Void> newMasterCaller() + .action( + (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller, + stub, QuotaSettings.buildSetQuotaRequestProto(quota), + (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call(); + } + + @Override + public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) { + CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>(); + Scan scan = QuotaTableUtil.makeScan(filter); + this.connection.getRawTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build() + .scan(scan, new RawScanResultConsumer() { + List<QuotaSettings> settings = new ArrayList<>(); + + @Override + public void 
onNext(Result[] results, ScanController controller) { + for (Result result : results) { + try { + QuotaTableUtil.parseResultToCollection(result, settings); + } catch (IOException e) { + controller.terminate(); + future.completeExceptionally(e); + } + } + } + + @Override + public void onError(Throwable error) { + future.completeExceptionally(error); + } + + @Override + public void onComplete() { + future.complete(settings); + } + }); + return future; + } + + public CompletableFuture<Void> addReplicationPeer(String peerId, + ReplicationPeerConfig peerConfig) { + return this + .<Void> newMasterCaller() + .action( + (controller, stub) -> this + .<AddReplicationPeerRequest, AddReplicationPeerResponse, Void> call(controller, stub, + RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig), (s, c, req, + done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call(); + } + + @Override + public CompletableFuture<Void> removeReplicationPeer(String peerId) { + return this + .<Void> newMasterCaller() + .action( + (controller, stub) -> this + .<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse, Void> call(controller, + stub, RequestConverter.buildRemoveReplicationPeerRequest(peerId), + (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> null)).call(); + } + + @Override + public CompletableFuture<Void> enableReplicationPeer(String peerId) { + return this + .<Void> newMasterCaller() + .action( + (controller, stub) -> this + .<EnableReplicationPeerRequest, EnableReplicationPeerResponse, Void> call(controller, + stub, RequestConverter.buildEnableReplicationPeerRequest(peerId), + (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> null)).call(); + } + + @Override + public CompletableFuture<Void> disableReplicationPeer(String peerId) { + return this + .<Void> newMasterCaller() + .action( + (controller, stub) -> this + .<DisableReplicationPeerRequest, DisableReplicationPeerResponse, Void> call( + controller, stub, 
RequestConverter.buildDisableReplicationPeerRequest(peerId), (s, + c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> null)) + .call(); + } + + public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) { + return this + .<ReplicationPeerConfig> newMasterCaller() + .action( + (controller, stub) -> this + .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call( + controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), ( + s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), + (resp) -> ReplicationSerDeHelper.convert(resp.getPeerConfig()))).call(); + } + + @Override + public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, + ReplicationPeerConfig peerConfig) { + return this + .<Void> newMasterCaller() + .action( + (controller, stub) -> this + .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse, Void> call( + controller, stub, RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, + peerConfig), (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), ( + resp) -> null)).call(); + } + + @Override + public CompletableFuture<Void> appendReplicationPeerTableCFs(String id, + Map<TableName, ? extends Collection<String>> tableCfs) { + if (tableCfs == null) { + return failedFuture(new ReplicationException("tableCfs is null")); + } + + CompletableFuture<Void> future = new CompletableFuture<Void>(); + getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> { + if (!completeExceptionally(future, error)) { + ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); + updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> { + if (!completeExceptionally(future, error)) { + future.complete(result); + } + }); + } + }); + return future; + } + + @Override + public CompletableFuture<Void> removeReplicationPeerTableCFs(String id, + Map<TableName, ? 
extends Collection<String>> tableCfs) { + if (tableCfs == null) { + return failedFuture(new ReplicationException("tableCfs is null")); + } + + CompletableFuture<Void> future = new CompletableFuture<Void>(); + getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> { + if (!completeExceptionally(future, error)) { + try { + ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); + } catch (ReplicationException e) { + future.completeExceptionally(e); + return; + } + updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> { + if (!completeExceptionally(future, error)) { + future.complete(result); + } + }); + } + }); + return future; + } + + @Override + public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Optional<Pattern> pattern) { + return this + .<List<ReplicationPeerDescription>> newMasterCaller() + .action( + (controller, stub) -> this + .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call( + controller, + stub, + RequestConverter.buildListReplicationPeersRequest(pattern), + (s, c, req, done) -> s.listReplicationPeers(c, req, done), + (resp) -> resp.getPeerDescList().stream() + .map(ReplicationSerDeHelper::toReplicationPeerDescription) + .collect(Collectors.toList()))).call(); + } + + @Override + public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() { + CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>(); + listTables().whenComplete( + (tables, error) -> { + if (!completeExceptionally(future, error)) { + List<TableCFs> replicatedTableCFs = new ArrayList<>(); + tables.forEach(table -> { + Map<String, Integer> cfs = new HashMap<>(); + Stream.of(table.getColumnFamilies()) + .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) + .forEach(column -> { + cfs.put(column.getNameAsString(), column.getScope()); + }); + if (!cfs.isEmpty()) { + replicatedTableCFs.add(new 
TableCFs(table.getTableName(), cfs)); + } + }); + future.complete(replicatedTableCFs); + } + }); + return future; + } + + @Override + public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) { + SnapshotProtos.SnapshotDescription snapshot = ProtobufUtil + .createHBaseProtosSnapshotDesc(snapshotDesc); + try { + ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); + } catch (IllegalArgumentException e) { + return failedFuture(e); + } + CompletableFuture<Void> future = new CompletableFuture<>(); + final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build(); + this.<Long> newMasterCaller() + .action( + (controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller, + stub, request, (s, c, req, done) -> s.snapshot(c, req, done), + resp -> resp.getExpectedTimeout())).call().whenComplete((expectedTimeout, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + TimerTask pollingTask = new TimerTask() { + int tries = 0; + long startTime = EnvironmentEdgeManager.currentTime(); + long endTime = startTime + expectedTimeout; + long maxPauseTime = expectedTimeout / maxAttempts; + + @Override + public void run(Timeout timeout) throws Exception { + if (EnvironmentEdgeManager.currentTime() < endTime) { + isSnapshotFinished(snapshotDesc).whenComplete((done, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else if (done) { + future.complete(null); + } else { + // retry again after pauseTime. 
+ long pauseTime = ConnectionUtils.getPauseTime( + TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); + pauseTime = Math.min(pauseTime, maxPauseTime); + AsyncConnectionImpl.RETRY_TIMER + .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS); + } + } ); + } else { + future.completeExceptionally(new SnapshotCreationException("Snapshot '" + + snapshot.getName() + "' wasn't completed in expectedTime:" + expectedTimeout + + " ms", snapshotDesc)); + } + } + }; + AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); + }); + return future; + } + + @Override + public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) { + return this + .<Boolean> newMasterCaller() + .action( + (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call( + controller, + stub, + IsSnapshotDoneRequest.newBuilder() + .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c, + req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call(); + } + + @Override + public CompletableFuture<Void> restoreSnapshot(String snapshotName) { + boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean( + HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT, + HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT); + return restoreSnapshot(snapshotName, takeFailSafeSnapshot); + } + + @Override + public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot) { + CompletableFuture<Void> future = new CompletableFuture<>(); + listSnapshots(Optional.of(Pattern.compile(snapshotName))).whenComplete( + (snapshotDescriptions, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + TableName tableName = null; + if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) { + for (SnapshotDescription snap : snapshotDescriptions) { + if (snap.getName().equals(snapshotName)) { + tableName = snap.getTableName(); + 
break; + } + } + } + if (tableName == null) { + future.completeExceptionally(new RestoreSnapshotException( + "Unable to find the table name for snapshot=" + snapshotName)); + return; + } + final TableName finalTableName = tableName; + tableExists(finalTableName) + .whenComplete((exists, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else if (!exists) { + // if table does not exist, then just clone snapshot into new table. + completeConditionalOnFuture(future, + internalRestoreSnapshot(snapshotName, finalTableName)); + } else { + isTableDisabled(finalTableName).whenComplete( + (disabled, err4) -> { + if (err4 != null) { + future.completeExceptionally(err4); + } else if (!disabled) { + future.completeExceptionally(new TableNotDisabledException(finalTableName)); + } else { + completeConditionalOnFuture(future, + restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot)); + } + }); + } + } ); + }); + return future; + } + + private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName, + boolean takeFailSafeSnapshot) { + if (takeFailSafeSnapshot) { + CompletableFuture<Void> future = new CompletableFuture<>(); + // Step.1 Take a snapshot of the current state + String failSafeSnapshotSnapshotNameFormat = this.connection.getConfiguration().get( + HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, + HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); + final String failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotNameFormat + .replace("{snapshot.name}", snapshotName) + .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) + .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime())); + LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); + snapshot(failSafeSnapshotSnapshotName, tableName).whenComplete((ret, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else { + // Step.2 Restore snapshot + 
internalRestoreSnapshot(snapshotName, tableName).whenComplete((void2, err2) -> { + if (err2 != null) { + // Step.3.a Something went wrong during the restore and try to rollback. + internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName).whenComplete( + (void3, err3) -> { + if (err3 != null) { + future.completeExceptionally(err3); + } else { + String msg = "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" + + failSafeSnapshotSnapshotName + " succeeded."; + future.completeExceptionally(new RestoreSnapshotException(msg)); + } + }); + } else { + // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. + LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); + deleteSnapshot(failSafeSnapshotSnapshotName).whenComplete( + (ret3, err3) -> { + if (err3 != null) { + LOG.error( + "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, err3); + future.completeExceptionally(err3); + } else { + future.complete(ret3); + } + }); + } + } ); + } + } ); + return future; + } else { + return internalRestoreSnapshot(snapshotName, tableName); + } + } + + private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture, + CompletableFuture<T> parentFuture) { + parentFuture.whenComplete((res, err) -> { + if (err != null) { + dependentFuture.completeExceptionally(err); + } else { + dependentFuture.complete(res); + } + }); + } + + @Override + public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName) { + CompletableFuture<Void> future = new CompletableFuture<>(); + tableExists(tableName).whenComplete((exists, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else if (exists) { + future.completeExceptionally(new TableExistsException(tableName)); + } else { + completeConditionalOnFuture(future, internalRestoreSnapshot(snapshotName, tableName)); + } + }); + return future; + } + + private CompletableFuture<Void> 
internalRestoreSnapshot(String snapshotName, TableName tableName) { + SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder() + .setName(snapshotName).setTable(tableName.getNameAsString()).build(); + try { + ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); + } catch (IllegalArgumentException e) { + return failedFuture(e); + } + return waitProcedureResult(this + .<Long> newMasterCaller() + .action( + (controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call( + controller, stub, RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot) + .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(), (s, c, req, + done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())).call()); + } + + @Override + public CompletableFuture<List<SnapshotDescription>> listSnapshots(Optional<Pattern> pattern) { + CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); + this.<GetCompletedSnapshotsResponse> newMasterCaller() + .action( + (controller, stub) -> this + .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, GetCompletedSnapshotsResponse> call( + controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(), (s, c, req, + done) -> s.getCompletedSnapshots(c, req, done), resp -> resp)) + .call() + .whenComplete( + (resp, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + future.complete(resp + .getSnapshotsList() + .stream() + .map(ProtobufUtil::createSnapshotDesc) + .filter( + snap -> pattern.isPresent() ? 
pattern.get().matcher(snap.getName()).matches() + : true).collect(Collectors.toList())); + }); + return future; + } + + @Override + public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern, + Pattern snapshotNamePattern) { + CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); + listTableNames(Optional.ofNullable(tableNamePattern), false).whenComplete( + (tableNames, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (tableNames == null || tableNames.size() <= 0) { + future.complete(Collections.emptyList()); + return; + } + listSnapshots(Optional.ofNullable(snapshotNamePattern)).whenComplete( + (snapshotDescList, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + return; + } + if (snapshotDescList == null || snapshotDescList.isEmpty()) { + future.complete(Collections.emptyList()); + return; + } + future.complete(snapshotDescList.stream() + .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) + .collect(Collectors.toList())); + }); + }); + return future; + } + + @Override + public CompletableFuture<Void> deleteSnapshot(String snapshotName) { + return internalDeleteSnapshot(new SnapshotDescription(snapshotName)); + } + + @Override + public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) { + return deleteTableSnapshots(null, snapshotNamePattern); + } + + @Override + public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern, + Pattern snapshotNamePattern) { + CompletableFuture<Void> future = new CompletableFuture<>(); + listTableSnapshots(tableNamePattern, snapshotNamePattern).whenComplete( + ((snapshotDescriptions, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) { + future.complete(null); + return; + } + List<CompletableFuture<Void>> deleteSnapshotFutures = new ArrayList<>(); + 
snapshotDescriptions.forEach(snapDesc -> deleteSnapshotFutures + .add(internalDeleteSnapshot(snapDesc))); + CompletableFuture.allOf( + deleteSnapshotFutures.toArray(new CompletableFuture<?>[deleteSnapshotFutures.size()])) + .thenAccept(v -> future.complete(v)); + })); + return future; + } + + private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) { + return this + .<Void> newMasterCaller() + .action( + (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call( + controller, + stub, + DeleteSnapshotRequest.newBuilder() + .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c, + req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call(); + } + + @Override + public CompletableFuture<Void> execProcedure(String signature, String instance, + Map<String, String> props) { + CompletableFuture<Void> future = new CompletableFuture<>(); + ProcedureDescription procDesc = + ProtobufUtil.buildProcedureDescription(signature, instance, props); + this.<Long> newMasterCaller() + .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call( + controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(), + (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout())) + .call().whenComplete((expectedTimeout, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + TimerTask pollingTask = new TimerTask() { + int tries = 0; + long startTime = EnvironmentEdgeManager.currentTime(); + long endTime = startTime + expectedTimeout; + long maxPauseTime = expectedTimeout / maxAttempts; + + @Override + public void run(Timeout timeout) throws Exception { + if (EnvironmentEdgeManager.currentTime() < endTime) { + isProcedureFinished(signature, instance, props).whenComplete((done, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + return; + } + if (done) { + future.complete(null); + } 
else { + // retry again after pauseTime. + long pauseTime = ConnectionUtils + .getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); + pauseTime = Math.min(pauseTime, maxPauseTime); + AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, + TimeUnit.MICROSECONDS); + } + }); + } else { + future.completeExceptionally(new IOException("Procedure '" + signature + " : " + + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms")); + } + } + }; + // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. + AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); + }); + return future; + } + + @Override + public CompletableFuture<byte[]> execProcedureWithRet(String signature, String instance, + Map<String, String> props) { + ProcedureDescription proDesc = + ProtobufUtil.buildProcedureDescription(signature, instance, props); + return this.<byte[]> newMasterCaller() + .action( + (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call( + controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(), + (s, c, req, done) -> s.execProcedureWithRet(c, req, done), + resp -> resp.hasReturnData() ? 
resp.getReturnData().toByteArray() : null)) + .call(); + } + + @Override + public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance, + Map<String, String> props) { + ProcedureDescription proDesc = + ProtobufUtil.buildProcedureDescription(signature, instance, props); + return this.<Boolean> newMasterCaller() + .action((controller, stub) -> this + .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub, + IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(), + (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone())) + .call(); + } + + @Override + public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) { + return this.<Boolean> newMasterCaller().action( + (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call( + controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(), + (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted())) + .call(); + } + + @Override + public CompletableFuture<List<ProcedureInfo>> listProcedures() { + return this + .<List<ProcedureInfo>> newMasterCaller() + .action( + (controller, stub) -> this + .<ListProceduresRequest, ListProceduresResponse, List<ProcedureInfo>> call( + controller, stub, ListProceduresRequest.newBuilder().build(), + (s, c, req, done) -> s.listProcedures(c, req, done), + resp -> resp.getProcedureList().stream().map(ProtobufUtil::toProcedureInfo) + .collect(Collectors.toList()))).call(); + } + + /** + * Get the region location for the passed region name. The region name may be a full region name + * or encoded region name. 
If the region does not found, then it'll throw an + * UnknownRegionException wrapped by a {@link CompletableFuture} + * @param regionNameOrEncodedRegionName + * @return region location, wrapped by a {@link CompletableFuture} + */ + @VisibleForTesting + CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) { + if (regionNameOrEncodedRegionName == null) { + return failedFuture(new IllegalArgumentException("Passed region name can't be null")); + } + try { + CompletableFuture<Optional<HRegionLocation>> future; + if (HRegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) { + future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable, + regionNameOrEncodedRegionName); + } else { + future = AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName); + } + + CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>(); + future.whenComplete((location, err) -> { + if (err != null) { + returnedFuture.completeExceptionally(err); + return; + } + LOG.info("location is " + location); + if (!location.isPresent() || location.get().getRegionInfo() == null) { + LOG.info("unknown location is " + location); + returnedFuture.completeExceptionally(new UnknownRegionException( + "Invalid region name or encoded region name: " + + Bytes.toStringBinary(regionNameOrEncodedRegionName))); + } else { + returnedFuture.complete(location.get()); + } + }); + return returnedFuture; + } catch (IOException e) { + return failedFuture(e); + } + } + + /** + * Get the region info for the passed region name. The region name may be a full region name or + * encoded region name. 
If the region does not found, then it'll throw an UnknownRegionException + * wrapped by a {@link CompletableFuture} + * @param regionNameOrEncodedRegionName + * @return region info, wrapped by a {@link CompletableFuture} + */ + private CompletableFuture<HRegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) { + if (regionNameOrEncodedRegionName == null) { + return failedFuture(new IllegalArgumentException("Passed region name can't be null")); + } + + if (Bytes.equals(regionNameOrEncodedRegionName, + HRegionInfo.FIRST_META_REGIONINFO.getRegionName()) + || Bytes.equals(regionNameOrEncodedRegionName, + HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) { + return
<TRUNCATED>
