Repository: hbase Updated Branches: refs/heads/master b1ef8dd43 -> 40cc666ac
HBASE-17915 Implement async replication admin methods Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/40cc666a Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/40cc666a Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/40cc666a Branch: refs/heads/master Commit: 40cc666ac984e846a8c7105b771ce6bec90c4ad3 Parents: b1ef8dd Author: Guanghao Zhang <zg...@apache.org> Authored: Thu Apr 20 18:13:03 2017 +0800 Committer: Guanghao Zhang <zg...@apache.org> Committed: Thu Apr 20 18:13:03 2017 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/client/AsyncAdmin.java | 92 ++++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 249 +++++++++-- .../apache/hadoop/hbase/client/HBaseAdmin.java | 58 +-- .../replication/ReplicationSerDeHelper.java | 67 +++ .../hadoop/hbase/client/TestAsyncAdminBase.java | 2 +- .../client/TestAsyncReplicationAdminApi.java | 416 +++++++++++++++++++ 6 files changed, 802 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/40cc666a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 270f28f..5d2955f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.client; import java.util.List; +import java.util.Collection; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.regex.Pattern; @@ -30,6 +32,9 @@ import org.apache.hadoop.hbase.TableName; import 
org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.quotas.QuotaFilter; import org.apache.hadoop.hbase.quotas.QuotaSettings; +import org.apache.hadoop.hbase.client.replication.TableCFs; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.util.Pair; /** @@ -481,4 +486,91 @@ public interface AsyncAdmin { * @return the QuotaSetting list, which wrapped by a CompletableFuture. */ CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter); + + /** + * Add a new replication peer for replicating data to slave cluster + * @param peerId a short name that identifies the peer + * @param peerConfig configuration for the replication slave cluster + */ + CompletableFuture<Void> addReplicationPeer(final String peerId, + final ReplicationPeerConfig peerConfig); + + /** + * Remove a peer and stop the replication + * @param peerId a short name that identifies the peer + */ + CompletableFuture<Void> removeReplicationPeer(final String peerId); + + /** + * Restart the replication stream to the specified peer + * @param peerId a short name that identifies the peer + */ + CompletableFuture<Void> enableReplicationPeer(final String peerId); + + /** + * Stop the replication stream to the specified peer + * @param peerId a short name that identifies the peer + */ + CompletableFuture<Void> disableReplicationPeer(final String peerId); + + /** + * Returns the configured ReplicationPeerConfig for the specified peer + * @param peerId a short name that identifies the peer + * @return ReplicationPeerConfig for the peer wrapped by a {@link CompletableFuture}. 
+ */ + CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(final String peerId); + + /** + * Update the peerConfig for the specified peer + * @param peerId a short name that identifies the peer + * @param peerConfig new config for the peer + */ + CompletableFuture<Void> updateReplicationPeerConfig(final String peerId, + final ReplicationPeerConfig peerConfig); + + /** + * Append the replicable table-cf config of the specified peer + * @param id a short name that identifies the cluster + * @param tableCfs A map from tableName to column family names + */ + CompletableFuture<Void> appendReplicationPeerTableCFs(String id, + Map<TableName, ? extends Collection<String>> tableCfs); + + /** + * Remove some table-cfs from config of the specified peer + * @param id a short name that identifies the cluster + * @param tableCfs A map from tableName to column family names + */ + CompletableFuture<Void> removeReplicationPeerTableCFs(String id, + Map<TableName, ? extends Collection<String>> tableCfs); + + /** + * Return a list of replication peers. + * @return a list of replication peers description. The return value will be wrapped by a + * {@link CompletableFuture}. + */ + CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(); + + /** + * Return a list of replication peers. + * @param regex The regular expression to match peer id + * @return a list of replication peers description. The return value will be wrapped by a + * {@link CompletableFuture}. + */ + CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(String regex); + + /** + * Return a list of replication peers. + * @param pattern The compiled regular expression to match peer id + * @return a list of replication peers description. The return value will be wrapped by a + * {@link CompletableFuture}. 
+ */ + CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern); + + /** + * Find all table and column families that are replicated from this cluster + * @return the replicated table-cfs list of this cluster. The return value will be wrapped by a + * {@link CompletableFuture}. + */ + CompletableFuture<List<TableCFs>> listReplicatedTableCFs(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/40cc666a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 180cd19..eae4089 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -22,16 +22,21 @@ import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.regex.Pattern; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -54,11 +59,16 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; import 
org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; import org.apache.hadoop.hbase.client.Scan.ReadType; +import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; +import org.apache.hadoop.hbase.client.replication.TableCFs; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.quotas.QuotaFilter; import org.apache.hadoop.hbase.quotas.QuotaSettings; import org.apache.hadoop.hbase.quotas.QuotaTableUtil; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; @@ -121,6 +131,20 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTa import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.Pair; @@ -1155,42 +1179,209 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<Void> setQuota(QuotaSettings quota){ - return this.<Void> newMasterCaller() - .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call( - controller, stub, QuotaSettings.buildSetQuotaRequestProto(quota), - (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)) - .call(); + public CompletableFuture<Void> setQuota(QuotaSettings quota) { + return this + .<Void> newMasterCaller() + .action( + (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller, + stub, QuotaSettings.buildSetQuotaRequestProto(quota), + (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call(); } @Override public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) { CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>(); Scan scan = QuotaTableUtil.makeScan(filter); - 
this.connection.getRawTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan, - new RawScanResultConsumer() { - List<QuotaSettings> settings = new ArrayList<>(); - - @Override - public void onNext(Result[] results, ScanController controller) { - for (Result result : results) { - try { - QuotaTableUtil.parseResultToCollection(result, settings); - } catch (IOException e) { - controller.terminate(); - future.completeExceptionally(e); + this.connection.getRawTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build() + .scan(scan, new RawScanResultConsumer() { + List<QuotaSettings> settings = new ArrayList<>(); + + @Override + public void onNext(Result[] results, ScanController controller) { + for (Result result : results) { + try { + QuotaTableUtil.parseResultToCollection(result, settings); + } catch (IOException e) { + controller.terminate(); + future.completeExceptionally(e); + } } } - } - @Override - public void onError(Throwable error) { - future.completeExceptionally(error); + @Override + public void onError(Throwable error) { + future.completeExceptionally(error); + } + + @Override + public void onComplete() { + future.complete(settings); + } + }); + return future; + } + + public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) { + return this + .<Void> newMasterCaller() + .action( + (controller, stub) -> this + .<AddReplicationPeerRequest, AddReplicationPeerResponse, Void> call(controller, stub, + RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig), (s, c, req, + done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call(); + } + + @Override + public CompletableFuture<Void> removeReplicationPeer(String peerId) { + return this + .<Void> newMasterCaller() + .action( + (controller, stub) -> this + .<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse, Void> call(controller, + stub, RequestConverter.buildRemoveReplicationPeerRequest(peerId), + (s, c, req, done) -> 
s.removeReplicationPeer(c, req, done), (resp) -> null)).call(); + } + + @Override + public CompletableFuture<Void> enableReplicationPeer(String peerId) { + return this + .<Void> newMasterCaller() + .action( + (controller, stub) -> this + .<EnableReplicationPeerRequest, EnableReplicationPeerResponse, Void> call(controller, + stub, RequestConverter.buildEnableReplicationPeerRequest(peerId), + (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> null)).call(); + } + + @Override + public CompletableFuture<Void> disableReplicationPeer(String peerId) { + return this + .<Void> newMasterCaller() + .action( + (controller, stub) -> this + .<DisableReplicationPeerRequest, DisableReplicationPeerResponse, Void> call( + controller, stub, RequestConverter.buildDisableReplicationPeerRequest(peerId), (s, + c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> null)) + .call(); + } + + public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) { + return this + .<ReplicationPeerConfig> newMasterCaller() + .action( + (controller, stub) -> this + .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call( + controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), ( + s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), + (resp) -> ReplicationSerDeHelper.convert(resp.getPeerConfig()))).call(); + } + + @Override + public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, + ReplicationPeerConfig peerConfig) { + return this + .<Void> newMasterCaller() + .action( + (controller, stub) -> this + .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse, Void> call( + controller, stub, RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, + peerConfig), (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), ( + resp) -> null)).call(); + } + + @Override + public CompletableFuture<Void> 
appendReplicationPeerTableCFs(String id, + Map<TableName, ? extends Collection<String>> tableCfs) { + if (tableCfs == null) { + return failedFuture(new ReplicationException("tableCfs is null")); + } + + CompletableFuture<Void> future = new CompletableFuture<Void>(); + getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> { + if (!completeExceptionally(future, error)) { + ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); + updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> { + if (!completeExceptionally(future, error)) { + future.complete(result); + } + }); + } + }); + return future; + } + + @Override + public CompletableFuture<Void> removeReplicationPeerTableCFs(String id, + Map<TableName, ? extends Collection<String>> tableCfs) { + if (tableCfs == null) { + return failedFuture(new ReplicationException("tableCfs is null")); + } + + CompletableFuture<Void> future = new CompletableFuture<Void>(); + getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> { + if (!completeExceptionally(future, error)) { + try { + ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); + } catch (ReplicationException e) { + future.completeExceptionally(e); + return; } + updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> { + if (!completeExceptionally(future, error)) { + future.complete(result); + } + }); + } + }); + return future; + } + + @Override + public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() { + return listReplicationPeers((Pattern) null); + } - @Override - public void onComplete() { - future.complete(settings); + @Override + public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(String regex) { + return listReplicationPeers(Pattern.compile(regex)); + } + + @Override + public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) { + return this + 
.<List<ReplicationPeerDescription>> newMasterCaller() + .action( + (controller, stub) -> this + .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call( + controller, + stub, + RequestConverter.buildListReplicationPeersRequest(pattern), + (s, c, req, done) -> s.listReplicationPeers(c, req, done), + (resp) -> resp.getPeerDescList().stream() + .map(ReplicationSerDeHelper::toReplicationPeerDescription) + .collect(Collectors.toList()))).call(); + } + + @Override + public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() { + CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>(); + listTables().whenComplete( + (tables, error) -> { + if (!completeExceptionally(future, error)) { + List<TableCFs> replicatedTableCFs = new ArrayList<>(); + Arrays.asList(tables).forEach( + table -> { + Map<String, Integer> cfs = new HashMap<>(); + Arrays.asList(table.getColumnFamilies()).stream() + .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) + .forEach(column -> { + cfs.put(column.getNameAsString(), column.getScope()); + }); + if (!cfs.isEmpty()) { + replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); + } + }); + future.complete(replicatedTableCFs); } }); return future; @@ -1470,4 +1661,12 @@ public class AsyncHBaseAdmin implements AsyncAdmin { future.completeExceptionally(error); return future; } + + private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) { + if (error != null) { + future.completeExceptionally(error); + return true; + } + return false; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/40cc666a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 
cadd6cc..8063070 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -3869,31 +3869,7 @@ public class HBaseAdmin implements Admin { throw new ReplicationException("tableCfs is null"); } ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); - Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap(); - if (preTableCfs == null) { - peerConfig.setTableCFsMap(tableCfs); - } else { - for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { - TableName table = entry.getKey(); - Collection<String> appendCfs = entry.getValue(); - if (preTableCfs.containsKey(table)) { - List<String> cfs = preTableCfs.get(table); - if (cfs == null || appendCfs == null || appendCfs.isEmpty()) { - preTableCfs.put(table, null); - } else { - Set<String> cfSet = new HashSet<String>(cfs); - cfSet.addAll(appendCfs); - preTableCfs.put(table, Lists.newArrayList(cfSet)); - } - } else { - if (appendCfs == null || appendCfs.isEmpty()) { - preTableCfs.put(table, null); - } else { - preTableCfs.put(table, Lists.newArrayList(appendCfs)); - } - } - } - } + ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); updateReplicationPeerConfig(id, peerConfig); } @@ -3905,37 +3881,7 @@ public class HBaseAdmin implements Admin { throw new ReplicationException("tableCfs is null"); } ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); - Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap(); - if (preTableCfs == null) { - throw new ReplicationException("Table-Cfs for peer" + id + " is null"); - } - for (Map.Entry<TableName, ? 
extends Collection<String>> entry : tableCfs.entrySet()) { - - TableName table = entry.getKey(); - Collection<String> removeCfs = entry.getValue(); - if (preTableCfs.containsKey(table)) { - List<String> cfs = preTableCfs.get(table); - if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) { - preTableCfs.remove(table); - } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) { - Set<String> cfSet = new HashSet<String>(cfs); - cfSet.removeAll(removeCfs); - if (cfSet.isEmpty()) { - preTableCfs.remove(table); - } else { - preTableCfs.put(table, Lists.newArrayList(cfSet)); - } - } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) { - throw new ReplicationException("Cannot remove cf of table: " + table - + " which doesn't specify cfs from table-cfs config in peer: " + id); - } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) { - throw new ReplicationException("Cannot remove table: " + table - + " which has specified cfs from table-cfs config in peer: " + id); - } - } else { - throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id); - } - } + ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); updateReplicationPeerConfig(id, peerConfig); } http://git-wip-us.apache.org/repos/asf/hbase/blob/40cc666a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java index 2d5539c..f561f4a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java @@ -30,11 +30,14 @@ import 
org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Strings; +import com.google.common.collect.Lists; + import java.io.IOException; import java.util.Collection; import java.util.HashSet; @@ -367,4 +370,68 @@ public final class ReplicationSerDeHelper { builder.setConfig(convert(desc.getPeerConfig())); return builder.build(); } + + public static void appendTableCFsToReplicationPeerConfig( + Map<TableName, ? extends Collection<String>> tableCfs, ReplicationPeerConfig peerConfig) { + Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap(); + if (preTableCfs == null) { + peerConfig.setTableCFsMap(tableCfs); + } else { + for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { + TableName table = entry.getKey(); + Collection<String> appendCfs = entry.getValue(); + if (preTableCfs.containsKey(table)) { + List<String> cfs = preTableCfs.get(table); + if (cfs == null || appendCfs == null || appendCfs.isEmpty()) { + preTableCfs.put(table, null); + } else { + Set<String> cfSet = new HashSet<String>(cfs); + cfSet.addAll(appendCfs); + preTableCfs.put(table, Lists.newArrayList(cfSet)); + } + } else { + if (appendCfs == null || appendCfs.isEmpty()) { + preTableCfs.put(table, null); + } else { + preTableCfs.put(table, Lists.newArrayList(appendCfs)); + } + } + } + } + } + + public static void removeTableCFsFromReplicationPeerConfig( + Map<TableName, ? 
extends Collection<String>> tableCfs, ReplicationPeerConfig peerConfig, + String id) throws ReplicationException { + Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap(); + if (preTableCfs == null) { + throw new ReplicationException("Table-Cfs for peer: " + id + " is null"); + } + for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { + TableName table = entry.getKey(); + Collection<String> removeCfs = entry.getValue(); + if (preTableCfs.containsKey(table)) { + List<String> cfs = preTableCfs.get(table); + if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) { + preTableCfs.remove(table); + } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) { + Set<String> cfSet = new HashSet<String>(cfs); + cfSet.removeAll(removeCfs); + if (cfSet.isEmpty()) { + preTableCfs.remove(table); + } else { + preTableCfs.put(table, Lists.newArrayList(cfSet)); + } + } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) { + throw new ReplicationException("Cannot remove cf of table: " + table + + " which doesn't specify cfs from table-cfs config in peer: " + id); + } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) { + throw new ReplicationException("Cannot remove table: " + table + + " which has specified cfs from table-cfs config in peer: " + id); + } + } else { + throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id); + } + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/40cc666a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java index f0dee0a..1881d4c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java @@ -34,7 +34,7 @@ import org.junit.BeforeClass; */ public abstract class TestAsyncAdminBase { - protected static final Log LOG = LogFactory.getLog(TestAdmin1.class); + protected static final Log LOG = LogFactory.getLog(TestAsyncAdminBase.class); protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); protected static byte[] FAMILY = Bytes.toBytes("testFamily"); protected static final byte[] FAMILY_0 = Bytes.toBytes("cf0"); http://git-wip-us.apache.org/repos/asf/hbase/blob/40cc666a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java new file mode 100644 index 0000000..c850c38 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java @@ -0,0 +1,416 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletionException; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +/** + * Class to test asynchronous replication admin operations. 
+ */ +@Category({MediumTests.class, ClientTests.class}) +public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { + + private final String ID_ONE = "1"; + private final String KEY_ONE = "127.0.0.1:2181:/hbase"; + private final String ID_SECOND = "2"; + private final String KEY_SECOND = "127.0.0.1:2181:/hbase2"; + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + TEST_UTIL.startMiniCluster(); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + } + + @Test + public void testAddRemovePeer() throws Exception { + ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); + rpc1.setClusterKey(KEY_ONE); + ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); + rpc2.setClusterKey(KEY_SECOND); + // Add a valid peer + admin.addReplicationPeer(ID_ONE, rpc1).join(); + // try adding the same (fails) + try { + admin.addReplicationPeer(ID_ONE, rpc1).join(); + fail("Test case should fail as adding a same peer."); + } catch (CompletionException e) { + // OK! + } + assertEquals(1, admin.listReplicationPeers().get().size()); + // Try to remove an inexisting peer + try { + admin.removeReplicationPeer(ID_SECOND).join(); + fail("Test case should fail as removing a inexisting peer."); + } catch (CompletionException e) { + // OK! 
+ } + assertEquals(1, admin.listReplicationPeers().get().size()); + // Add a second since multi-slave is supported + admin.addReplicationPeer(ID_SECOND, rpc2).join(); + assertEquals(2, admin.listReplicationPeers().get().size()); + // Remove the first peer we added + admin.removeReplicationPeer(ID_ONE).join(); + assertEquals(1, admin.listReplicationPeers().get().size()); + admin.removeReplicationPeer(ID_SECOND).join(); + assertEquals(0, admin.listReplicationPeers().get().size()); + } + + @Test + public void testPeerConfig() throws Exception { + ReplicationPeerConfig config = new ReplicationPeerConfig(); + config.setClusterKey(KEY_ONE); + config.getConfiguration().put("key1", "value1"); + config.getConfiguration().put("key2", "value2"); + admin.addReplicationPeer(ID_ONE, config).join(); + + List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get(); + assertEquals(1, peers.size()); + ReplicationPeerDescription peerOne = peers.get(0); + assertNotNull(peerOne); + assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1")); + assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2")); + + admin.removeReplicationPeer(ID_ONE).join(); + } + + @Test + public void testEnableDisablePeer() throws Exception { + ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); + rpc1.setClusterKey(KEY_ONE); + admin.addReplicationPeer(ID_ONE, rpc1).join(); + List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get(); + assertEquals(1, peers.size()); + assertTrue(peers.get(0).isEnabled()); + + admin.disableReplicationPeer(ID_ONE).join(); + peers = admin.listReplicationPeers().get(); + assertEquals(1, peers.size()); + assertFalse(peers.get(0).isEnabled()); + admin.removeReplicationPeer(ID_ONE).join(); + } + + @Test + public void testAppendPeerTableCFs() throws Exception { + ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); + rpc1.setClusterKey(KEY_ONE); + final TableName tableName1 = 
TableName.valueOf(name.getMethodName() + "t1"); + final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2"); + final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3"); + final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4"); + final TableName tableName5 = TableName.valueOf(name.getMethodName() + "t5"); + final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6"); + + // Add a valid peer + admin.addReplicationPeer(ID_ONE, rpc1).join(); + + Map<TableName, List<String>> tableCFs = new HashMap<>(); + + // append table t1 to replication + tableCFs.put(tableName1, null); + admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); + Map<TableName, List<String>> result = admin.getReplicationPeerConfig(ID_ONE).get() + .getTableCFsMap(); + assertEquals(1, result.size()); + assertEquals(true, result.containsKey(tableName1)); + assertNull(result.get(tableName1)); + + // append table t2 to replication + tableCFs.clear(); + tableCFs.put(tableName2, null); + admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); + result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); + assertEquals(2, result.size()); + assertTrue("Should contain t1", result.containsKey(tableName1)); + assertTrue("Should contain t2", result.containsKey(tableName2)); + assertNull(result.get(tableName1)); + assertNull(result.get(tableName2)); + + // append table column family: f1 of t3 to replication + tableCFs.clear(); + tableCFs.put(tableName3, new ArrayList<>()); + tableCFs.get(tableName3).add("f1"); + admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); + result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); + assertEquals(3, result.size()); + assertTrue("Should contain t1", result.containsKey(tableName1)); + assertTrue("Should contain t2", result.containsKey(tableName2)); + assertTrue("Should contain t3", result.containsKey(tableName3)); + assertNull(result.get(tableName1)); + 
assertNull(result.get(tableName2)); + assertEquals(1, result.get(tableName3).size()); + assertEquals("f1", result.get(tableName3).get(0)); + + // append table column family: f1,f2 of t4 to replication + tableCFs.clear(); + tableCFs.put(tableName4, new ArrayList<>()); + tableCFs.get(tableName4).add("f1"); + tableCFs.get(tableName4).add("f2"); + admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); + result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); + assertEquals(4, result.size()); + assertTrue("Should contain t1", result.containsKey(tableName1)); + assertTrue("Should contain t2", result.containsKey(tableName2)); + assertTrue("Should contain t3", result.containsKey(tableName3)); + assertTrue("Should contain t4", result.containsKey(tableName4)); + assertNull(result.get(tableName1)); + assertNull(result.get(tableName2)); + assertEquals(1, result.get(tableName3).size()); + assertEquals("f1", result.get(tableName3).get(0)); + assertEquals(2, result.get(tableName4).size()); + assertEquals("f1", result.get(tableName4).get(0)); + assertEquals("f2", result.get(tableName4).get(1)); + + // append "table5" => [], then append "table5" => ["f1"] + tableCFs.clear(); + tableCFs.put(tableName5, new ArrayList<>()); + admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); + tableCFs.clear(); + tableCFs.put(tableName5, new ArrayList<>()); + tableCFs.get(tableName5).add("f1"); + admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); + result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); + assertEquals(5, result.size()); + assertTrue("Should contain t5", result.containsKey(tableName5)); + // null means replication all cfs of tab5 + assertNull(result.get(tableName5)); + + // append "table6" => ["f1"], then append "table6" => [] + tableCFs.clear(); + tableCFs.put(tableName6, new ArrayList<>()); + tableCFs.get(tableName6).add("f1"); + admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); + tableCFs.clear(); + 
tableCFs.put(tableName6, new ArrayList<>()); + admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); + result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); + assertEquals(6, result.size()); + assertTrue("Should contain t6", result.containsKey(tableName6)); + // null means replication all cfs of tab6 + assertNull(result.get(tableName6)); + + admin.removeReplicationPeer(ID_ONE).join(); + } + + @Test + public void testRemovePeerTableCFs() throws Exception { + ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); + rpc1.setClusterKey(KEY_ONE); + final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1"); + final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2"); + final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3"); + final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4"); + // Add a valid peer + admin.addReplicationPeer(ID_ONE, rpc1).join(); + Map<TableName, List<String>> tableCFs = new HashMap<>(); + try { + tableCFs.put(tableName3, null); + admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); + fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null"); + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof ReplicationException); + } + assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); + + tableCFs.clear(); + tableCFs.put(tableName1, null); + tableCFs.put(tableName2, new ArrayList<>()); + tableCFs.get(tableName2).add("cf1"); + admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); + try { + tableCFs.clear(); + tableCFs.put(tableName3, null); + admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); + fail("Test case should fail as removing table-cfs from a peer whose table-cfs didn't contain t3"); + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof ReplicationException); + } + Map<TableName, List<String>> result = 
admin.getReplicationPeerConfig(ID_ONE).get() + .getTableCFsMap(); + assertEquals(2, result.size()); + assertTrue("Should contain t1", result.containsKey(tableName1)); + assertTrue("Should contain t2", result.containsKey(tableName2)); + assertNull(result.get(tableName1)); + assertEquals(1, result.get(tableName2).size()); + assertEquals("cf1", result.get(tableName2).get(0)); + + try { + tableCFs.clear(); + tableCFs.put(tableName1, new ArrayList<>()); + tableCFs.get(tableName1).add("cf1"); + admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); + fail("Test case should fail, because table t1 didn't specify cfs in peer config"); + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof ReplicationException); + } + tableCFs.clear(); + tableCFs.put(tableName1, null); + admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); + result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); + assertEquals(1, result.size()); + assertEquals(1, result.get(tableName2).size()); + assertEquals("cf1", result.get(tableName2).get(0)); + + try { + tableCFs.clear(); + tableCFs.put(tableName2, null); + admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); + fail("Test case should fail, because table t2 hase specified cfs in peer config"); + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof ReplicationException); + } + tableCFs.clear(); + tableCFs.put(tableName2, new ArrayList<>()); + tableCFs.get(tableName2).add("cf1"); + admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); + assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); + + tableCFs.clear(); + tableCFs.put(tableName4, new ArrayList<>()); + admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); + admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); + assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); + + admin.removeReplicationPeer(ID_ONE); + } + + @Test + public void testSetPeerNamespaces() 
throws Exception { + String ns1 = "ns1"; + String ns2 = "ns2"; + + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(KEY_ONE); + admin.addReplicationPeer(ID_ONE, rpc).join(); + + // add ns1 and ns2 to peer config + rpc = admin.getReplicationPeerConfig(ID_ONE).get(); + Set<String> namespaces = new HashSet<>(); + namespaces.add(ns1); + namespaces.add(ns2); + rpc.setNamespaces(namespaces); + admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); + namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces(); + assertEquals(2, namespaces.size()); + assertTrue(namespaces.contains(ns1)); + assertTrue(namespaces.contains(ns2)); + + // update peer config only contains ns1 + rpc = admin.getReplicationPeerConfig(ID_ONE).get(); + namespaces.clear(); + namespaces.add(ns1); + rpc.setNamespaces(namespaces); + admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); + namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces(); + assertEquals(1, namespaces.size()); + assertTrue(namespaces.contains(ns1)); + + admin.removeReplicationPeer(ID_ONE).join(); + } + + @Test + public void testNamespacesAndTableCfsConfigConflict() throws Exception { + String ns1 = "ns1"; + String ns2 = "ns2"; + final TableName tableName1 = TableName.valueOf(ns1 + ":" + name.getMethodName()); + final TableName tableName2 = TableName.valueOf(ns2 + ":" + name.getMethodName() + "2"); + + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(KEY_ONE); + admin.addReplicationPeer(ID_ONE, rpc).join(); + + rpc = admin.getReplicationPeerConfig(ID_ONE).get(); + Set<String> namespaces = new HashSet<String>(); + namespaces.add(ns1); + rpc.setNamespaces(namespaces); + admin.updateReplicationPeerConfig(ID_ONE, rpc).get(); + rpc = admin.getReplicationPeerConfig(ID_ONE).get(); + Map<TableName, List<String>> tableCfs = new HashMap<>(); + tableCfs.put(tableName1, new ArrayList<>()); + rpc.setTableCFsMap(tableCfs); + try { + 
admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); + fail("Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1); + } catch (CompletionException e) { + // OK + } + + rpc = admin.getReplicationPeerConfig(ID_ONE).get(); + tableCfs.clear(); + tableCfs.put(tableName2, new ArrayList<>()); + rpc.setTableCFsMap(tableCfs); + admin.updateReplicationPeerConfig(ID_ONE, rpc).get(); + rpc = admin.getReplicationPeerConfig(ID_ONE).get(); + namespaces.clear(); + namespaces.add(ns2); + rpc.setNamespaces(namespaces); + try { + admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); + fail("Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2); + } catch (CompletionException e) { + // OK + } + + admin.removeReplicationPeer(ID_ONE).join(); + } + + @Test + public void testPeerBandwidth() throws Exception { + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(KEY_ONE); + + admin.addReplicationPeer(ID_ONE, rpc).join(); + rpc = admin.getReplicationPeerConfig(ID_ONE).get(); + assertEquals(0, rpc.getBandwidth()); + + rpc.setBandwidth(2097152); + admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); + assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth()); + + admin.removeReplicationPeer(ID_ONE).join(); + } +}