HBASE-19009 implement modifyTable and enable/disableTableReplication for AsyncAdmin
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d885e223 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d885e223 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d885e223 Branch: refs/heads/branch-2 Commit: d885e2232df6ac4c65b3a87eb45780b8fff60b91 Parents: fb79e9d Author: Guanghao Zhang <zg...@apache.org> Authored: Sun Nov 12 20:16:20 2017 +0800 Committer: Guanghao Zhang <zg...@apache.org> Committed: Thu Nov 16 07:19:34 2017 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/client/AsyncAdmin.java | 18 + .../hadoop/hbase/client/AsyncHBaseAdmin.java | 17 +- .../hbase/client/ColumnFamilyDescriptor.java | 27 ++ .../apache/hadoop/hbase/client/HBaseAdmin.java | 220 ++------- .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 313 ++++++++++++- .../hadoop/hbase/client/TableDescriptor.java | 51 +- .../hbase/client/TableDescriptorBuilder.java | 21 +- .../client/replication/ReplicationAdmin.java | 8 +- .../replication/ReplicationPeerConfigUtil.java | 468 +++++++++++++++++++ .../replication/ReplicationSerDeHelper.java | 437 ----------------- .../replication/ReplicationPeerConfig.java | 20 + .../hbase/shaded/protobuf/RequestConverter.java | 6 +- .../replication/ReplicationPeerZKImpl.java | 6 +- .../replication/ReplicationPeersZKImpl.java | 14 +- .../hadoop/hbase/master/MasterRpcServices.java | 10 +- .../replication/master/TableCFsUpdater.java | 14 +- .../client/TestAsyncReplicationAdminApi.java | 2 - ...estAsyncReplicationAdminApiWithClusters.java | 242 ++++++++++ .../replication/TestReplicationAdmin.java | 16 +- .../replication/TestMasterReplication.java | 4 +- .../replication/TestPerTableCFReplication.java | 62 +-- .../replication/master/TestTableCFsUpdater.java | 27 +- 22 files changed, 1261 insertions(+), 742 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index f251a8f..722e8b5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -141,6 +141,12 @@ public interface AsyncAdmin { */ CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys); + /** + * Modify an existing table, more IRB friendly version. + * @param desc modified description of the table + */ + CompletableFuture<Void> modifyTable(TableDescriptor desc); + /** * Deletes a table. * @param tableName name of table to delete @@ -553,6 +559,18 @@ public interface AsyncAdmin { CompletableFuture<List<TableCFs>> listReplicatedTableCFs(); /** + * Enable a table's replication switch. + * @param tableName name of the table + */ + CompletableFuture<Void> enableTableReplication(TableName tableName); + + /** + * Disable a table's replication switch. + * @param tableName name of the table + */ + CompletableFuture<Void> disableTableReplication(TableName tableName); + + /** * Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be * taken. If the table is disabled, an offline snapshot is taken. Snapshots are considered unique * based on <b>the name of the snapshot</b>. 
Attempts to take a snapshot with the same name (even http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 250a38c..5a20291 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -128,6 +128,11 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override + public CompletableFuture<Void> modifyTable(TableDescriptor desc) { + return wrap(rawAdmin.modifyTable(desc)); + } + + @Override public CompletableFuture<Void> deleteTable(TableName tableName) { return wrap(rawAdmin.deleteTable(tableName)); } @@ -420,6 +425,16 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override + public CompletableFuture<Void> enableTableReplication(TableName tableName) { + return wrap(rawAdmin.enableTableReplication(tableName)); + } + + @Override + public CompletableFuture<Void> disableTableReplication(TableName tableName) { + return wrap(rawAdmin.disableTableReplication(tableName)); + } + + @Override public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) { return wrap(rawAdmin.snapshot(snapshot)); } @@ -709,4 +724,4 @@ public class AsyncHBaseAdmin implements AsyncAdmin { public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) { return wrap(rawAdmin.clearDeadServers(servers)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java ---------------------------------------------------------------------- diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java index c232271..03f4582 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java @@ -18,7 +18,9 @@ package org.apache.hadoop.hbase.client; import java.util.Comparator; +import java.util.HashMap; import java.util.Map; + import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.MemoryCompactionPolicy; import org.apache.yetus.audience.InterfaceAudience; @@ -54,6 +56,31 @@ public interface ColumnFamilyDescriptor { return lhs.getConfiguration().hashCode() - rhs.getConfiguration().hashCode(); }; + static final Bytes REPLICATION_SCOPE_BYTES = new Bytes( + Bytes.toBytes(ColumnFamilyDescriptorBuilder.REPLICATION_SCOPE)); + + @InterfaceAudience.Private + static final Comparator<ColumnFamilyDescriptor> COMPARATOR_IGNORE_REPLICATION = ( + ColumnFamilyDescriptor lcf, ColumnFamilyDescriptor rcf) -> { + int result = Bytes.compareTo(lcf.getName(), rcf.getName()); + if (result != 0) { + return result; + } + // ColumnFamilyDescriptor.getValues is an immutable map, so copy it and remove + // REPLICATION_SCOPE_BYTES + Map<Bytes, Bytes> lValues = new HashMap<>(); + lValues.putAll(lcf.getValues()); + lValues.remove(REPLICATION_SCOPE_BYTES); + Map<Bytes, Bytes> rValues = new HashMap<>(); + rValues.putAll(rcf.getValues()); + rValues.remove(REPLICATION_SCOPE_BYTES); + result = Integer.compare(lValues.hashCode(), rValues.hashCode()); // subtraction could overflow + if (result != 0) { + return result; + } + return Integer.compare(lcf.getConfiguration().hashCode(), rcf.getConfiguration().hashCode()); + }; + /** * @return The storefile/hfile blocksize for this column family. 
*/ http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 80f9d16..e153381 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -30,7 +30,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -45,6 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,10 +54,8 @@ import org.apache.hadoop.hbase.CacheEvictionStats; import org.apache.hadoop.hbase.CacheEvictionStatsBuilder; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.ClusterStatus.Option; -import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -76,7 +74,7 @@ import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; +import 
org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.client.replication.TableCFs; import org.apache.hadoop.hbase.client.security.SecurityCapability; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; @@ -3893,7 +3891,7 @@ public class HBaseAdmin implements Admin { protected ReplicationPeerConfig rpcCall() throws Exception { GetReplicationPeerConfigResponse response = master.getReplicationPeerConfig( getRpcController(), RequestConverter.buildGetReplicationPeerConfigRequest(peerId)); - return ReplicationSerDeHelper.convert(response.getPeerConfig()); + return ReplicationPeerConfigUtil.convert(response.getPeerConfig()); } }); } @@ -3919,7 +3917,7 @@ public class HBaseAdmin implements Admin { throw new ReplicationException("tableCfs is null"); } ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); - ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); + ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); updateReplicationPeerConfig(id, peerConfig); } @@ -3931,7 +3929,7 @@ public class HBaseAdmin implements Admin { throw new ReplicationException("tableCfs is null"); } ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); - ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); + ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); updateReplicationPeerConfig(id, peerConfig); } @@ -3957,7 +3955,7 @@ public class HBaseAdmin implements Admin { .getPeerDescList(); List<ReplicationPeerDescription> result = new ArrayList<>(peersList.size()); for (ReplicationProtos.ReplicationPeerDescription peer : peersList) { - result.add(ReplicationSerDeHelper.toReplicationPeerDescription(peer)); + result.add(ReplicationPeerConfigUtil.toReplicationPeerDescription(peer)); } return result; } @@ -4010,19 +4008,18 @@ public class HBaseAdmin implements Admin { @Override public 
List<TableCFs> listReplicatedTableCFs() throws IOException { List<TableCFs> replicatedTableCFs = new ArrayList<>(); - HTableDescriptor[] tables = listTables(); - for (HTableDescriptor table : tables) { - HColumnDescriptor[] columns = table.getColumnFamilies(); + List<TableDescriptor> tables = listTableDescriptors(); + tables.forEach(table -> { Map<String, Integer> cfs = new HashMap<>(); - for (HColumnDescriptor column : columns) { - if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) { - cfs.put(column.getNameAsString(), column.getScope()); - } - } + Stream.of(table.getColumnFamilies()) + .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) + .forEach(column -> { + cfs.put(column.getNameAsString(), column.getScope()); + }); if (!cfs.isEmpty()) { replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); } - } + }); return replicatedTableCFs; } @@ -4046,84 +4043,13 @@ public class HBaseAdmin implements Admin { throw new IllegalArgumentException("Table name is null"); } if (!tableExists(tableName)) { - throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString() + throw new TableNotFoundException("Table '" + tableName.getNameAsString() + "' does not exists."); } setTableRep(tableName, false); } /** - * Copies the REPLICATION_SCOPE of table descriptor passed as an argument. Before copy, the method - * ensures that the name of table and column-families should match. - * @param peerHtd descriptor on peer cluster - * @param localHtd - The HTableDescriptor of table from source cluster. - * @return true If the name of table and column families match and REPLICATION_SCOPE copied - * successfully. false If there is any mismatch in the names. - */ - private boolean copyReplicationScope(final HTableDescriptor peerHtd, - final HTableDescriptor localHtd) { - // Copy the REPLICATION_SCOPE only when table names and the names of - // Column-Families are same. 
- int result = peerHtd.getTableName().compareTo(localHtd.getTableName()); - - if (result == 0) { - Iterator<HColumnDescriptor> remoteHCDIter = peerHtd.getFamilies().iterator(); - Iterator<HColumnDescriptor> localHCDIter = localHtd.getFamilies().iterator(); - - while (remoteHCDIter.hasNext() && localHCDIter.hasNext()) { - HColumnDescriptor remoteHCD = remoteHCDIter.next(); - HColumnDescriptor localHCD = localHCDIter.next(); - - String remoteHCDName = remoteHCD.getNameAsString(); - String localHCDName = localHCD.getNameAsString(); - - if (remoteHCDName.equals(localHCDName)) { - remoteHCD.setScope(localHCD.getScope()); - } else { - result = -1; - break; - } - } - if (remoteHCDIter.hasNext() || localHCDIter.hasNext()) { - return false; - } - } - - return result == 0; - } - - /** - * Compare the contents of the descriptor with another one passed as a parameter for replication - * purpose. The REPLICATION_SCOPE field is ignored during comparison. - * @param peerHtd descriptor on peer cluster - * @param localHtd descriptor on source cluster which needs to be replicated. - * @return true if the contents of the two descriptors match (ignoring just REPLICATION_SCOPE). - * @see java.lang.Object#equals(java.lang.Object) - */ - private boolean compareForReplication(HTableDescriptor peerHtd, HTableDescriptor localHtd) { - if (peerHtd == localHtd) { - return true; - } - if (peerHtd == null) { - return false; - } - boolean result = false; - - // Create a copy of peer HTD as we need to change its replication - // scope to match with the local HTD. - HTableDescriptor peerHtdCopy = new HTableDescriptor(peerHtd); - - result = copyReplicationScope(peerHtdCopy, localHtd); - - // If copy was successful, compare the two tables now. 
- if (result) { - result = (peerHtdCopy.compareTo(localHtd) == 0); - } - - return result; - } - - /** * Connect to peer and check the table descriptor on peer: * <ol> * <li>Create the same table on peer when not exist.</li> @@ -4143,21 +4069,23 @@ public class HBaseAdmin implements Admin { } for (ReplicationPeerDescription peerDesc : peers) { - if (needToReplicate(tableName, peerDesc)) { - Configuration peerConf = getPeerClusterConfiguration(peerDesc); + if (peerDesc.getPeerConfig().needToReplicate(tableName)) { + Configuration peerConf = + ReplicationPeerConfigUtil.getPeerClusterConfiguration(this.conf, peerDesc); try (Connection conn = ConnectionFactory.createConnection(peerConf); Admin repHBaseAdmin = conn.getAdmin()) { - HTableDescriptor localHtd = getTableDescriptor(tableName); - HTableDescriptor peerHtd = null; + TableDescriptor tableDesc = getDescriptor(tableName); + TableDescriptor peerTableDesc = null; if (!repHBaseAdmin.tableExists(tableName)) { - repHBaseAdmin.createTable(localHtd, splits); + repHBaseAdmin.createTable(tableDesc, splits); } else { - peerHtd = repHBaseAdmin.getTableDescriptor(tableName); - if (peerHtd == null) { + peerTableDesc = repHBaseAdmin.getDescriptor(tableName); + if (peerTableDesc == null) { throw new IllegalArgumentException("Failed to get table descriptor for table " + tableName.getNameAsString() + " from peer cluster " + peerDesc.getPeerId()); } - if (!compareForReplication(peerHtd, localHtd)) { + if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, + tableDesc) != 0) { throw new IllegalArgumentException("Table " + tableName.getNameAsString() + " exists in peer cluster " + peerDesc.getPeerId() + ", but the table descriptors are not same when compared with source cluster." 
@@ -4170,108 +4098,20 @@ public class HBaseAdmin implements Admin { } /** - * Decide whether the table need replicate to the peer cluster according to the peer config - * @param table name of the table - * @param peer config for the peer - * @return true if the table need replicate to the peer cluster - */ - private boolean needToReplicate(TableName table, ReplicationPeerDescription peer) { - ReplicationPeerConfig peerConfig = peer.getPeerConfig(); - Set<String> namespaces = peerConfig.getNamespaces(); - Map<TableName, List<String>> tableCFsMap = peerConfig.getTableCFsMap(); - // If null means user has explicitly not configured any namespaces and table CFs - // so all the tables data are applicable for replication - if (namespaces == null && tableCFsMap == null) { - return true; - } - if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) { - return true; - } - if (tableCFsMap != null && tableCFsMap.containsKey(table)) { - return true; - } - LOG.debug("Table " + table.getNameAsString() - + " doesn't need replicate to peer cluster, peerId=" + peer.getPeerId() + ", clusterKey=" - + peerConfig.getClusterKey()); - return false; - } - - /** * Set the table's replication switch if the table's replication switch is already not set. * @param tableName name of the table * @param enableRep is replication switch enable or disable * @throws IOException if a remote or network exception occurs */ private void setTableRep(final TableName tableName, boolean enableRep) throws IOException { - HTableDescriptor htd = new HTableDescriptor(getTableDescriptor(tableName)); - ReplicationState currentReplicationState = getTableReplicationState(htd); - if (enableRep && currentReplicationState != ReplicationState.ENABLED - || !enableRep && currentReplicationState != ReplicationState.DISABLED) { - for (HColumnDescriptor hcd : htd.getFamilies()) { - hcd.setScope(enableRep ? 
HConstants.REPLICATION_SCOPE_GLOBAL - : HConstants.REPLICATION_SCOPE_LOCAL); - } - modifyTable(tableName, htd); + TableDescriptor tableDesc = getDescriptor(tableName); + if (!tableDesc.matchReplicationScope(enableRep)) { + int scope = + enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL; + modifyTable(TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build()); } } - /** - * This enum indicates the current state of the replication for a given table. - */ - private enum ReplicationState { - ENABLED, // all column families enabled - MIXED, // some column families enabled, some disabled - DISABLED // all column families disabled - } - - /** - * @param htd table descriptor details for the table to check - * @return ReplicationState the current state of the table. - */ - private ReplicationState getTableReplicationState(HTableDescriptor htd) { - boolean hasEnabled = false; - boolean hasDisabled = false; - - for (HColumnDescriptor hcd : htd.getFamilies()) { - if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL - && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) { - hasDisabled = true; - } else { - hasEnabled = true; - } - } - - if (hasEnabled && hasDisabled) return ReplicationState.MIXED; - if (hasEnabled) return ReplicationState.ENABLED; - return ReplicationState.DISABLED; - } - - /** - * Returns the configuration needed to talk to the remote slave cluster. 
- * @param peer the description of replication peer - * @return the configuration for the peer cluster, null if it was unable to get the configuration - * @throws IOException - */ - private Configuration getPeerClusterConfiguration(ReplicationPeerDescription peer) - throws IOException { - ReplicationPeerConfig peerConfig = peer.getPeerConfig(); - Configuration otherConf; - try { - otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey()); - } catch (IOException e) { - throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e); - } - - if (!peerConfig.getConfiguration().isEmpty()) { - CompoundConfiguration compound = new CompoundConfiguration(); - compound.add(otherConf); - compound.addStringMap(peerConfig.getConfiguration()); - return compound; - } - - return otherConf; - } - @Override public void clearCompactionQueues(final ServerName sn, final Set<String> queues) throws IOException, InterruptedException { http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index d77cd15..bcf581b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -42,6 +42,7 @@ import java.util.stream.Stream; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AsyncMetaTableAccessor; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.ClusterStatus.Option; @@ -64,7 +65,7 @@ import 
org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterReques import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable; import org.apache.hadoop.hbase.client.Scan.ReadType; -import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.client.replication.TableCFs; import org.apache.hadoop.hbase.client.security.SecurityCapability; import org.apache.hadoop.hbase.exceptions.DeserializationException; @@ -188,6 +189,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColu import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; @@ -506,6 +509,14 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { } @Override + public CompletableFuture<Void> modifyTable(TableDescriptor desc) { + return this.<ModifyTableRequest, ModifyTableResponse> procedureCall( + RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), + ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done), + (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName())); + } + + @Override public 
CompletableFuture<Void> deleteTable(TableName tableName) { return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), @@ -1515,7 +1526,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call( controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), ( s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), - (resp) -> ReplicationSerDeHelper.convert(resp.getPeerConfig()))).call(); + (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call(); } @Override @@ -1541,7 +1552,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { CompletableFuture<Void> future = new CompletableFuture<Void>(); getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> { if (!completeExceptionally(future, error)) { - ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); + ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> { if (!completeExceptionally(future, error)) { future.complete(result); @@ -1560,21 +1571,23 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { } CompletableFuture<Void> future = new CompletableFuture<Void>(); - getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> { - if (!completeExceptionally(future, error)) { - try { - ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); - } catch (ReplicationException e) { - future.completeExceptionally(e); - return; - } - updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> { - if (!completeExceptionally(future, error)) { - future.complete(result); + getReplicationPeerConfig(id).whenComplete( + (peerConfig, error) -> { + if (!completeExceptionally(future, error)) { + 
try { + ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, + id); + } catch (ReplicationException e) { + future.completeExceptionally(e); + return; } - }); - } - }); + updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> { + if (!completeExceptionally(future, err)) { // err: failure of the update call itself + future.complete(result); + } + }); + } + }); return future; } @@ -1602,7 +1615,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { request, (s, c, req, done) -> s.listReplicationPeers(c, req, done), (resp) -> resp.getPeerDescList().stream() - .map(ReplicationSerDeHelper::toReplicationPeerDescription) + .map(ReplicationPeerConfigUtil::toReplicationPeerDescription) .collect(Collectors.toList()))).call(); } @@ -2168,9 +2181,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { returnedFuture.completeExceptionally(err); return; } - LOG.info("location is " + location); if (!location.isPresent() || location.get().getRegion() == null) { - LOG.info("unknown location is " + location); returnedFuture.completeExceptionally(new UnknownRegionException( "Invalid region name or encoded region name: " + Bytes.toStringBinary(regionNameOrEncodedRegionName))); @@ -2323,6 +2334,18 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { } } + private class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { + + ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { + super(tableName); + } + + @Override + String getOperationType() { + return "MODIFY"; + } + } + private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer { DeleteTableProcedureBiConsumer(TableName tableName) { @@ -3031,4 +3054,254 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) .startLogErrorsCnt(startLogErrorsCnt); } + + @Override + public CompletableFuture<Void> enableTableReplication(TableName tableName) { + if (tableName == null) { + return 
failedFuture(new IllegalArgumentException("Table name is null")); + } + CompletableFuture<Void> future = new CompletableFuture<>(); + tableExists(tableName).whenComplete( + (exist, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (!exist) { + future.completeExceptionally(new TableNotFoundException("Table '" + + tableName.getNameAsString() + "' does not exists.")); + return; + } + getTableSplits(tableName).whenComplete((splits, err1) -> { + if (err1 != null) { + future.completeExceptionally(err1); + } else { + checkAndSyncTableToPeerClusters(tableName, splits).whenComplete((result, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + setTableReplication(tableName, true).whenComplete((result3, err3) -> { + if (err3 != null) { + future.completeExceptionally(err3); + } else { + future.complete(result3); + } + }); + } + }); + } + }); + }); + return future; + } + + @Override + public CompletableFuture<Void> disableTableReplication(TableName tableName) { + if (tableName == null) { + return failedFuture(new IllegalArgumentException("Table name is null")); + } + CompletableFuture<Void> future = new CompletableFuture<>(); + tableExists(tableName).whenComplete( + (exist, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (!exist) { + future.completeExceptionally(new TableNotFoundException("Table '" + + tableName.getNameAsString() + "' does not exists.")); + return; + } + setTableReplication(tableName, false).whenComplete((result, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(result); + } + }); + }); + return future; + } + + private CompletableFuture<byte[][]> getTableSplits(TableName tableName) { + CompletableFuture<byte[][]> future = new CompletableFuture<>(); + getTableRegions(tableName).whenComplete((regions, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + return; + } + if (regions.size() == 
1) { + future.complete(null); + } else { + byte[][] splits = new byte[regions.size() - 1][]; + for (int i = 1; i < regions.size(); i++) { + splits[i - 1] = regions.get(i).getStartKey(); + } + future.complete(splits); + } + }); + return future; + } + + /** + * Connect to peer and check the table descriptor on peer: + * <ol> + * <li>Create the same table on peer when not exist.</li> + * <li>Throw an exception if the table already has replication enabled on any of the column + * families.</li> + * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li> + * </ol> + * @param tableName name of the table to sync to the peer + * @param splits table split keys + */ + private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName, + byte[][] splits) { + CompletableFuture<Void> future = new CompletableFuture<>(); + listReplicationPeers().whenComplete( + (peers, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (peers == null || peers.size() <= 0) { + future.completeExceptionally(new IllegalArgumentException( + "Found no peer cluster for replication.")); + return; + } + List<CompletableFuture<Void>> futures = new ArrayList<>(); + peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName)) + .forEach(peer -> { + futures.add(trySyncTableToPeerCluster(tableName, splits, peer)); + }); + CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])) + .whenComplete((result, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(result); + } + }); + }); + return future; + } + + private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits, + ReplicationPeerDescription peer) { + Configuration peerConf = null; + try { + peerConf = + ReplicationPeerConfigUtil + .getPeerClusterConfiguration(connection.getConfiguration(), peer); + } catch (IOException e) { + return 
failedFuture(e); + } + CompletableFuture<Void> future = new CompletableFuture<>(); + ConnectionFactory.createAsyncConnection(peerConf).whenComplete( + (conn, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + getTableDescriptor(tableName).whenComplete( + (tableDesc, err1) -> { + if (err1 != null) { + future.completeExceptionally(err1); + return; + } + AsyncAdmin peerAdmin = conn.getAdmin(); + peerAdmin.tableExists(tableName).whenComplete( + (exist, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + return; + } + if (!exist) { + CompletableFuture<Void> createTableFuture = null; + if (splits == null) { + createTableFuture = peerAdmin.createTable(tableDesc); + } else { + createTableFuture = peerAdmin.createTable(tableDesc, splits); + } + createTableFuture.whenComplete( + (result, err3) -> { + if (err3 != null) { + future.completeExceptionally(err3); + } else { + future.complete(result); + } + }); + } else { + compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin).whenComplete( + (result, err4) -> { + if (err4 != null) { + future.completeExceptionally(err4); + } else { + future.complete(result); + } + }); + } + }); + }); + }); + return future; + } + + private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName, + TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) { + CompletableFuture<Void> future = new CompletableFuture<>(); + peerAdmin.getTableDescriptor(tableName).whenComplete( + (peerTableDesc, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (peerTableDesc == null) { + future.completeExceptionally(new IllegalArgumentException( + "Failed to get table descriptor for table " + tableName.getNameAsString() + + " from peer cluster " + peer.getPeerId())); + return; + } + if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) { + future.completeExceptionally(new 
IllegalArgumentException("Table " + + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() + + ", but the table descriptors are not same when compared with source cluster." + + " Thus can not enable the table's replication switch.")); + return; + } + future.complete(null); + }); + return future; + } + + /** + * Set the table's replication switch, unless the table's replication scope already matches it. + * @param tableName name of the table + * @param enableRep true to enable replication (global scope), false to disable it (local scope) + */ + private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) { + CompletableFuture<Void> future = new CompletableFuture<>(); + getTableDescriptor(tableName).whenComplete( + (tableDesc, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (!tableDesc.matchReplicationScope(enableRep)) { + int scope = + enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL; + TableDescriptor newTableDesc = + TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build(); + modifyTable(newTableDesc).whenComplete((result, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(result); + } + }); + } else { + future.complete(null); + } + }); + return future; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java index 4e2deed..f485c4e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java @@ -24,10 +24,11 @@ import java.util.Comparator; import java.util.Iterator; 
import java.util.Map; import java.util.Set; -import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; - +import org.apache.yetus.audience.InterfaceAudience; /** * TableDescriptor contains the details about an HBase table such as the descriptors of @@ -39,8 +40,15 @@ import org.apache.hadoop.hbase.util.Bytes; public interface TableDescriptor { @InterfaceAudience.Private - static final Comparator<TableDescriptor> COMPARATOR - = (TableDescriptor lhs, TableDescriptor rhs) -> { + Comparator<TableDescriptor> COMPARATOR = getComparator(ColumnFamilyDescriptor.COMPARATOR); + + @InterfaceAudience.Private + Comparator<TableDescriptor> COMPARATOR_IGNORE_REPLICATION = + getComparator(ColumnFamilyDescriptor.COMPARATOR_IGNORE_REPLICATION); + + static Comparator<TableDescriptor> + getComparator(Comparator<ColumnFamilyDescriptor> cfComparator) { + return (TableDescriptor lhs, TableDescriptor rhs) -> { int result = lhs.getTableName().compareTo(rhs.getTableName()); if (result != 0) { return result; @@ -52,16 +60,17 @@ public interface TableDescriptor { return result; } - for (Iterator<ColumnFamilyDescriptor> it = lhsFamilies.iterator(), - it2 = rhsFamilies.iterator(); it.hasNext();) { - result = ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()); + for (Iterator<ColumnFamilyDescriptor> it = lhsFamilies.iterator(), it2 = + rhsFamilies.iterator(); it.hasNext();) { + result = cfComparator.compare(it.next(), it2.next()); if (result != 0) { return result; } } // punt on comparison for ordering, just calculate difference return Integer.compare(lhs.getValues().hashCode(), rhs.getValues().hashCode()); - }; + }; + } /** * Returns the count of the column families of the table. 
@@ -266,4 +275,30 @@ public interface TableDescriptor { */ boolean isReadOnly(); + /** + * Check if the table's cfs' replication scope matched with the replication state + * @param enabled replication state + * @return true if matched, otherwise false + */ + default boolean matchReplicationScope(boolean enabled) { + boolean hasEnabled = false; + boolean hasDisabled = false; + + for (ColumnFamilyDescriptor cf : getColumnFamilies()) { + if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL + && cf.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) { + hasDisabled = true; + } else { + hasEnabled = true; + } + } + + if (hasEnabled && hasDisabled) { + return false; + } + if (hasEnabled) { + return enabled; + } + return !enabled; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java index 7bde1c1..ef59311 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java @@ -33,18 +33,19 @@ import java.util.TreeSet; import java.util.function.Function; import java.util.regex.Matcher; import java.util.stream.Stream; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; /** * @since 2.0.0 @@ -409,6 +410,24 @@ public class TableDescriptorBuilder { return this; } + /** + * Sets the replication scope on all, and only, the column families already in the builder. + * Column families added later are not backfilled with this replication scope. + * @param scope replication scope + * @return this TableDescriptorBuilder + */ + public TableDescriptorBuilder setReplicationScope(int scope) { + Map<byte[], ColumnFamilyDescriptor> newFamilies = new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR); + newFamilies.putAll(desc.families); + newFamilies + .forEach((cf, cfDesc) -> { + desc.removeColumnFamily(cf); + desc.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cfDesc).setScope(scope) + .build()); + }); + return this; + } + public TableDescriptor build() { return new ModifyableTableDescriptor(desc); } http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 39f2045..5a5913c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -141,7 +141,7 @@ public class ReplicationAdmin implements Closeable { * */ @Deprecated public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) { - return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig); + return ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCFsConfig); } /** @@ -228,7 +228,7 @@ public class ReplicationAdmin implements 
Closeable { @Deprecated public String getPeerTableCFs(String id) throws IOException { ReplicationPeerConfig peerConfig = admin.getReplicationPeerConfig(id); - return ReplicationSerDeHelper.convertToString(peerConfig.getTableCFsMap()); + return ReplicationPeerConfigUtil.convertToString(peerConfig.getTableCFsMap()); } /** @@ -243,7 +243,7 @@ public class ReplicationAdmin implements Closeable { @Deprecated public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException, IOException { - appendPeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs)); + appendPeerTableCFs(id, ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)); } /** @@ -300,7 +300,7 @@ public class ReplicationAdmin implements Closeable { @Deprecated public void removePeerTableCFs(String id, String tableCf) throws ReplicationException, IOException { - removePeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCf)); + removePeerTableCFs(id, ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCf)); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java new file mode 100644 index 0000000..be468ae --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -0,0 +1,468 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.replication; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CompoundConfiguration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; 
+import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Set; + +/** + * Helper for TableCFs Operations. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public final class ReplicationPeerConfigUtil { + + private static final Log LOG = LogFactory.getLog(ReplicationPeerConfigUtil.class); + + private ReplicationPeerConfigUtil() {} + + public static String convertToString(Set<String> namespaces) { + if (namespaces == null) { + return null; + } + return StringUtils.join(namespaces, ';'); + } + + /** convert map to TableCFs Object */ + public static ReplicationProtos.TableCF[] convert( + Map<TableName, ? extends Collection<String>> tableCfs) { + if (tableCfs == null) { + return null; + } + List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.entrySet().size()); + ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder(); + for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { + tableCFBuilder.clear(); + tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey())); + Collection<String> v = entry.getValue(); + if (v != null && !v.isEmpty()) { + for (String value : entry.getValue()) { + tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value)); + } + } + tableCFList.add(tableCFBuilder.build()); + } + return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]); + } + + public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) { + if (tableCfs == null) { + return null; + } + return convert(convert(tableCfs)); + } + + /** + * Convert string to TableCFs Object. + * This is only for read TableCFs information from TableCF node. + * Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3. 
+ * */ + public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) { + if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) { + return null; + } + + ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder(); + String[] tables = tableCFsConfig.split(";"); + List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.length); + + for (String tab : tables) { + // 1 ignore empty table config + tab = tab.trim(); + if (tab.length() == 0) { + continue; + } + // 2 split to "table" and "cf1,cf2" + // for each table: "table:cf1,cf2" or "table" + String[] pair = tab.split(":"); + String tabName = pair[0].trim(); + if (pair.length > 2 || tabName.length() == 0) { + LOG.info("incorrect format:" + tableCFsConfig); + continue; + } + + tableCFBuilder.clear(); + // split namespace from tableName + String ns = "default"; + String tName = tabName; + String[] dbs = tabName.split("\\."); + if (dbs != null && dbs.length == 2) { + ns = dbs[0]; + tName = dbs[1]; + } + tableCFBuilder.setTableName( + ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName))); + + // 3 parse "cf1,cf2" part to List<cf> + if (pair.length == 2) { + String[] cfsList = pair[1].split(","); + for (String cf : cfsList) { + String cfName = cf.trim(); + if (cfName.length() > 0) { + tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName)); + } + } + } + tableCFList.add(tableCFBuilder.build()); + } + return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]); + } + + /** + * Convert TableCFs Object to String. 
+ * Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3 + * */ + public static String convert(ReplicationProtos.TableCF[] tableCFs) { + StringBuilder sb = new StringBuilder(); + for (int i = 0, n = tableCFs.length; i < n; i++) { + ReplicationProtos.TableCF tableCF = tableCFs[i]; + String namespace = tableCF.getTableName().getNamespace().toStringUtf8(); + if (!Strings.isEmpty(namespace)) { + sb.append(namespace).append("."). + append(tableCF.getTableName().getQualifier().toStringUtf8()) + .append(":"); + } else { + sb.append(tableCF.getTableName().toString()).append(":"); + } + for (int j = 0; j < tableCF.getFamiliesCount(); j++) { + sb.append(tableCF.getFamilies(j).toStringUtf8()).append(","); + } + sb.deleteCharAt(sb.length() - 1).append(";"); + } + if (sb.length() > 0) { + sb.deleteCharAt(sb.length() - 1); + } + return sb.toString(); + } + + /** + * Get TableCF in TableCFs, if not exist, return null. + * */ + public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs, + String table) { + for (int i = 0, n = tableCFs.length; i < n; i++) { + ReplicationProtos.TableCF tableCF = tableCFs[i]; + if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) { + return tableCF; + } + } + return null; + } + + /** + * Parse bytes into TableCFs. + * It is used for backward compatibility. + * Old format bytes have no PB_MAGIC Header + * */ + public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException { + if (bytes == null) { + return null; + } + return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes)); + } + + /** + * Convert tableCFs string into Map. + * */ + public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) { + ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig); + return convert2Map(tableCFs); + } + + /** + * Convert tableCFs Object to Map. 
+ * */ + public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) { + if (tableCFs == null || tableCFs.length == 0) { + return null; + } + Map<TableName, List<String>> tableCFsMap = new HashMap<>(); + for (int i = 0, n = tableCFs.length; i < n; i++) { + ReplicationProtos.TableCF tableCF = tableCFs[i]; + List<String> families = new ArrayList<>(); + for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) { + families.add(tableCF.getFamilies(j).toStringUtf8()); + } + if (families.size() > 0) { + tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families); + } else { + tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null); + } + } + + return tableCFsMap; + } + + /** + * @param bytes Content of a peer znode. + * @return ReplicationPeerConfig parsed from the passed bytes; bytes without the PB magic prefix are used as the cluster key. + * @throws DeserializationException if the protobuf content cannot be parsed + */ + public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes) + throws DeserializationException { + if (ProtobufUtil.isPBMagicPrefix(bytes)) { + int pblen = ProtobufUtil.lengthOfPBMagic(); + ReplicationProtos.ReplicationPeer.Builder builder = + ReplicationProtos.ReplicationPeer.newBuilder(); + ReplicationProtos.ReplicationPeer peer; + try { + ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); + peer = builder.build(); + } catch (IOException e) { + throw new DeserializationException(e); + } + return convert(peer); + } else { + if (bytes.length > 0) { + return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes)); + } + return new ReplicationPeerConfig().setClusterKey(""); + } + } + + public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) { + ReplicationPeerConfig peerConfig = new ReplicationPeerConfig(); + if (peer.hasClusterkey()) { + peerConfig.setClusterKey(peer.getClusterkey()); + } + if (peer.hasReplicationEndpointImpl()) { + peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl()); + } + + for 
(HBaseProtos.BytesBytesPair pair : peer.getDataList()) { + peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray()); + } + + for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) { + peerConfig.getConfiguration().put(pair.getName(), pair.getValue()); + } + + Map<TableName, ? extends Collection<String>> tableCFsMap = convert2Map( + peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()])); + if (tableCFsMap != null) { + peerConfig.setTableCFsMap(tableCFsMap); + } + List<ByteString> namespacesList = peer.getNamespacesList(); + if (namespacesList != null && namespacesList.size() != 0) { + Set<String> namespaces = new HashSet<>(); + for (ByteString namespace : namespacesList) { + namespaces.add(namespace.toStringUtf8()); + } + peerConfig.setNamespaces(namespaces); + } + if (peer.hasBandwidth()) { + peerConfig.setBandwidth(peer.getBandwidth()); + } + return peerConfig; + } + + public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) { + ReplicationProtos.ReplicationPeer.Builder builder = + ReplicationProtos.ReplicationPeer.newBuilder(); + if (peerConfig.getClusterKey() != null) { + builder.setClusterkey(peerConfig.getClusterKey()); + } + if (peerConfig.getReplicationEndpointImpl() != null) { + builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl()); + } + + for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) { + builder.addData(HBaseProtos.BytesBytesPair.newBuilder() + .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey())) + .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue())) + .build()); + } + + for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) { + builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder() + .setName(entry.getKey()) + .setValue(entry.getValue()) + .build()); + } + + ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap()); + if 
(tableCFs != null) { + for (int i = 0; i < tableCFs.length; i++) { + builder.addTableCfs(tableCFs[i]); + } + } + Set<String> namespaces = peerConfig.getNamespaces(); + if (namespaces != null) { + for (String namespace : namespaces) { + builder.addNamespaces(ByteString.copyFromUtf8(namespace)); + } + } + + builder.setBandwidth(peerConfig.getBandwidth()); + return builder.build(); + } + + /** + * @param peerConfig + * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable + * for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under + * /hbase/replication/peers/PEER_ID + */ + public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) { + byte[] bytes = convert(peerConfig).toByteArray(); + return ProtobufUtil.prependPBMagic(bytes); + } + + public static ReplicationPeerDescription toReplicationPeerDescription( + ReplicationProtos.ReplicationPeerDescription desc) { + boolean enabled = ReplicationProtos.ReplicationState.State.ENABLED == desc.getState() + .getState(); + ReplicationPeerConfig config = convert(desc.getConfig()); + return new ReplicationPeerDescription(desc.getId(), enabled, config); + } + + public static ReplicationProtos.ReplicationPeerDescription toProtoReplicationPeerDescription( + ReplicationPeerDescription desc) { + ReplicationProtos.ReplicationPeerDescription.Builder builder = + ReplicationProtos.ReplicationPeerDescription.newBuilder(); + builder.setId(desc.getPeerId()); + ReplicationProtos.ReplicationState.Builder stateBuilder = ReplicationProtos.ReplicationState + .newBuilder(); + stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED + : ReplicationProtos.ReplicationState.State.DISABLED); + builder.setState(stateBuilder.build()); + builder.setConfig(convert(desc.getPeerConfig())); + return builder.build(); + } + + public static void appendTableCFsToReplicationPeerConfig( + Map<TableName, ? 
extends Collection<String>> tableCfs, ReplicationPeerConfig peerConfig) { + Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap(); + if (preTableCfs == null) { + peerConfig.setTableCFsMap(tableCfs); + } else { + for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { + TableName table = entry.getKey(); + Collection<String> appendCfs = entry.getValue(); + if (preTableCfs.containsKey(table)) { + List<String> cfs = preTableCfs.get(table); + if (cfs == null || appendCfs == null || appendCfs.isEmpty()) { + preTableCfs.put(table, null); + } else { + Set<String> cfSet = new HashSet<String>(cfs); + cfSet.addAll(appendCfs); + preTableCfs.put(table, Lists.newArrayList(cfSet)); + } + } else { + if (appendCfs == null || appendCfs.isEmpty()) { + preTableCfs.put(table, null); + } else { + preTableCfs.put(table, Lists.newArrayList(appendCfs)); + } + } + } + } + } + + public static void removeTableCFsFromReplicationPeerConfig( + Map<TableName, ? extends Collection<String>> tableCfs, ReplicationPeerConfig peerConfig, + String id) throws ReplicationException { + Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap(); + if (preTableCfs == null) { + throw new ReplicationException("Table-Cfs for peer: " + id + " is null"); + } + for (Map.Entry<TableName, ? 
extends Collection<String>> entry : tableCfs.entrySet()) { + TableName table = entry.getKey(); + Collection<String> removeCfs = entry.getValue(); + if (preTableCfs.containsKey(table)) { + List<String> cfs = preTableCfs.get(table); + if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) { + preTableCfs.remove(table); + } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) { + Set<String> cfSet = new HashSet<String>(cfs); + cfSet.removeAll(removeCfs); + if (cfSet.isEmpty()) { + preTableCfs.remove(table); + } else { + preTableCfs.put(table, Lists.newArrayList(cfSet)); + } + } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) { + throw new ReplicationException("Cannot remove cf of table: " + table + + " which doesn't specify cfs from table-cfs config in peer: " + id); + } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) { + throw new ReplicationException("Cannot remove table: " + table + + " which has specified cfs from table-cfs config in peer: " + id); + } + } else { + throw new ReplicationException("No table: " + + table + " in table-cfs config of peer: " + id); + } + } + } + + /** + * Returns the configuration needed to talk to the remote slave cluster. 
+ * @param conf the base configuration + * @param peer the description of replication peer + * @return the configuration for the peer cluster, null if it was unable to get the configuration + * @throws IOException when create peer cluster configuration failed + */ + public static Configuration getPeerClusterConfiguration(Configuration conf, + ReplicationPeerDescription peer) throws IOException { + ReplicationPeerConfig peerConfig = peer.getPeerConfig(); + Configuration otherConf; + try { + otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey()); + } catch (IOException e) { + throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e); + } + + if (!peerConfig.getConfiguration().isEmpty()) { + CompoundConfiguration compound = new CompoundConfiguration(); + compound.add(otherConf); + compound.addStringMap(peerConfig.getConfiguration()); + return compound; + } + + return otherConf; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java deleted file mode 100644 index 986a09f..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java +++ /dev/null @@ -1,437 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client.replication; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; -import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; -import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Strings; - -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; - -import java.io.IOException; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.HashMap; -import java.util.ArrayList; -import java.util.Set; - -/** - * Helper for TableCFs Operations. 
- */ -@InterfaceAudience.Private -@InterfaceStability.Stable -public final class ReplicationSerDeHelper { - - private static final Log LOG = LogFactory.getLog(ReplicationSerDeHelper.class); - - private ReplicationSerDeHelper() {} - - public static String convertToString(Set<String> namespaces) { - if (namespaces == null) { - return null; - } - return StringUtils.join(namespaces, ';'); - } - - /** convert map to TableCFs Object */ - public static ReplicationProtos.TableCF[] convert( - Map<TableName, ? extends Collection<String>> tableCfs) { - if (tableCfs == null) { - return null; - } - List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.entrySet().size()); - ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder(); - for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { - tableCFBuilder.clear(); - tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey())); - Collection<String> v = entry.getValue(); - if (v != null && !v.isEmpty()) { - for (String value : entry.getValue()) { - tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value)); - } - } - tableCFList.add(tableCFBuilder.build()); - } - return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]); - } - - public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) { - if (tableCfs == null) { - return null; - } - return convert(convert(tableCfs)); - } - - /** - * Convert string to TableCFs Object. - * This is only for read TableCFs information from TableCF node. - * Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3. 
- * */ - public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) { - if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) { - return null; - } - - ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder(); - String[] tables = tableCFsConfig.split(";"); - List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.length); - - for (String tab : tables) { - // 1 ignore empty table config - tab = tab.trim(); - if (tab.length() == 0) { - continue; - } - // 2 split to "table" and "cf1,cf2" - // for each table: "table#cf1,cf2" or "table" - String[] pair = tab.split(":"); - String tabName = pair[0].trim(); - if (pair.length > 2 || tabName.length() == 0) { - LOG.info("incorrect format:" + tableCFsConfig); - continue; - } - - tableCFBuilder.clear(); - // split namespace from tableName - String ns = "default"; - String tName = tabName; - String[] dbs = tabName.split("\\."); - if (dbs != null && dbs.length == 2) { - ns = dbs[0]; - tName = dbs[1]; - } - tableCFBuilder.setTableName( - ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName))); - - // 3 parse "cf1,cf2" part to List<cf> - if (pair.length == 2) { - String[] cfsList = pair[1].split(","); - for (String cf : cfsList) { - String cfName = cf.trim(); - if (cfName.length() > 0) { - tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName)); - } - } - } - tableCFList.add(tableCFBuilder.build()); - } - return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]); - } - - /** - * Convert TableCFs Object to String. 
- * Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3 - * */ - public static String convert(ReplicationProtos.TableCF[] tableCFs) { - StringBuilder sb = new StringBuilder(); - for (int i = 0, n = tableCFs.length; i < n; i++) { - ReplicationProtos.TableCF tableCF = tableCFs[i]; - String namespace = tableCF.getTableName().getNamespace().toStringUtf8(); - if (!Strings.isEmpty(namespace)) { - sb.append(namespace).append("."). - append(tableCF.getTableName().getQualifier().toStringUtf8()) - .append(":"); - } else { - sb.append(tableCF.getTableName().toString()).append(":"); - } - for (int j = 0; j < tableCF.getFamiliesCount(); j++) { - sb.append(tableCF.getFamilies(j).toStringUtf8()).append(","); - } - sb.deleteCharAt(sb.length() - 1).append(";"); - } - if (sb.length() > 0) { - sb.deleteCharAt(sb.length() - 1); - } - return sb.toString(); - } - - /** - * Get TableCF in TableCFs, if not exist, return null. - * */ - public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs, - String table) { - for (int i = 0, n = tableCFs.length; i < n; i++) { - ReplicationProtos.TableCF tableCF = tableCFs[i]; - if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) { - return tableCF; - } - } - return null; - } - - /** - * Parse bytes into TableCFs. - * It is used for backward compatibility. - * Old format bytes have no PB_MAGIC Header - * */ - public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException { - if (bytes == null) { - return null; - } - return ReplicationSerDeHelper.convert(Bytes.toString(bytes)); - } - - /** - * Convert tableCFs string into Map. - * */ - public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) { - ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig); - return convert2Map(tableCFs); - } - - /** - * Convert tableCFs Object to Map. 
- * */ - public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) { - if (tableCFs == null || tableCFs.length == 0) { - return null; - } - Map<TableName, List<String>> tableCFsMap = new HashMap<>(); - for (int i = 0, n = tableCFs.length; i < n; i++) { - ReplicationProtos.TableCF tableCF = tableCFs[i]; - List<String> families = new ArrayList<>(); - for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) { - families.add(tableCF.getFamilies(j).toStringUtf8()); - } - if (families.size() > 0) { - tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families); - } else { - tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null); - } - } - - return tableCFsMap; - } - - /** - * @param bytes Content of a peer znode. - * @return ClusterKey parsed from the passed bytes. - * @throws DeserializationException - */ - public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes) - throws DeserializationException { - if (ProtobufUtil.isPBMagicPrefix(bytes)) { - int pblen = ProtobufUtil.lengthOfPBMagic(); - ReplicationProtos.ReplicationPeer.Builder builder = - ReplicationProtos.ReplicationPeer.newBuilder(); - ReplicationProtos.ReplicationPeer peer; - try { - ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); - peer = builder.build(); - } catch (IOException e) { - throw new DeserializationException(e); - } - return convert(peer); - } else { - if (bytes.length > 0) { - return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes)); - } - return new ReplicationPeerConfig().setClusterKey(""); - } - } - - public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) { - ReplicationPeerConfig peerConfig = new ReplicationPeerConfig(); - if (peer.hasClusterkey()) { - peerConfig.setClusterKey(peer.getClusterkey()); - } - if (peer.hasReplicationEndpointImpl()) { - peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl()); - } - - for 
(HBaseProtos.BytesBytesPair pair : peer.getDataList()) { - peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray()); - } - - for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) { - peerConfig.getConfiguration().put(pair.getName(), pair.getValue()); - } - - Map<TableName, ? extends Collection<String>> tableCFsMap = convert2Map( - peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()])); - if (tableCFsMap != null) { - peerConfig.setTableCFsMap(tableCFsMap); - } - List<ByteString> namespacesList = peer.getNamespacesList(); - if (namespacesList != null && namespacesList.size() != 0) { - Set<String> namespaces = new HashSet<>(); - for (ByteString namespace : namespacesList) { - namespaces.add(namespace.toStringUtf8()); - } - peerConfig.setNamespaces(namespaces); - } - if (peer.hasBandwidth()) { - peerConfig.setBandwidth(peer.getBandwidth()); - } - return peerConfig; - } - - public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) { - ReplicationProtos.ReplicationPeer.Builder builder = ReplicationProtos.ReplicationPeer.newBuilder(); - if (peerConfig.getClusterKey() != null) { - builder.setClusterkey(peerConfig.getClusterKey()); - } - if (peerConfig.getReplicationEndpointImpl() != null) { - builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl()); - } - - for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) { - builder.addData(HBaseProtos.BytesBytesPair.newBuilder() - .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey())) - .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue())) - .build()); - } - - for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) { - builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder() - .setName(entry.getKey()) - .setValue(entry.getValue()) - .build()); - } - - ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap()); - if 
(tableCFs != null) { - for (int i = 0; i < tableCFs.length; i++) { - builder.addTableCfs(tableCFs[i]); - } - } - Set<String> namespaces = peerConfig.getNamespaces(); - if (namespaces != null) { - for (String namespace : namespaces) { - builder.addNamespaces(ByteString.copyFromUtf8(namespace)); - } - } - - builder.setBandwidth(peerConfig.getBandwidth()); - return builder.build(); - } - - /** - * @param peerConfig - * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable - * for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under - * /hbase/replication/peers/PEER_ID - */ - public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) { - byte[] bytes = convert(peerConfig).toByteArray(); - return ProtobufUtil.prependPBMagic(bytes); - } - - public static ReplicationPeerDescription toReplicationPeerDescription( - ReplicationProtos.ReplicationPeerDescription desc) { - boolean enabled = ReplicationProtos.ReplicationState.State.ENABLED == desc.getState() - .getState(); - ReplicationPeerConfig config = convert(desc.getConfig()); - return new ReplicationPeerDescription(desc.getId(), enabled, config); - } - - public static ReplicationProtos.ReplicationPeerDescription toProtoReplicationPeerDescription( - ReplicationPeerDescription desc) { - ReplicationProtos.ReplicationPeerDescription.Builder builder = ReplicationProtos.ReplicationPeerDescription - .newBuilder(); - builder.setId(desc.getPeerId()); - ReplicationProtos.ReplicationState.Builder stateBuilder = ReplicationProtos.ReplicationState - .newBuilder(); - stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED - : ReplicationProtos.ReplicationState.State.DISABLED); - builder.setState(stateBuilder.build()); - builder.setConfig(convert(desc.getPeerConfig())); - return builder.build(); - } - - public static void appendTableCFsToReplicationPeerConfig( - Map<TableName, ? 
extends Collection<String>> tableCfs, ReplicationPeerConfig peerConfig) { - Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap(); - if (preTableCfs == null) { - peerConfig.setTableCFsMap(tableCfs); - } else { - for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { - TableName table = entry.getKey(); - Collection<String> appendCfs = entry.getValue(); - if (preTableCfs.containsKey(table)) { - List<String> cfs = preTableCfs.get(table); - if (cfs == null || appendCfs == null || appendCfs.isEmpty()) { - preTableCfs.put(table, null); - } else { - Set<String> cfSet = new HashSet<String>(cfs); - cfSet.addAll(appendCfs); - preTableCfs.put(table, Lists.newArrayList(cfSet)); - } - } else { - if (appendCfs == null || appendCfs.isEmpty()) { - preTableCfs.put(table, null); - } else { - preTableCfs.put(table, Lists.newArrayList(appendCfs)); - } - } - } - } - } - - public static void removeTableCFsFromReplicationPeerConfig( - Map<TableName, ? extends Collection<String>> tableCfs, ReplicationPeerConfig peerConfig, - String id) throws ReplicationException { - Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap(); - if (preTableCfs == null) { - throw new ReplicationException("Table-Cfs for peer: " + id + " is null"); - } - for (Map.Entry<TableName, ? 
extends Collection<String>> entry : tableCfs.entrySet()) { - TableName table = entry.getKey(); - Collection<String> removeCfs = entry.getValue(); - if (preTableCfs.containsKey(table)) { - List<String> cfs = preTableCfs.get(table); - if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) { - preTableCfs.remove(table); - } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) { - Set<String> cfSet = new HashSet<String>(cfs); - cfSet.removeAll(removeCfs); - if (cfSet.isEmpty()) { - preTableCfs.remove(table); - } else { - preTableCfs.put(table, Lists.newArrayList(cfSet)); - } - } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) { - throw new ReplicationException("Cannot remove cf of table: " + table - + " which doesn't specify cfs from table-cfs config in peer: " + id); - } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) { - throw new ReplicationException("Cannot remove table: " + table - + " which has specified cfs from table-cfs config in peer: " + id); - } - } else { - throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index bdd6e74..4d429c9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -123,4 +123,24 @@ public class ReplicationPeerConfig { builder.append("bandwidth=").append(bandwidth); return builder.toString(); } + + /** + * Decide whether the table need replicate to the peer cluster 
+ * @param table name of the table + * @return true if the table need replicate to the peer cluster + */ + public boolean needToReplicate(TableName table) { + // If null means user has explicitly not configured any namespaces and table CFs + // so all the tables data are applicable for replication + if (namespaces == null && tableCFsMap == null) { + return true; + } + if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) { + return true; + } + if (tableCFsMap != null && tableCFsMap.containsKey(table)) { + return true; + } + return false; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 4558deb..9eff114 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -1621,7 +1621,7 @@ public final class RequestConverter { String peerId, ReplicationPeerConfig peerConfig) { AddReplicationPeerRequest.Builder builder = AddReplicationPeerRequest.newBuilder(); builder.setPeerId(peerId); - 
builder.setPeerConfig(ReplicationSerDeHelper.convert(peerConfig)); + builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)); return builder.build(); } @@ -1658,7 +1658,7 @@ public final class RequestConverter { UpdateReplicationPeerConfigRequest.Builder builder = UpdateReplicationPeerConfigRequest .newBuilder(); builder.setPeerId(peerId); - builder.setPeerConfig(ReplicationSerDeHelper.convert(peerConfig)); + builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)); return builder.build(); }