HBASE-16447 Replication by namespaces config in peer (Guanghao Zhang)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1a1003a4 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1a1003a4 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1a1003a4 Branch: refs/heads/hbase-14439 Commit: 1a1003a482d9bfb725fbe1097c794fdb043dcd81 Parents: 2cf8907 Author: Enis Soztutar <e...@apache.org> Authored: Fri Sep 16 11:47:42 2016 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Fri Sep 16 11:47:42 2016 -0700 ---------------------------------------------------------------------- .../client/replication/ReplicationAdmin.java | 38 ++- .../replication/ReplicationSerDeHelper.java | 30 ++- .../hbase/replication/ReplicationPeer.java | 7 + .../replication/ReplicationPeerConfig.java | 15 +- .../replication/ReplicationPeerZKImpl.java | 10 + .../replication/ReplicationPeersZKImpl.java | 4 +- .../hbase/zookeeper/ZooKeeperWatcher.java | 2 +- .../ipc/protobuf/generated/TestProtos.java | 10 +- .../protobuf/generated/ZooKeeperProtos.java | 186 ++++++++++++-- .../src/main/protobuf/ZooKeeper.proto | 1 + .../apache/hadoop/hbase/ZKNamespaceManager.java | 2 +- .../replication/BaseReplicationEndpoint.java | 6 +- .../NamespaceTableCfWALEntryFilter.java | 126 ++++++++++ .../replication/TableCfWALEntryFilter.java | 101 -------- .../replication/TestReplicationAdmin.java | 84 +++++++ .../replication/TestNamespaceReplication.java | 248 +++++++++++++++++++ .../TestReplicationWALEntryFilters.java | 73 +++++- ...egionReplicaReplicationEndpointNoMaster.java | 3 + .../src/main/ruby/hbase/replication_admin.rb | 36 +++ hbase-shell/src/main/ruby/hbase_constants.rb | 1 + hbase-shell/src/main/ruby/shell.rb | 1 + .../src/main/ruby/shell/commands/add_peer.rb | 16 +- .../src/main/ruby/shell/commands/list_peers.rb | 7 +- .../ruby/shell/commands/set_peer_namespaces.rb | 51 ++++ .../ruby/shell/commands/set_peer_tableCFs.rb | 10 +- .../test/ruby/hbase/replication_admin_test.rb | 69 +++++- 26 files changed, 993 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index de6cb7f..dc1a7ad 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -189,6 +189,8 @@ public class ReplicationAdmin implements Closeable { * @param peerConfig configuration for the replication slave cluster */ public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException { + checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), + peerConfig.getTableCFsMap()); this.replicationPeers.registerPeer(id, peerConfig); } @@ -202,8 +204,11 @@ public class ReplicationAdmin implements Closeable { public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException { + checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), + peerConfig.getTableCFsMap()); this.replicationPeers.updatePeerConfig(id, peerConfig); } + /** * Removes a peer cluster and stops the replication to it. * @param id a short name that identifies the cluster @@ -360,7 +365,6 @@ public class ReplicationAdmin implements Closeable { } } else { throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id); - } } setPeerTableCFs(id, preTableCfs); @@ -376,6 +380,8 @@ public class ReplicationAdmin implements Closeable { */ public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException { + checkNamespacesAndTableCfsConfigConflict( + this.replicationPeers.getReplicationPeerConfig(id).getNamespaces(), tableCfs); this.replicationPeers.setPeerTableCFsConfig(id, tableCfs); } @@ -627,4 +633,34 @@ public class ReplicationAdmin implements Closeable { } return true; } + + /** + * Set a namespace in the peer config means that all tables in this namespace + * will be replicated to the peer cluster. + * + * 1. If you already have set a namespace in the peer config, then you can't set any table + * of this namespace to the peer config. + * 2. If you already have set a table in the peer config, then you can't set this table's + * namespace to the peer config. + * + * @param namespaces + * @param tableCfs + * @throws ReplicationException + */ + private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces, + Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException { + if (namespaces == null || namespaces.isEmpty()) { + return; + } + if (tableCfs == null || tableCfs.isEmpty()) { + return; + } + for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { + TableName table = entry.getKey(); + if (namespaces.contains(table.getNamespaceAsString())) { + throw new ReplicationException( + "Table-cfs config conflict with namespaces config in peer"); + } + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java index 9682f89..225e685 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.client.replication; import com.google.protobuf.ByteString; + +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; @@ -34,10 +36,12 @@ import org.apache.hadoop.hbase.util.Strings; import java.io.IOException; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.HashMap; import java.util.ArrayList; +import java.util.Set; /** * Helper for TableCFs Operations. @@ -50,6 +54,13 @@ public final class ReplicationSerDeHelper { private ReplicationSerDeHelper() {} + public static String convertToString(Set<String> namespaces) { + if (namespaces == null) { + return null; + } + return StringUtils.join(namespaces, ';'); + } + /** convert map to TableCFs Object */ public static ZooKeeperProtos.TableCF[] convert( Map<TableName, ? extends Collection<String>> tableCfs) { @@ -262,11 +273,21 @@ public final class ReplicationSerDeHelper { for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) { peerConfig.getConfiguration().put(pair.getName(), pair.getValue()); } + Map<TableName, ? extends Collection<String>> tableCFsMap = convert2Map( peer.getTableCfsList().toArray(new ZooKeeperProtos.TableCF[peer.getTableCfsCount()])); if (tableCFsMap != null) { peerConfig.setTableCFsMap(tableCFsMap); } + + List<ByteString> namespacesList = peer.getNamespacesList(); + if (namespacesList != null && namespacesList.size() != 0) { + Set<String> namespaces = new HashSet<String>(); + for (ByteString namespace : namespacesList) { + namespaces.add(namespace.toStringUtf8()); + } + peerConfig.setNamespaces(namespaces); + } return peerConfig; } @@ -292,12 +313,20 @@ public final class ReplicationSerDeHelper { .setValue(entry.getValue()) .build()); } + ZooKeeperProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap()); if (tableCFs != null) { for (int i = 0; i < tableCFs.length; i++) { builder.addTableCfs(tableCFs[i]); } } + Set<String> namespaces = peerConfig.getNamespaces(); + if (namespaces != null) { + for (String namespace : namespaces) { + builder.addNamespaces(ByteString.copyFromUtf8(namespace)); + } + } + return builder.build(); } @@ -311,5 +340,4 @@ public final class ReplicationSerDeHelper { byte[] bytes = convert(peerConfig).toByteArray(); return ProtobufUtil.prependPBMagic(bytes); } - } http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index 3da01fe..bd2b700 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -71,6 +72,12 @@ public interface ReplicationPeer { */ public Map<TableName, List<String>> getTableCFs(); + /** + * Get replicable namespace set of this peer + * @return the replicable namespaces set + */ + public Set<String> getNamespaces(); + void trackPeerConfigChanges(ReplicationPeerConfigListener listener); } http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index 1d2066c..1f0d085 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import org.apache.hadoop.hbase.TableName; @@ -42,7 +43,7 @@ public class ReplicationPeerConfig { private final Map<byte[], byte[]> peerData; private final Map<String, String> configuration; private Map<TableName, ? extends Collection<String>> tableCFsMap = null; - + private Set<String> namespaces = null; public ReplicationPeerConfig() { this.peerData = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR); @@ -93,10 +94,22 @@ public class ReplicationPeerConfig { return this; } + public Set<String> getNamespaces() { + return this.namespaces; + } + + public ReplicationPeerConfig setNamespaces(Set<String> namespaces) { + this.namespaces = namespaces; + return this; + } + @Override public String toString() { StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(","); builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(","); + if (namespaces != null) { + builder.append("namespaces=").append(namespaces.toString()).append(","); + } if (tableCFsMap != null) { builder.append("tableCFs=").append(tableCFsMap.toString()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java index a33690c..cfe543a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -163,6 +164,15 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase return this.tableCFs; } + /** + * Get replicable namespace set of this peer + * @return the replicable namespaces set + */ + @Override + public Set<String> getNamespaces() { + return this.peerConfig.getNamespaces(); + } + @Override public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) { if (this.peerConfigTracker != null){ http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 54c2dac..90b1347 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -343,7 +343,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re throws ReplicationException { ReplicationPeer peer = getConnectedPeer(id); if (peer == null){ - throw new ReplicationException("Could not find peer Id " + id); + throw new ReplicationException("Could not find peer Id " + id + " in connected peers"); } ReplicationPeerConfig existingConfig = peer.getPeerConfig(); if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty() && @@ -366,6 +366,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re // or data that weren't explicitly changed existingConfig.getConfiguration().putAll(newConfig.getConfiguration()); existingConfig.getPeerData().putAll(newConfig.getPeerData()); + existingConfig.setTableCFsMap(newConfig.getTableCFsMap()); + existingConfig.setNamespaces(newConfig.getNamespaces()); try { ZKUtil.setData(this.zookeeper, getPeerNode(id), http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index 5ef7171..f7d7e26 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -122,7 +122,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { // znode containing the state of recovering regions public String recoveringRegionsZNode; // znode containing namespace descriptors - public static String namespaceZNode = "namespace"; + public String namespaceZNode = "namespace"; // znode of indicating master maintenance mode public static String masterMaintZNode = "masterMaintenance"; http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java index d28945c..58e248e 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java @@ -2091,7 +2091,7 @@ public final class TestProtos { public final boolean isInitialized() { if (!hasMs()) { - + return false; } return true; @@ -2291,7 +2291,7 @@ public final class TestProtos { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2307,7 +2307,7 @@ public final class TestProtos { getAddrBytes() { java.lang.Object ref = addr_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); addr_ = b; @@ -2567,7 +2567,7 @@ public final class TestProtos { public final boolean isInitialized() { if (!hasAddr()) { - + return false; } return true; @@ -2621,7 +2621,7 @@ public final class TestProtos { getAddrBytes() { java.lang.Object ref = addr_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); addr_ = b; http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java index f64d0c1..d7de638 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java @@ -4782,6 +4782,20 @@ public final class ZooKeeperProtos { */ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCFOrBuilder getTableCfsOrBuilder( int index); + + // repeated bytes namespaces = 6; + /** + * <code>repeated bytes namespaces = 6;</code> + */ + java.util.List<com.google.protobuf.ByteString> getNamespacesList(); + /** + * <code>repeated bytes namespaces = 6;</code> + */ + int getNamespacesCount(); + /** + * <code>repeated bytes namespaces = 6;</code> + */ + com.google.protobuf.ByteString getNamespaces(int index); } /** * Protobuf type {@code hbase.pb.ReplicationPeer} @@ -4873,6 +4887,14 @@ public final class ZooKeeperProtos { tableCfs_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.PARSER, extensionRegistry)); break; } + case 50: { + if (!((mutable_bitField0_ & 0x00000020) == 0x00000020)) { + namespaces_ = new java.util.ArrayList<com.google.protobuf.ByteString>(); + mutable_bitField0_ |= 0x00000020; + } + namespaces_.add(input.readBytes()); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -4890,6 +4912,9 @@ public final class ZooKeeperProtos { if (((mutable_bitField0_ & 0x00000010) == 0x00000010)) { tableCfs_ = java.util.Collections.unmodifiableList(tableCfs_); } + if (((mutable_bitField0_ & 0x00000020) == 0x00000020)) { + namespaces_ = java.util.Collections.unmodifiableList(namespaces_); + } this.unknownFields = unknownFields.build(); makeExtensionsImmutable(); } @@ -5131,12 +5156,36 @@ public final class ZooKeeperProtos { return tableCfs_.get(index); } + // repeated bytes namespaces = 6; + public static final int NAMESPACES_FIELD_NUMBER = 6; + private java.util.List<com.google.protobuf.ByteString> namespaces_; + /** + * <code>repeated bytes namespaces = 6;</code> + */ + public java.util.List<com.google.protobuf.ByteString> + getNamespacesList() { + return namespaces_; + } + /** + * <code>repeated bytes namespaces = 6;</code> + */ + public int getNamespacesCount() { + return namespaces_.size(); + } + /** + * <code>repeated bytes namespaces = 6;</code> + */ + public com.google.protobuf.ByteString getNamespaces(int index) { + return namespaces_.get(index); + } + private void initFields() { clusterkey_ = ""; replicationEndpointImpl_ = ""; data_ = java.util.Collections.emptyList(); configuration_ = java.util.Collections.emptyList(); tableCfs_ = java.util.Collections.emptyList(); + namespaces_ = java.util.Collections.emptyList(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -5187,6 +5236,9 @@ public final class ZooKeeperProtos { for (int i = 0; i < tableCfs_.size(); i++) { output.writeMessage(5, tableCfs_.get(i)); } + for (int i = 0; i < namespaces_.size(); i++) { + output.writeBytes(6, namespaces_.get(i)); + } getUnknownFields().writeTo(output); } @@ -5216,6 +5268,15 @@ public final class ZooKeeperProtos { size += com.google.protobuf.CodedOutputStream .computeMessageSize(5, tableCfs_.get(i)); } + { + int dataSize = 0; + for (int i = 0; i < namespaces_.size(); i++) { + dataSize += com.google.protobuf.CodedOutputStream + .computeBytesSizeNoTag(namespaces_.get(i)); + } + size += dataSize; + size += 1 * getNamespacesList().size(); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -5255,6 +5316,8 @@ public final class ZooKeeperProtos { .equals(other.getConfigurationList()); result = result && getTableCfsList() .equals(other.getTableCfsList()); + result = result && getNamespacesList() + .equals(other.getNamespacesList()); result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -5288,6 +5351,10 @@ public final class ZooKeeperProtos { hash = (37 * hash) + TABLE_CFS_FIELD_NUMBER; hash = (53 * hash) + getTableCfsList().hashCode(); } + if (getNamespacesCount() > 0) { + hash = (37 * hash) + NAMESPACES_FIELD_NUMBER; + hash = (53 * hash) + getNamespacesList().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -5427,6 +5494,8 @@ public final class ZooKeeperProtos { } else { tableCfsBuilder_.clear(); } + namespaces_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -5490,6 +5559,11 @@ public final class ZooKeeperProtos { } else { result.tableCfs_ = tableCfsBuilder_.build(); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + namespaces_ = java.util.Collections.unmodifiableList(namespaces_); + bitField0_ = (bitField0_ & ~0x00000020); + } + result.namespaces_ = namespaces_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -5594,6 +5668,16 @@ public final class ZooKeeperProtos { } } } + if (!other.namespaces_.isEmpty()) { + if (namespaces_.isEmpty()) { + namespaces_ = other.namespaces_; + bitField0_ = (bitField0_ & ~0x00000020); + } else { + ensureNamespacesIsMutable(); + namespaces_.addAll(other.namespaces_); + } + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -6541,6 +6625,78 @@ public final class ZooKeeperProtos { return tableCfsBuilder_; } + // repeated bytes namespaces = 6; + private java.util.List<com.google.protobuf.ByteString> namespaces_ = java.util.Collections.emptyList(); + private void ensureNamespacesIsMutable() { + if (!((bitField0_ & 0x00000020) == 0x00000020)) { + namespaces_ = new java.util.ArrayList<com.google.protobuf.ByteString>(namespaces_); + bitField0_ |= 0x00000020; + } + } + /** + * <code>repeated bytes namespaces = 6;</code> + */ + public java.util.List<com.google.protobuf.ByteString> + getNamespacesList() { + return java.util.Collections.unmodifiableList(namespaces_); + } + /** + * <code>repeated bytes namespaces = 6;</code> + */ + public int getNamespacesCount() { + return namespaces_.size(); + } + /** + * <code>repeated bytes namespaces = 6;</code> + */ + public com.google.protobuf.ByteString getNamespaces(int index) { + return namespaces_.get(index); + } + /** + * <code>repeated bytes namespaces = 6;</code> + */ + public Builder setNamespaces( + int index, com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensureNamespacesIsMutable(); + namespaces_.set(index, value); + onChanged(); + return this; + } + /** + * <code>repeated bytes namespaces = 6;</code> + */ + public Builder addNamespaces(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensureNamespacesIsMutable(); + namespaces_.add(value); + onChanged(); + return this; + } + /** + * <code>repeated bytes namespaces = 6;</code> + */ + public Builder addAllNamespaces( + java.lang.Iterable<? extends com.google.protobuf.ByteString> values) { + ensureNamespacesIsMutable(); + super.addAll(values, namespaces_); + onChanged(); + return this; + } + /** + * <code>repeated bytes namespaces = 6;</code> + */ + public Builder clearNamespaces() { + namespaces_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000020); + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.ReplicationPeer) } @@ -9822,24 +9978,24 @@ public final class ZooKeeperProtos { "e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" + "BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"D\n\007T" + "ableCF\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Ta", - "bleName\022\020\n\010families\030\002 \003(\014\"\305\001\n\017Replicatio" + + "bleName\022\020\n\010families\030\002 \003(\014\"\331\001\n\017Replicatio" + "nPeer\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027replication" + "EndpointImpl\030\002 \001(\t\022&\n\004data\030\003 \003(\0132\030.hbase" + ".pb.BytesBytesPair\022/\n\rconfiguration\030\004 \003(" + "\0132\030.hbase.pb.NameStringPair\022$\n\ttable_cfs" + - "\030\005 \003(\0132\021.hbase.pb.TableCF\"g\n\020Replication" + - "State\022/\n\005state\030\001 \002(\0162 .hbase.pb.Replicat" + - "ionState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010" + - "DISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n" + - "\010position\030\001 \002(\003\"%\n\017ReplicationLock\022\022\n\nlo", - "ck_owner\030\001 \002(\t\"\252\001\n\tTableLock\022\'\n\ntable_na" + - "me\030\001 \001(\0132\023.hbase.pb.TableName\022(\n\nlock_ow" + - "ner\030\002 \001(\0132\024.hbase.pb.ServerName\022\021\n\tthrea" + - "d_id\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007purpose" + - "\030\005 \001(\t\022\023\n\013create_time\030\006 \001(\003\"\036\n\013SwitchSta" + - "te\022\017\n\007enabled\030\001 \001(\010BE\n*org.apache.hadoop" + - ".hbase.protobuf.generatedB\017ZooKeeperProt" + - "osH\001\210\001\001\240\001\001" + "\030\005 \003(\0132\021.hbase.pb.TableCF\022\022\n\nnamespaces\030" + + "\006 \003(\014\"g\n\020ReplicationState\022/\n\005state\030\001 \002(\016" + + "2 .hbase.pb.ReplicationState.State\"\"\n\005St" + + "ate\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027Replic" + + "ationHLogPosition\022\020\n\010position\030\001 \002(\003\"%\n\017R", + "eplicationLock\022\022\n\nlock_owner\030\001 \002(\t\"\252\001\n\tT" + + "ableLock\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb." + + "TableName\022(\n\nlock_owner\030\002 \001(\0132\024.hbase.pb" + + ".ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_sha" + + "red\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_tim" + + "e\030\006 \001(\003\"\036\n\013SwitchState\022\017\n\007enabled\030\001 \001(\010B" + + "E\n*org.apache.hadoop.hbase.protobuf.gene" + + "ratedB\017ZooKeeperProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -9887,7 +10043,7 @@ public final class ZooKeeperProtos { internal_static_hbase_pb_ReplicationPeer_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_ReplicationPeer_descriptor, - new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", }); + new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", }); internal_static_hbase_pb_ReplicationState_descriptor = getDescriptor().getMessageTypes().get(7); internal_static_hbase_pb_ReplicationState_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-protocol/src/main/protobuf/ZooKeeper.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/ZooKeeper.proto b/hbase-protocol/src/main/protobuf/ZooKeeper.proto index 8713cbd..ea8f747 100644 --- a/hbase-protocol/src/main/protobuf/ZooKeeper.proto +++ b/hbase-protocol/src/main/protobuf/ZooKeeper.proto @@ -121,6 +121,7 @@ message ReplicationPeer { repeated BytesBytesPair data = 3; repeated NameStringPair configuration = 4; repeated TableCF table_cfs = 5; + repeated bytes namespaces = 6; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/main/java/org/apache/hadoop/hbase/ZKNamespaceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ZKNamespaceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ZKNamespaceManager.java index ee59c01..7b53333 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ZKNamespaceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ZKNamespaceManager.java @@ -54,7 +54,7 @@ public class ZKNamespaceManager extends ZooKeeperListener { public ZKNamespaceManager(ZooKeeperWatcher zkw) throws IOException { super(zkw); - nsZNode = ZooKeeperWatcher.namespaceZNode; + nsZNode = zkw.namespaceZNode; cache = new ConcurrentSkipListMap<String, NamespaceDescriptor>(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index d667269..48f3ac5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -72,7 +72,7 @@ public abstract class BaseReplicationEndpoint extends AbstractService if (scopeFilter != null) { filters.add(scopeFilter); } - WALEntryFilter tableCfFilter = getTableCfWALEntryFilter(); + WALEntryFilter tableCfFilter = getNamespaceTableCfWALEntryFilter(); if (tableCfFilter != null) { filters.add(tableCfFilter); } @@ -87,8 +87,8 @@ public abstract class BaseReplicationEndpoint extends AbstractService /** Returns a WALEntryFilter for checking replication per table and CF. Subclasses can * return null if they don't want this filter */ - protected WALEntryFilter getTableCfWALEntryFilter() { - return new TableCfWALEntryFilter(ctx.getReplicationPeer()); + protected WALEntryFilter getNamespaceTableCfWALEntryFilter() { + return new NamespaceTableCfWALEntryFilter(ctx.getReplicationPeer()); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java new file mode 100644 index 0000000..2673cbd --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL.Entry; + +import com.google.common.base.Predicate; + +/** + * Filter a WAL Entry by namespaces and table-cfs config in the peer. It first filter entry + * by namespaces config, then filter entry by table-cfs config. + * + * 1. Set a namespace in peer config means that all tables in this namespace will be replicated. + * 2. If the namespaces config is null, then the table-cfs config decide which table's edit + * can be replicated. If the table-cfs config is null, then the namespaces config decide + * which table's edit can be replicated. + */ +@InterfaceAudience.Private +public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter { + + private static final Log LOG = LogFactory.getLog(NamespaceTableCfWALEntryFilter.class); + private final ReplicationPeer peer; + private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter(); + + public NamespaceTableCfWALEntryFilter(ReplicationPeer peer) { + this.peer = peer; + } + + @Override + public Entry filter(Entry entry) { + TableName tabName = entry.getKey().getTablename(); + String namespace = tabName.getNamespaceAsString(); + Set<String> namespaces = this.peer.getNamespaces(); + Map<TableName, List<String>> tableCFs = getTableCfs(); + + // If null means user has explicitly not configured any namespaces and table CFs + // so all the tables data are applicable for replication + if (namespaces == null && tableCFs == null) { + return entry; + } + + // First filter by namespaces config + // If table's namespace in peer config, all the tables data are applicable for replication + if (namespaces != null && namespaces.contains(namespace)) { + return entry; + } + + // Then filter by table-cfs config + // return null(prevent replicating) if logKey's table isn't in this peer's + // replicaable namespace list and table list + if (tableCFs == null || !tableCFs.containsKey(tabName)) { + return null; + } + + return entry; + } + + @Override + public Cell filterCell(final Entry entry, Cell cell) { + final Map<TableName, List<String>> tableCfs = getTableCfs(); + if (tableCfs == null) return cell; + TableName tabName = entry.getKey().getTablename(); + List<String> cfs = tableCfs.get(tabName); + // ignore(remove) kv if its cf isn't in the replicable cf list + // (empty cfs means all cfs of this table are replicable) + if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { + cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() { + @Override + public boolean apply(byte[] fam) { + if (tableCfs != null) { + List<String> cfs = tableCfs.get(entry.getKey().getTablename()); + if (cfs != null && !cfs.contains(Bytes.toString(fam))) { + return true; + } + } + return false; + } + }); + } else { + if ((cfs != null) && !cfs.contains( + Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) { + return null; + } + } + return cell; + } + + Map<TableName, List<String>> getTableCfs() { + Map<TableName, List<String>> tableCFs = null; + try { + tableCFs = this.peer.getTableCFs(); + } catch (IllegalArgumentException e) { + LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() + + ", degenerate as if it's not configured by keeping tableCFs==null"); + } + return tableCFs; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java deleted file mode 100644 index d890e3e..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.replication; - -import java.util.List; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.wal.WAL.Entry; - -import com.google.common.base.Predicate; - -public class TableCfWALEntryFilter implements WALEntryFilter, WALCellFilter { - - private static final Log LOG = LogFactory.getLog(TableCfWALEntryFilter.class); - private ReplicationPeer peer; - private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter(); - - public TableCfWALEntryFilter(ReplicationPeer peer) { - this.peer = peer; - } - - @Override - public Entry filter(Entry entry) { - TableName tabName = entry.getKey().getTablename(); - Map<TableName, List<String>> tableCFs = getTableCfs(); - - // If null means user has explicitly not configured any table CFs so all the tables data are - // applicable for replication - if (tableCFs == null) return entry; - - if (!tableCFs.containsKey(tabName)) { - return null; - } - - return entry; - } - - @Override - public Cell filterCell(final Entry entry, Cell cell) { - final Map<TableName, List<String>> tableCfs = getTableCfs(); - if (tableCfs == null) return cell; - TableName tabName = entry.getKey().getTablename(); - List<String> cfs = tableCfs.get(tabName); - // ignore(remove) kv if its cf isn't in the replicable cf list - // (empty cfs means all cfs of this table are replicable) - if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { - cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() { - @Override - public boolean apply(byte[] fam) { - if (tableCfs != null) { - List<String> cfs = tableCfs.get(entry.getKey().getTablename()); - if (cfs != null && !cfs.contains(Bytes.toString(fam))) { - return true; - } - } - return false; - } - }); - } else { - if ((cfs != null) && !cfs.contains( - Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) { - return null; - } - } - return cell; - } - - Map<TableName, List<String>> getTableCfs() { - Map<TableName, List<String>> tableCFs = null; - try { - tableCFs = this.peer.getTableCFs(); - } catch (IllegalArgumentException e) { - LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() + - ", degenerate as if it's not configured by keeping tableCFs==null"); - } - return tableCFs; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index 85820af..c0d18dd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -20,8 +20,10 @@ package org.apache.hadoop.hbase.client.replication; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -385,4 +387,86 @@ public class TestReplicationAdmin { admin.removePeer(ID_ONE); } + + @Test + public void testSetPeerNamespaces() throws Exception { + String ns1 = "ns1"; + String ns2 = "ns2"; + + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(KEY_ONE); + admin.addPeer(ID_ONE, rpc); + admin.peerAdded(ID_ONE); + + rpc = admin.getPeerConfig(ID_ONE); + Set<String> namespaces = new HashSet<String>(); + namespaces.add(ns1); + namespaces.add(ns2); + rpc.setNamespaces(namespaces); + admin.updatePeerConfig(ID_ONE, rpc); + namespaces = admin.getPeerConfig(ID_ONE).getNamespaces(); + assertEquals(2, namespaces.size()); + assertTrue(namespaces.contains(ns1)); + assertTrue(namespaces.contains(ns2)); + + rpc = admin.getPeerConfig(ID_ONE); + namespaces.clear(); + namespaces.add(ns1); + rpc.setNamespaces(namespaces); + admin.updatePeerConfig(ID_ONE, rpc); + namespaces = admin.getPeerConfig(ID_ONE).getNamespaces(); + assertEquals(1, namespaces.size()); + assertTrue(namespaces.contains(ns1)); + + admin.removePeer(ID_ONE); + } + + @Test + public void testNamespacesAndTableCfsConfigConflict() throws ReplicationException { + String ns1 = "ns1"; + String ns2 = "ns2"; + TableName tab1 = TableName.valueOf("ns1:tabl"); + TableName tab2 = TableName.valueOf("ns2:tab2"); + + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(KEY_ONE); + admin.addPeer(ID_ONE, rpc); + admin.peerAdded(ID_ONE); + + rpc = admin.getPeerConfig(ID_ONE); + Set<String> namespaces = new HashSet<String>(); + namespaces.add(ns1); + rpc.setNamespaces(namespaces); + admin.updatePeerConfig(ID_ONE, rpc); + rpc = admin.getPeerConfig(ID_ONE); + Map<TableName, List<String>> tableCfs = new HashMap<>(); + tableCfs.put(tab1, new ArrayList<String>()); + rpc.setTableCFsMap(tableCfs); + try { + admin.updatePeerConfig(ID_ONE, rpc); + fail("Should throw ReplicationException, because table " + tab1 + " conflict with namespace " + + ns1); + } catch (ReplicationException e) { + // OK + } + + rpc = admin.getPeerConfig(ID_ONE); + tableCfs.clear(); + tableCfs.put(tab2, new ArrayList<String>()); + rpc.setTableCFsMap(tableCfs); + admin.updatePeerConfig(ID_ONE, rpc); + rpc = admin.getPeerConfig(ID_ONE); + namespaces.clear(); + namespaces.add(ns2); + rpc.setNamespaces(namespaces); + try { + admin.updatePeerConfig(ID_ONE, rpc); + fail("Should throw ReplicationException, because namespace " + ns2 + " conflict with table " + + tab2); + } catch (ReplicationException e) { + // OK + } + + admin.removePeer(ID_ONE); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java new file mode 100644 index 0000000..ee9b0cb --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java @@ -0,0 +1,248 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({MediumTests.class}) +public class TestNamespaceReplication extends TestReplicationBase { + + private static final Log LOG = LogFactory.getLog(TestNamespaceReplication.class); + + private static String ns1 = "ns1"; + private static String ns2 = "ns2"; + + private static final TableName tabAName = TableName.valueOf("ns1:TA"); + private static final TableName tabBName = TableName.valueOf("ns2:TB"); + + private static final byte[] f1Name = Bytes.toBytes("f1"); + private static final byte[] f2Name = Bytes.toBytes("f2"); + + private static final byte[] val = Bytes.toBytes("myval"); + + private static HTableDescriptor tabA; + private static HTableDescriptor tabB; + + private static Connection connection1; + private static Connection connection2; + private static Admin admin1; + private static Admin admin2; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TestReplicationBase.setUpBeforeClass(); + + connection1 = ConnectionFactory.createConnection(conf1); + connection2 = ConnectionFactory.createConnection(conf2); + admin1 = connection1.getAdmin(); + admin2 = connection2.getAdmin(); + + admin1.createNamespace(NamespaceDescriptor.create(ns1).build()); + admin1.createNamespace(NamespaceDescriptor.create(ns2).build()); + admin2.createNamespace(NamespaceDescriptor.create(ns1).build()); + admin2.createNamespace(NamespaceDescriptor.create(ns2).build()); + + tabA = new HTableDescriptor(tabAName); + HColumnDescriptor fam = new HColumnDescriptor(f1Name); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + tabA.addFamily(fam); + fam = new HColumnDescriptor(f2Name); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + tabA.addFamily(fam); + admin1.createTable(tabA); + admin2.createTable(tabA); + + tabB = new HTableDescriptor(tabBName); + fam = new HColumnDescriptor(f1Name); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + tabB.addFamily(fam); + fam = new HColumnDescriptor(f2Name); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + tabB.addFamily(fam); + admin1.createTable(tabB); + admin2.createTable(tabB); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + admin1.disableTable(tabAName); + admin1.deleteTable(tabAName); + admin1.disableTable(tabBName); + admin1.deleteTable(tabBName); + admin2.disableTable(tabAName); + admin2.deleteTable(tabAName); + admin2.disableTable(tabBName); + admin2.deleteTable(tabBName); + + admin1.deleteNamespace(ns1); + admin1.deleteNamespace(ns2); + admin2.deleteNamespace(ns1); + admin2.deleteNamespace(ns2); + + connection1.close(); + connection2.close(); + TestReplicationBase.tearDownAfterClass(); + } + + @Test + public void testNamespaceReplication() throws Exception { + Table htab1A = connection1.getTable(tabAName); + Table htab2A = connection2.getTable(tabAName); + + Table htab1B = connection1.getTable(tabBName); + Table htab2B = connection2.getTable(tabBName); + + admin.peerAdded("2"); + // add ns1 to peer config which replicate to cluster2 + ReplicationPeerConfig rpc = admin.getPeerConfig("2"); + Set<String> namespaces = new HashSet<>(); + namespaces.add(ns1); + rpc.setNamespaces(namespaces); + admin.updatePeerConfig("2", rpc); + LOG.info("update peer config"); + + // Table A can be replicated to cluster2 + put(htab1A, row, f1Name, f2Name); + ensureRowExisted(htab2A, row, f1Name, f2Name); + delete(htab1A, row, f1Name, f2Name); + ensureRowNotExisted(htab2A, row, f1Name, f2Name); + + // Table B can not be replicated to cluster2 + put(htab1B, row, f1Name, f2Name); + ensureRowNotExisted(htab2B, row, f1Name, f2Name); + + // add ns1:TA => 'f1' and ns2 to peer config which replicate to cluster2 + rpc = admin.getPeerConfig("2"); + namespaces = new HashSet<>(); + namespaces.add(ns2); + rpc.setNamespaces(namespaces); + Map<TableName, List<String>> tableCfs = new HashMap<>(); + tableCfs.put(tabAName, new ArrayList<String>()); + tableCfs.get(tabAName).add("f1"); + rpc.setTableCFsMap(tableCfs); + admin.updatePeerConfig("2", rpc); + LOG.info("update peer config"); + + // Only family f1 of Table A can replicated to cluster2 + put(htab1A, row, f1Name, f2Name); + ensureRowExisted(htab2A, row, f1Name); + delete(htab1A, row, f1Name, f2Name); + ensureRowNotExisted(htab2A, row, f1Name); + + // All cfs of table B can replicated to cluster2 + put(htab1B, row, f1Name, f2Name); + ensureRowExisted(htab2B, row, f1Name, f2Name); + delete(htab1B, row, f1Name, f2Name); + ensureRowNotExisted(htab2B, row, f1Name, f2Name); + + admin.removePeer("2"); + } + + private void put(Table source, byte[] row, byte[]... families) + throws Exception { + for (byte[] fam : families) { + Put put = new Put(row); + put.addColumn(fam, row, val); + source.put(put); + } + } + + private void delete(Table source, byte[] row, byte[]... families) + throws Exception { + for (byte[] fam : families) { + Delete del = new Delete(row); + del.addFamily(fam); + source.delete(del); + } + } + + private void ensureRowExisted(Table target, byte[] row, byte[]... families) + throws Exception { + for (byte[] fam : families) { + Get get = new Get(row); + get.addFamily(fam); + for (int i = 0; i < NB_RETRIES; i++) { + if (i == NB_RETRIES - 1) { + fail("Waited too much time for put replication"); + } + Result res = target.get(get); + if (res.size() == 0) { + LOG.info("Row not available"); + } else { + assertEquals(res.size(), 1); + assertArrayEquals(res.value(), val); + break; + } + Thread.sleep(SLEEP_TIME); + } + } + } + + private void ensureRowNotExisted(Table target, byte[] row, byte[]... families) + throws Exception { + for (byte[] fam : families) { + Get get = new Get(row); + get.addFamily(fam); + for (int i = 0; i < NB_RETRIES; i++) { + if (i == NB_RETRIES - 1) { + fail("Waited too much time for delete replication"); + } + Result res = target.get(get); + if (res.size() >= 1) { + LOG.info("Row not deleted"); + } else { + break; + } + Thread.sleep(SLEEP_TIME); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java index 04d9232..3d4062f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hbase.replication; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import org.apache.hadoop.hbase.Cell; @@ -196,19 +198,22 @@ public class TestReplicationWALEntryFilters { } @Test - public void testTableCfWALEntryFilter() { + public void testNamespaceTableCfWALEntryFilter() { ReplicationPeer peer = mock(ReplicationPeer.class); + // 1. no namespaces config and table-cfs config in peer + when(peer.getNamespaces()).thenReturn(null); when(peer.getTableCFs()).thenReturn(null); Entry userEntry = createEntry(null, a, b, c); - WALEntryFilter filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer)); + WALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a,b,c), filter.filter(userEntry)); + // 2. Only config table-cfs in peer // empty map userEntry = createEntry(null, a, b, c); Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>(); when(peer.getTableCFs()).thenReturn(tableCfs); - filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer)); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); // table bar @@ -216,7 +221,7 @@ public class TestReplicationWALEntryFilters { tableCfs = new HashMap<TableName, List<String>>(); tableCfs.put(TableName.valueOf("bar"), null); when(peer.getTableCFs()).thenReturn(tableCfs); - filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer)); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); // table foo:a @@ -224,7 +229,7 @@ public class TestReplicationWALEntryFilters { tableCfs = new HashMap<TableName, List<String>>(); tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a")); when(peer.getTableCFs()).thenReturn(tableCfs); - filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer)); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a), filter.filter(userEntry)); // table foo:a,c @@ -232,8 +237,64 @@ public class TestReplicationWALEntryFilters { tableCfs = new HashMap<TableName, List<String>>(); tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c")); when(peer.getTableCFs()).thenReturn(tableCfs); - filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer)); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a,c), filter.filter(userEntry)); + + // 3. Only config namespaces in peer + when(peer.getTableCFs()).thenReturn(null); + // empty set + Set<String> namespaces = new HashSet<String>(); + when(peer.getNamespaces()).thenReturn(namespaces); + userEntry = createEntry(null, a, b, c); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + assertEquals(null, filter.filter(userEntry)); + + // namespace default + namespaces.add("default"); + when(peer.getNamespaces()).thenReturn(namespaces); + userEntry = createEntry(null, a, b, c); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + assertEquals(createEntry(null, a,b,c), filter.filter(userEntry)); + + // namespace ns1 + namespaces = new HashSet<String>();; + namespaces.add("ns1"); + when(peer.getNamespaces()).thenReturn(namespaces); + userEntry = createEntry(null, a, b, c); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + assertEquals(null, filter.filter(userEntry)); + + // 4. Config namespaces and table-cfs both + // Namespaces config should not confict with table-cfs config + namespaces = new HashSet<String>(); + tableCfs = new HashMap<TableName, List<String>>(); + namespaces.add("ns1"); + when(peer.getNamespaces()).thenReturn(namespaces); + tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c")); + when(peer.getTableCFs()).thenReturn(tableCfs); + userEntry = createEntry(null, a, b, c); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + assertEquals(createEntry(null, a, c), filter.filter(userEntry)); + + namespaces = new HashSet<String>();; + tableCfs = new HashMap<TableName, List<String>>(); + namespaces.add("default"); + when(peer.getNamespaces()).thenReturn(namespaces); + tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c")); + when(peer.getTableCFs()).thenReturn(tableCfs); + userEntry = createEntry(null, a, b, c); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + assertEquals(createEntry(null, a, b, c), filter.filter(userEntry)); + + namespaces = new HashSet<String>();; + tableCfs = new HashMap<TableName, List<String>>(); + namespaces.add("ns1"); + when(peer.getNamespaces()).thenReturn(namespaces); + tableCfs.put(TableName.valueOf("bar"), null); + when(peer.getTableCFs()).thenReturn(tableCfs); + userEntry = createEntry(null, a, b, c); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + assertEquals(null, filter.filter(userEntry)); } private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) { http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java index 6759daf..6f5ad56 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext; import org.apache.hadoop.hbase.replication.ReplicationPeer; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -280,7 +281,9 @@ public class TestRegionReplicaReplicationEndpointNoMaster { when(context.getMetrics()).thenReturn(mock(MetricsSource.class)); ReplicationPeer mockPeer = mock(ReplicationPeer.class); + when(mockPeer.getNamespaces()).thenReturn(null); when(mockPeer.getTableCFs()).thenReturn(null); + when(mockPeer.getPeerConfig()).thenReturn(new ReplicationPeerConfig()); when(context.getReplicationPeer()).thenReturn(mockPeer); replicator.init(context); http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/hbase/replication_admin.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index 4de3962..f99ccae 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -62,6 +62,7 @@ module Hbase config = args.fetch(CONFIG, nil) data = args.fetch(DATA, nil) table_cfs = args.fetch(TABLE_CFS, nil) + namespaces = args.fetch(NAMESPACES, nil) # Create and populate a ReplicationPeerConfig replication_peer_config = ReplicationPeerConfig.new @@ -83,6 +84,14 @@ module Hbase } end + unless namespaces.nil? + ns_set = java.util.HashSet.new + namespaces.each do |n| + ns_set.add(n) + end + replication_peer_config.set_namespaces(ns_set) + end + unless table_cfs.nil? # convert table_cfs to TableName map = java.util.HashMap.new @@ -180,12 +189,39 @@ module Hbase end @replication_admin.removePeerTableCFs(id, map) end + + # Set new namespaces config for the specified peer + def set_peer_namespaces(id, namespaces) + unless namespaces.nil? + ns_set = java.util.HashSet.new + namespaces.each do |n| + ns_set.add(n) + end + rpc = get_peer_config(id) + unless rpc.nil? + rpc.setNamespaces(ns_set) + @replication_admin.updatePeerConfig(id, rpc) + end + end + end + + # Show the current namespaces config for the specified peer + def show_peer_namespaces(peer_config) + namespaces = peer_config.get_namespaces + if !namespaces.nil? + return namespaces.join(';') + else + return nil + end + end + #---------------------------------------------------------------------------------------------- # Enables a table's replication switch def enable_tablerep(table_name) tableName = TableName.valueOf(table_name) @replication_admin.enableTableRep(tableName) end + #---------------------------------------------------------------------------------------------- # Disables a table's replication switch def disable_tablerep(table_name) http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/hbase_constants.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/hbase_constants.rb b/hbase-shell/src/main/ruby/hbase_constants.rb index bc6f37c..c02d5c6 100644 --- a/hbase-shell/src/main/ruby/hbase_constants.rb +++ b/hbase-shell/src/main/ruby/hbase_constants.rb @@ -78,6 +78,7 @@ module HBaseConstants ENDPOINT_CLASSNAME = 'ENDPOINT_CLASSNAME' CLUSTER_KEY = 'CLUSTER_KEY' TABLE_CFS = 'TABLE_CFS' + NAMESPACES = 'NAMESPACES' CONFIG = 'CONFIG' DATA = 'DATA' http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/shell.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index bb6a604..ee508e9 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -370,6 +370,7 @@ Shell.load_command_group( list_peers enable_peer disable_peer + set_peer_namespaces show_peer_tableCFs set_peer_tableCFs list_replicated_tables http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/shell/commands/add_peer.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb index e9431cf..077bd69 100644 --- a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb +++ b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb @@ -27,13 +27,25 @@ must be specified to identify the peer. For a HBase cluster peer, a cluster key must be provided and is composed like this: hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent -This gives a full path for HBase to connect to another HBase cluster. An optional parameter for -table column families identifies which column families will be replicated to the peer cluster. +This gives a full path for HBase to connect to another HBase cluster. +An optional parameter for namespaces identifies which namespace's tables will be replicated +to the peer cluster. +An optional parameter for table column families identifies which tables and/or column families +will be replicated to the peer cluster. + +Notice: Set a namespace in the peer config means that all tables in this namespace +will be replicated to the peer cluster. So if you already have set a namespace in peer config, +then you can't set this namespace's tables in the peer config again. + Examples: hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase" hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod", TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] } + hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod", + NAMESPACES => ["ns1", "ns2", "ns3"] + hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod", + NAMESPACES => ["ns1", "ns2"], TABLE_CFS => { "ns3:table1" => [], "ns3:table2" => ["cf1"] } For a custom replication endpoint, the ENDPOINT_CLASSNAME can be provided. Two optional arguments are DATA and CONFIG which can be specified to set different either the peer_data or configuration http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/shell/commands/list_peers.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb index 72a0704..ed6b575 100644 --- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb +++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb @@ -32,12 +32,15 @@ EOF def command() peers = replication_admin.list_peers - formatter.header(["PEER_ID", "CLUSTER_KEY", "STATE", "TABLE_CFS"]) + formatter.header(["PEER_ID", "CLUSTER_KEY", "ENDPOINT_CLASSNAME", + "STATE", "NAMESPACES", "TABLE_CFS"]) peers.entrySet().each do |e| state = replication_admin.get_peer_state(e.key) + namespaces = replication_admin.show_peer_namespaces(e.value) tableCFs = replication_admin.show_peer_tableCFs(e.key) - formatter.row([ e.key, e.value, state, tableCFs ]) + formatter.row([ e.key, e.value.getClusterKey, + e.value.getReplicationEndpointImpl, state, namespaces, tableCFs ]) end formatter.footer() http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb new file mode 100644 index 0000000..75b3d11 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb @@ -0,0 +1,51 @@ +# +# Copyright The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class SetPeerNamespaces< Command + def help + return <<-EOF + Set the replicable namespaces config for the specified peer. + + Set a namespace in the peer config means that all tables in this + namespace will be replicated to the peer cluster. So if you already + have set a namespace in the peer config, then you can't set this + namespace's tables in the peer config again. + + Examples: + + # set namespaces config is null, then the table-cfs config decide + # which table to be replicated. + hbase> set_peer_namespaces '1', [] + # set namespaces to be replicable for a peer. + # set a namespace in the peer config means that all tables in this + # namespace (with replication_scope != 0 ) will be replicated. + hbase> set_peer_namespaces '2', ["ns1", "ns2"] + + EOF + end + + def command(id, namespaces) + replication_admin.set_peer_namespaces(id, namespaces) + end + end + end +end http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb index b2e823c..4d3c3ec 100644 --- a/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb +++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb @@ -23,11 +23,15 @@ module Shell class SetPeerTableCFs< Command def help return <<-EOF - Set the replicable table-cf config for the specified peer + Set the replicable table-cf config for the specified peer. + + Can't set a table to table-cfs config if it's namespace already was in + namespaces config of this peer. + Examples: - # set all tables to be replicable for a peer - hbase> set_peer_tableCFs '1', "" + # set table-cfs config is null, then the namespaces config decide which + # table to be replicated. hbase> set_peer_tableCFs '1' # set table / table-cf to be replicable for a peer, for a table without # an explicit column-family list, all replicable column-families (with http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 1d27e67..daa8f96 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -121,6 +121,49 @@ module Hbase command(:remove_peer, @peer_id) end + define_test "add_peer: multiple zk cluster key and namespaces" do + cluster_key = "zk4,zk5,zk6:11000:/hbase-test" + namespaces = ["ns1", "ns2", "ns3"] + namespaces_str = "ns2;ns1;ns3" + + args = { CLUSTER_KEY => cluster_key, NAMESPACES => namespaces } + command(:add_peer, @peer_id, args) + + assert_equal(1, command(:list_peers).length) + assert(command(:list_peers).key?(@peer_id)) + peer_config = command(:list_peers).fetch(@peer_id) + assert_equal(cluster_key, peer_config.get_cluster_key) + assert_equal(namespaces_str, + replication_admin.show_peer_namespaces(peer_config)) + + # cleanup for future tests + command(:remove_peer, @peer_id) + end + + define_test "add_peer: multiple zk cluster key and namespaces, table_cfs" do + cluster_key = "zk4,zk5,zk6:11000:/hbase-test" + namespaces = ["ns1", "ns2"] + table_cfs = { "ns3:table1" => [], "ns3:table2" => ["cf1"], + "ns3:table3" => ["cf1", "cf2"] } + namespaces_str = "ns2;ns1" + table_cfs_str = "ns3.table1;ns3.table3:cf1,cf2;ns3.table2:cf1" + + args = { CLUSTER_KEY => cluster_key, NAMESPACES => namespaces, + TABLE_CFS => table_cfs } + command(:add_peer, @peer_id, args) + + assert_equal(1, command(:list_peers).length) + assert(command(:list_peers).key?(@peer_id)) + peer_config = command(:list_peers).fetch(@peer_id) + assert_equal(cluster_key, peer_config.get_cluster_key) + assert_equal(namespaces_str, + replication_admin.show_peer_namespaces(peer_config)) + assert_equal(table_cfs_str, command(:show_peer_tableCFs, @peer_id)) + + # cleanup for future tests + command(:remove_peer, @peer_id) + end + define_test "add_peer: multiple zk cluster key and table_cfs - peer config" do cluster_key = "zk4,zk5,zk6:11000:/hbase-test" table_cfs = { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] } @@ -152,6 +195,30 @@ module Hbase end end + define_test "set_peer_namespaces: works with namespaces array" do + cluster_key = "zk4,zk5,zk6:11000:/hbase-test" + namespaces = ["ns1", "ns2"] + namespaces_str = "ns2;ns1" + + args = { CLUSTER_KEY => cluster_key } + command(:add_peer, @peer_id, args) + + # Normally the ReplicationSourceManager will call ReplicationPeer#peer_added + # but here we have to do it ourselves + replication_admin.peer_added(@peer_id) + + command(:set_peer_namespaces, @peer_id, namespaces) + + assert_equal(1, command(:list_peers).length) + assert(command(:list_peers).key?(@peer_id)) + peer_config = command(:list_peers).fetch(@peer_id) + assert_equal(namespaces_str, + replication_admin.show_peer_namespaces(peer_config)) + + # cleanup for future tests + command(:remove_peer, @peer_id) + end + define_test "get_peer_config: works with simple clusterKey peer" do cluster_key = "localhost:2181:/hbase-test" args = { CLUSTER_KEY => cluster_key } @@ -221,8 +288,8 @@ module Hbase assert_equal("value2", peer_config.get_configuration.get("config2")) assert_equal("new_value1", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data1")))) assert_equal("value2", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data2")))) - end + # assert_raise fails on native exceptions - https://jira.codehaus.org/browse/JRUBY-5279 # Can't catch native Java exception with assert_raise in JRuby 1.6.8 as in the test below. # define_test "add_peer: adding a second peer with same id should error" do