HBASE-13153 Bulk Loaded HFile Replication (Ashish Singhi)
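This change lets a source cluster replicate bulk loaded HFiles to its peers: bulk load events are tracked as hfile references under a new hfile-refs znode, shipped to the sink through new optional fields on ReplicateWALEntryRequest, and copied from the source file system by the new HFileReplicator. A minimal sketch of the source-side opt-in, using the configuration keys this commit adds to HConstants (the cluster id "source1" and the conf directory path are illustrative assumptions, not part of the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class BulkLoadReplicationConfSketch {
  public static Configuration bulkLoadReplicationConf() {
    Configuration conf = HBaseConfiguration.create();
    // Opt in to bulk loaded HFile replication; the default
    // (REPLICATION_BULKLOAD_ENABLE_DEFAULT) is false.
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    // Id by which the source cluster identifies itself to peer clusters
    // ("source1" is an assumed value).
    conf.set(HConstants.REPLICATION_CLUSTER_ID, "source1");
    // On the sink, the directory holding each source cluster's file system
    // client configuration; the path here is an assumed example.
    conf.set(HConstants.REPLICATION_CONF_DIR, "/etc/hbase/replication-conf");
    return conf;
  }
}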
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/26ac60b0 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/26ac60b0 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/26ac60b0 Branch: refs/heads/master Commit: 26ac60b03f80c9215103a02db783341e67037753 Parents: 9647fee Author: ramkrishna <[email protected]> Authored: Thu Dec 10 13:07:46 2015 +0530 Committer: ramkrishna <[email protected]> Committed: Thu Dec 10 13:07:46 2015 +0530 ---------------------------------------------------------------------- .../hbase/replication/ReplicationPeers.java | 2 +- .../replication/ReplicationPeersZKImpl.java | 26 +- .../hbase/replication/ReplicationQueues.java | 25 +- .../replication/ReplicationQueuesClient.java | 25 +- .../ReplicationQueuesClientZKImpl.java | 37 ++ .../replication/ReplicationQueuesZKImpl.java | 70 +++ .../replication/ReplicationStateZKBase.java | 14 +- .../apache/hadoop/hbase/zookeeper/ZKUtil.java | 24 +- .../org/apache/hadoop/hbase/HConstants.java | 16 +- .../MetricsReplicationSinkSource.java | 2 + .../MetricsReplicationSourceSource.java | 6 + .../MetricsReplicationGlobalSourceSource.java | 21 + .../MetricsReplicationSinkSourceImpl.java | 7 + .../MetricsReplicationSourceSourceImpl.java | 28 + .../hbase/protobuf/generated/AdminProtos.java | 602 +++++++++++++++++-- hbase-protocol/src/main/protobuf/Admin.proto | 3 + .../hbase/mapreduce/LoadIncrementalHFiles.java | 152 +++-- .../hbase/protobuf/ReplicationProtbufUtil.java | 46 +- .../hbase/regionserver/RSRpcServices.java | 4 +- .../regionserver/ReplicationSinkService.java | 8 +- .../regionserver/wal/WALActionsListener.java | 19 +- .../hbase/replication/ScopeWALEntryFilter.java | 72 ++- .../replication/TableCfWALEntryFilter.java | 76 ++- .../master/ReplicationHFileCleaner.java | 193 ++++++ .../DefaultSourceFSConfigurationProvider.java | 78 +++ .../HBaseInterClusterReplicationEndpoint.java | 32 +- .../regionserver/HFileReplicator.java | 393 ++++++++++++ .../replication/regionserver/MetricsSink.java | 13 +- .../replication/regionserver/MetricsSource.java | 31 + .../RegionReplicaReplicationEndpoint.java | 4 +- .../replication/regionserver/Replication.java | 133 +++- .../regionserver/ReplicationSink.java | 200 +++++- .../regionserver/ReplicationSource.java | 92 ++- .../ReplicationSourceInterface.java | 13 + .../regionserver/ReplicationSourceManager.java | 21 + .../SourceFSConfigurationProvider.java | 40 ++ .../security/access/SecureBulkLoadEndpoint.java | 18 +- .../cleaner/TestReplicationHFileCleaner.java | 264 ++++++++ .../replication/ReplicationSourceDummy.java | 8 + .../replication/TestMasterReplication.java | 313 +++++++++- .../replication/TestReplicationSmallTests.java | 3 +- .../replication/TestReplicationStateBasic.java | 57 ++ .../replication/TestReplicationStateZKImpl.java | 1 + .../replication/TestReplicationSyncUpTool.java | 10 +- ...ReplicationSyncUpToolWithBulkLoadedData.java | 235 ++++++++ .../regionserver/TestReplicationSink.java | 179 +++++- .../TestReplicationSourceManager.java | 70 ++- .../TestSourceFSConfigurationProvider.java | 25 + 48 files changed, 3444 insertions(+), 267 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java ---------------------------------------------------------------------- diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index 8e80e06..8bf21d5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -50,7 +50,7 @@ public interface ReplicationPeers { * @param peerId a short that identifies the cluster * @param peerConfig configuration for the replication slave cluster * @param tableCFs the table and column-family list which will be replicated for this peer or null - * for all table and column families + * for all table and column families */ void addPeer(String peerId, ReplicationPeerConfig peerConfig, String tableCFs) throws ReplicationException; http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 63f9ac3..fd10b66 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; import com.google.protobuf.ByteString; @@ -120,8 +121,21 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } checkQueuesDeleted(id); - + ZKUtil.createWithParents(this.zookeeper, this.peersZNode); + + // If bulk load hfile replication is enabled, add the peerId node under the hfile-refs node + if (replicationForBulkLoadEnabled) { + try { + String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id); + LOG.info("Adding peer " + peerId + " to hfile reference queue."); + ZKUtil.createWithParents(this.zookeeper, peerId); + } catch (KeeperException e) { + throw new ReplicationException("Failed to add peer with id=" + id + + " node under hfile references node.", e); + } + } + List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>(); ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id), toByteArray(peerConfig)); @@ -151,6 +165,16 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re + " because that id does not exist."); } ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)); + // Delete peerId node from hfile-refs node irrespective of whether bulk loaded hfile + // replication is enabled or not + + String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id); + try { + LOG.info("Removing peer " + peerId + " from hfile reference queue."); + ZKUtil.deleteNodeRecursively(this.zookeeper, peerId); + } catch (NoNodeException e) { + LOG.info("Did not find node " + peerId + " to delete.", e); + } } catch (KeeperException e) { throw new ReplicationException("Could not remove peer with id=" + id, e); } http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java index 3dbbc33..0d47a88 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java @@ -26,7 +26,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; /** * This provides an interface for maintaining a region server's replication queues. These queues - keep track of the WALs that still need to be replicated to remote clusters. + keep track of the WALs and HFile references (if hbase.replication.bulkload.enabled is true) + that still need to be replicated to remote clusters. */ @InterfaceAudience.Private public interface ReplicationQueues { @@ -113,4 +114,26 @@ public interface ReplicationQueues { * @return if this is this rs's znode */ boolean isThisOurZnode(String znode); + + /** + * Add a peer to the hfile reference queue if the peer does not already exist. + * @param peerId peer cluster id to be added + * @throws ReplicationException if it fails to add the peer id to the hfile reference queue + */ + void addPeerToHFileRefs(String peerId) throws ReplicationException; + + /** + * Add new hfile references to the queue. + * @param peerId peer cluster id to which the hfiles need to be replicated + * @param files list of hfile references to be added + * @throws ReplicationException if it fails to add an hfile reference + */ + void addHFileRefs(String peerId, List<String> files) throws ReplicationException; + + /** + * Remove hfile references from the queue. + * @param peerId peer cluster id from which these hfile references need to be removed + * @param files list of hfile references to be removed + */ + void removeHFileRefs(String peerId, List<String> files); } http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java index 5b3e541..7fa3bbb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java @@ -25,7 +25,8 @@ import org.apache.zookeeper.KeeperException; /** * This provides an interface for clients of replication to view replication queues. These queues - keep track of the WALs that still need to be replicated to remote clusters. + keep track of the sources (WALs/HFile references) that still need to be replicated to remote + clusters. */ @InterfaceAudience.Private public interface ReplicationQueuesClient { @@ -65,4 +66,26 @@ public interface ReplicationQueuesClient { * @return cversion of replication rs node */ int getQueuesZNodeCversion() throws KeeperException; + + /** + * Get the change version number of the replication hfile references node. This can be used for + * optimistic locking to get a consistent snapshot of the replication queues of hfile references.
+ * @return change version number of the hfile references node + */ + int getHFileRefsNodeChangeVersion() throws KeeperException; + + /** + * Get a list of all peers from the hfile reference queue. + * @return a list of peer ids + * @throws KeeperException zookeeper exception + */ + List<String> getAllPeersFromHFileRefsQueue() throws KeeperException; + + /** + * Get a list of all hfile references for the given peer. + * @param peerId a String that identifies the peer + * @return a list of hfile references, or null if none are found + * @throws KeeperException zookeeper exception + */ + List<String> getReplicableHFiles(String peerId) throws KeeperException; } http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java index e1a6a49..cc407e3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java @@ -84,4 +84,41 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem throw e; } } + + @Override + public int getHFileRefsNodeChangeVersion() throws KeeperException { + Stat stat = new Stat(); + try { + ZKUtil.getDataNoWatch(this.zookeeper, this.hfileRefsZNode, stat); + } catch (KeeperException e) { + this.abortable.abort("Failed to get stat of replication hfile references node.", e); + throw e; + } + return stat.getCversion(); + } + + @Override + public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException { + List<String> result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.hfileRefsZNode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get list of all peers in hfile references node.", e); + throw e; + } + return result; + } + + @Override + public List<String> getReplicableHFiles(String peerId) throws KeeperException { + String znode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId); + List<String> result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get list of hfile references for peerId=" + peerId, e); + throw e; + } + return result; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index 97763e2..43dd412 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -84,6 +84,15 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R } catch (KeeperException e) { throw new ReplicationException("Could not initialize replication queues.", e); } + // If bulk load hfile replication is enabled, create the hfile-refs znode + if
(replicationForBulkLoadEnabled) { + try { + ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode); + } catch (KeeperException e) { + throw new ReplicationException("Could not initialize hfile references replication queue.", + e); + } + } } @Override @@ -431,4 +440,65 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray(); return ProtobufUtil.prependPBMagic(bytes); } + + @Override + public void addHFileRefs(String peerId, List<String> files) throws ReplicationException { + String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId); + boolean debugEnabled = LOG.isDebugEnabled(); + if (debugEnabled) { + LOG.debug("Adding hfile references " + files + " in queue " + peerZnode); + } + List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>(); + int size = files.size(); + for (int i = 0; i < size; i++) { + listOfOps.add(ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i)), + HConstants.EMPTY_BYTE_ARRAY)); + } + if (debugEnabled) { + LOG.debug(" The multi list size for adding hfile references in zk for node " + peerZnode + + " is " + listOfOps.size()); + } + try { + ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); + } catch (KeeperException e) { + throw new ReplicationException("Failed to create hfile reference znode=" + e.getPath(), e); + } + } + + @Override + public void removeHFileRefs(String peerId, List<String> files) { + String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId); + boolean debugEnabled = LOG.isDebugEnabled(); + if (debugEnabled) { + LOG.debug("Removing hfile references " + files + " from queue " + peerZnode); + } + List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>(); + int size = files.size(); + for (int i = 0; i < size; i++) { + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i)))); + } + if (debugEnabled) { + LOG.debug(" The multi list size for removing hfile references in zk for node " + peerZnode + + " is " + listOfOps.size()); + } + try { + ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); + } catch (KeeperException e) { + LOG.error("Failed to remove hfile reference znode=" + e.getPath(), e); + } + } + + @Override + public void addPeerToHFileRefs(String peerId) throws ReplicationException { + String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId); + try { + if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) { + LOG.info("Adding peer " + peerId + " to hfile reference queue."); + ZKUtil.createWithParents(this.zookeeper, peerZnode); + } + } catch (KeeperException e) { + throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.", + e); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java index 4fbac0f..762167f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -20,9 +20,10 @@ package org.apache.hadoop.hbase.replication; import java.util.List; -import 
org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.zookeeper.ZKConfig; @@ -48,32 +49,43 @@ public abstract class ReplicationStateZKBase { protected final String peersZNode; /** The name of the znode that contains all replication queues */ protected final String queuesZNode; + /** The name of the znode that contains queues of hfile references to be replicated */ + protected final String hfileRefsZNode; /** The cluster key of the local cluster */ protected final String ourClusterKey; protected final ZooKeeperWatcher zookeeper; protected final Configuration conf; protected final Abortable abortable; + protected final boolean replicationForBulkLoadEnabled; // Public for testing public static final byte[] ENABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED); public static final byte[] DISABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED); + public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY = + "zookeeper.znode.replication.hfile.refs"; + public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs"; public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf, Abortable abortable) { this.zookeeper = zookeeper; this.conf = conf; this.abortable = abortable; + this.replicationForBulkLoadEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, + HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); + String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, + ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT); this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf); this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName); this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName); + this.hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName); } public List<String> getListOfReplicators() { http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index c268268..9e01d09 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import 
org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; @@ -884,7 +885,7 @@ public class ZKUtil { JaasConfiguration.SERVER_KEYTAB_KERBEROS_CONFIG_NAME) == null && conf.get(HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL) == null && conf.get(HConstants.ZK_SERVER_KERBEROS_PRINCIPAL) == null) { - + return false; } } catch(Exception e) { @@ -1797,6 +1798,27 @@ public class ZKUtil { } else if (child.equals(zkw.getConfiguration(). get("zookeeper.znode.replication.rs", "rs"))) { appendRSZnodes(zkw, znode, sb); + } else if (child.equals(zkw.getConfiguration().get( + ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, + ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT))) { + appendHFileRefsZnodes(zkw, znode, sb); + } + } + } + + private static void appendHFileRefsZnodes(ZooKeeperWatcher zkw, String hfileRefsZnode, + StringBuilder sb) throws KeeperException { + sb.append("\n").append(hfileRefsZnode).append(": "); + for (String peerIdZnode : ZKUtil.listChildrenNoWatch(zkw, hfileRefsZnode)) { + String znodeToProcess = ZKUtil.joinZNode(hfileRefsZnode, peerIdZnode); + sb.append("\n").append(znodeToProcess).append(": "); + List<String> peerHFileRefsZnodes = ZKUtil.listChildrenNoWatch(zkw, znodeToProcess); + int size = peerHFileRefsZnodes.size(); + for (int i = 0; i < size; i++) { + sb.append(peerHFileRefsZnodes.get(i)); + if (i != size - 1) { + sb.append(", "); + } } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index ac57514..6fafad3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -850,6 +850,18 @@ public final class HConstants { REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service"; public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT = "org.apache.hadoop.hbase.replication.regionserver.Replication"; + public static final String REPLICATION_BULKLOAD_ENABLE_KEY = "hbase.replication.bulkload.enabled"; + public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false; + /** Replication cluster id of the source cluster, which uniquely identifies it to the peer cluster */ + public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id"; + /** + * Directory where the source cluster's file system client configuration is placed, which is used by + * the sink cluster to copy HFiles from the source cluster's file system + */ + public static final String REPLICATION_CONF_DIR = "hbase.replication.conf.dir"; + + /** Maximum number of times to retry a failed bulk load request */ + public static final String BULKLOAD_MAX_RETRIES_NUMBER = "hbase.bulkload.retries.number"; /** HBCK special code name used as server name when manipulating ZK nodes */ public static final String HBCK_CODE_NAME = "HBCKServerName"; @@ -1241,7 +1253,7 @@ public final class HConstants { public static final String HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY = "hbase.canary.write.table.check.period"; - + /** * Configuration keys for programmatic JAAS configuration for secured ZK interaction */ @@ -1250,7 +1262,7 @@ public final class
HConstants { "hbase.zookeeper.client.kerberos.principal"; public static final String ZK_SERVER_KEYTAB_FILE = "hbase.zookeeper.server.keytab.file"; public static final String ZK_SERVER_KERBEROS_PRINCIPAL = - "hbase.zookeeper.server.kerberos.principal"; + "hbase.zookeeper.server.kerberos.principal"; private HConstants() { // Can't be instantiated with this ctor. http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java index 698a59a..9fb8415 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java @@ -22,9 +22,11 @@ public interface MetricsReplicationSinkSource { public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp"; public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches"; public static final String SINK_APPLIED_OPS = "sink.appliedOps"; + public static final String SINK_APPLIED_HFILES = "sink.appliedHFiles"; void setLastAppliedOpAge(long age); void incrAppliedBatches(long batches); void incrAppliedOps(long batchsize); long getLastAppliedOpAge(); + void incrAppliedHFiles(long hfileSize); } http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index fecf191..188c3a3 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -32,6 +32,9 @@ public interface MetricsReplicationSourceSource { public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered"; + public static final String SOURCE_SHIPPED_HFILES = "source.shippedHFiles"; + public static final String SOURCE_SIZE_OF_HFILE_REFS_QUEUE = "source.sizeOfHFileRefsQueue"; + void setLastShippedAge(long age); void setSizeOfLogQueue(int size); void incrSizeOfLogQueue(int size); @@ -44,4 +47,7 @@ public interface MetricsReplicationSourceSource { void incrLogReadInEdits(long size); void clear(); long getLastShippedAge(); + void incrHFilesShipped(long hfiles); + void incrSizeOfHFileRefsQueue(long size); + void decrSizeOfHFileRefsQueue(long size); } http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java ---------------------------------------------------------------------- diff --git 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java index 6dace10..392cd39 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -32,6 +32,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS private final MutableCounterLong shippedOpsCounter; private final MutableCounterLong shippedKBsCounter; private final MutableCounterLong logReadInBytesCounter; + private final MutableCounterLong shippedHFilesCounter; + private final MutableGaugeLong sizeOfHFileRefsQueueGauge; public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) { this.rms = rms; @@ -51,6 +53,11 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS logReadInEditsCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_LOG_READ_IN_EDITS, 0L); logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_LOG_EDITS_FILTERED, 0L); + + shippedHFilesCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_SHIPPED_HFILES, 0L); + + sizeOfHFileRefsQueueGauge = + rms.getMetricsRegistry().getLongGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L); } @Override public void setLastShippedAge(long age) { @@ -100,4 +107,18 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS public long getLastShippedAge() { return ageOfLastShippedOpGauge.value(); } + + @Override public void incrHFilesShipped(long hfiles) { + shippedHFilesCounter.incr(hfiles); + } + + @Override + public void incrSizeOfHFileRefsQueue(long size) { + sizeOfHFileRefsQueueGauge.incr(size); + } + + @Override + public void decrSizeOfHFileRefsQueue(long size) { + sizeOfHFileRefsQueueGauge.decr(size); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java index 14212ba..8f4a337 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java @@ -26,11 +26,13 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS private final MutableGaugeLong ageGauge; private final MutableCounterLong batchesCounter; private final MutableCounterLong opsCounter; + private final MutableCounterLong hfilesCounter; public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) { ageGauge = rms.getMetricsRegistry().getLongGauge(SINK_AGE_OF_LAST_APPLIED_OP, 0L); batchesCounter = rms.getMetricsRegistry().getLongCounter(SINK_APPLIED_BATCHES, 0L); opsCounter = rms.getMetricsRegistry().getLongCounter(SINK_APPLIED_OPS, 0L); + hfilesCounter = rms.getMetricsRegistry().getLongCounter(SINK_APPLIED_HFILES, 
0L); } @Override public void setLastAppliedOpAge(long age) { @@ -49,4 +51,9 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS public long getLastAppliedOpAge() { return ageGauge.value(); } + + @Override + public void incrAppliedHFiles(long hfiles) { + hfilesCounter.incr(hfiles); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index 1422e7e..217cc3e 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -32,6 +32,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final String shippedOpsKey; private final String shippedKBsKey; private final String logReadInBytesKey; + private final String shippedHFilesKey; + private final String sizeOfHFileRefsQueueKey; private final MutableGaugeLong ageOfLastShippedOpGauge; private final MutableGaugeLong sizeOfLogQueueGauge; @@ -41,6 +43,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final MutableCounterLong shippedOpsCounter; private final MutableCounterLong shippedKBsCounter; private final MutableCounterLong logReadInBytesCounter; + private final MutableCounterLong shippedHFilesCounter; + private final MutableGaugeLong sizeOfHFileRefsQueueGauge; public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) { this.rms = rms; @@ -69,6 +73,12 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou logEditsFilteredKey = "source." + id + ".logEditsFiltered"; logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(logEditsFilteredKey, 0L); + + shippedHFilesKey = "source." + this.id + ".shippedHFiles"; + shippedHFilesCounter = rms.getMetricsRegistry().getLongCounter(shippedHFilesKey, 0L); + + sizeOfHFileRefsQueueKey = "source." 
+ id + ".sizeOfHFileRefsQueue"; + sizeOfHFileRefsQueueGauge = rms.getMetricsRegistry().getLongGauge(sizeOfHFileRefsQueueKey, 0L); } @Override public void setLastShippedAge(long age) { @@ -124,10 +134,28 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou rms.removeMetric(logReadInEditsKey); rms.removeMetric(logEditsFilteredKey); + + rms.removeMetric(shippedHFilesKey); + rms.removeMetric(sizeOfHFileRefsQueueKey); } @Override public long getLastShippedAge() { return ageOfLastShippedOpGauge.value(); } + + @Override + public void incrHFilesShipped(long hfiles) { + shippedHFilesCounter.incr(hfiles); + } + + @Override + public void incrSizeOfHFileRefsQueue(long size) { + sizeOfHFileRefsQueueGauge.incr(size); + } + + @Override + public void decrSizeOfHFileRefsQueue(long size) { + sizeOfHFileRefsQueueGauge.decr(size); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java index b4c378b..1c59ea6 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java @@ -16896,6 +16896,51 @@ public final class AdminProtos { */ org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntryOrBuilder getEntryOrBuilder( int index); + + // optional string replicationClusterId = 2; + /** + * <code>optional string replicationClusterId = 2;</code> + */ + boolean hasReplicationClusterId(); + /** + * <code>optional string replicationClusterId = 2;</code> + */ + java.lang.String getReplicationClusterId(); + /** + * <code>optional string replicationClusterId = 2;</code> + */ + com.google.protobuf.ByteString + getReplicationClusterIdBytes(); + + // optional string sourceBaseNamespaceDirPath = 3; + /** + * <code>optional string sourceBaseNamespaceDirPath = 3;</code> + */ + boolean hasSourceBaseNamespaceDirPath(); + /** + * <code>optional string sourceBaseNamespaceDirPath = 3;</code> + */ + java.lang.String getSourceBaseNamespaceDirPath(); + /** + * <code>optional string sourceBaseNamespaceDirPath = 3;</code> + */ + com.google.protobuf.ByteString + getSourceBaseNamespaceDirPathBytes(); + + // optional string sourceHFileArchiveDirPath = 4; + /** + * <code>optional string sourceHFileArchiveDirPath = 4;</code> + */ + boolean hasSourceHFileArchiveDirPath(); + /** + * <code>optional string sourceHFileArchiveDirPath = 4;</code> + */ + java.lang.String getSourceHFileArchiveDirPath(); + /** + * <code>optional string sourceHFileArchiveDirPath = 4;</code> + */ + com.google.protobuf.ByteString + getSourceHFileArchiveDirPathBytes(); } /** * Protobuf type {@code hbase.pb.ReplicateWALEntryRequest} @@ -16963,6 +17008,21 @@ public final class AdminProtos { entry_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.PARSER, extensionRegistry)); break; } + case 18: { + bitField0_ |= 0x00000001; + replicationClusterId_ = input.readBytes(); + break; + } + case 26: { + bitField0_ |= 0x00000002; + sourceBaseNamespaceDirPath_ = input.readBytes(); + break; + } + case 34: { + bitField0_ |= 0x00000004; + sourceHFileArchiveDirPath_ = input.readBytes(); + break; + } } } } catch 
(com.google.protobuf.InvalidProtocolBufferException e) { @@ -17005,6 +17065,7 @@ public final class AdminProtos { return PARSER; } + private int bitField0_; // repeated .hbase.pb.WALEntry entry = 1; public static final int ENTRY_FIELD_NUMBER = 1; private java.util.List<org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry> entry_; @@ -17041,8 +17102,140 @@ public final class AdminProtos { return entry_.get(index); } + // optional string replicationClusterId = 2; + public static final int REPLICATIONCLUSTERID_FIELD_NUMBER = 2; + private java.lang.Object replicationClusterId_; + /** + * <code>optional string replicationClusterId = 2;</code> + */ + public boolean hasReplicationClusterId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * <code>optional string replicationClusterId = 2;</code> + */ + public java.lang.String getReplicationClusterId() { + java.lang.Object ref = replicationClusterId_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + replicationClusterId_ = s; + } + return s; + } + } + /** + * <code>optional string replicationClusterId = 2;</code> + */ + public com.google.protobuf.ByteString + getReplicationClusterIdBytes() { + java.lang.Object ref = replicationClusterId_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + replicationClusterId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // optional string sourceBaseNamespaceDirPath = 3; + public static final int SOURCEBASENAMESPACEDIRPATH_FIELD_NUMBER = 3; + private java.lang.Object sourceBaseNamespaceDirPath_; + /** + * <code>optional string sourceBaseNamespaceDirPath = 3;</code> + */ + public boolean hasSourceBaseNamespaceDirPath() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * <code>optional string sourceBaseNamespaceDirPath = 3;</code> + */ + public java.lang.String getSourceBaseNamespaceDirPath() { + java.lang.Object ref = sourceBaseNamespaceDirPath_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + sourceBaseNamespaceDirPath_ = s; + } + return s; + } + } + /** + * <code>optional string sourceBaseNamespaceDirPath = 3;</code> + */ + public com.google.protobuf.ByteString + getSourceBaseNamespaceDirPathBytes() { + java.lang.Object ref = sourceBaseNamespaceDirPath_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + sourceBaseNamespaceDirPath_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // optional string sourceHFileArchiveDirPath = 4; + public static final int SOURCEHFILEARCHIVEDIRPATH_FIELD_NUMBER = 4; + private java.lang.Object sourceHFileArchiveDirPath_; + /** + * <code>optional string sourceHFileArchiveDirPath = 4;</code> + */ + public boolean hasSourceHFileArchiveDirPath() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * <code>optional string sourceHFileArchiveDirPath = 4;</code> + */ + public java.lang.String getSourceHFileArchiveDirPath() { + java.lang.Object ref = sourceHFileArchiveDirPath_; + if (ref 
instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + sourceHFileArchiveDirPath_ = s; + } + return s; + } + } + /** + * <code>optional string sourceHFileArchiveDirPath = 4;</code> + */ + public com.google.protobuf.ByteString + getSourceHFileArchiveDirPathBytes() { + java.lang.Object ref = sourceHFileArchiveDirPath_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + sourceHFileArchiveDirPath_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private void initFields() { entry_ = java.util.Collections.emptyList(); + replicationClusterId_ = ""; + sourceBaseNamespaceDirPath_ = ""; + sourceHFileArchiveDirPath_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -17065,6 +17258,15 @@ public final class AdminProtos { for (int i = 0; i < entry_.size(); i++) { output.writeMessage(1, entry_.get(i)); } + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(2, getReplicationClusterIdBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(3, getSourceBaseNamespaceDirPathBytes()); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeBytes(4, getSourceHFileArchiveDirPathBytes()); + } getUnknownFields().writeTo(output); } @@ -17078,6 +17280,18 @@ public final class AdminProtos { size += com.google.protobuf.CodedOutputStream .computeMessageSize(1, entry_.get(i)); } + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, getReplicationClusterIdBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(3, getSourceBaseNamespaceDirPathBytes()); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(4, getSourceHFileArchiveDirPathBytes()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -17103,6 +17317,21 @@ public final class AdminProtos { boolean result = true; result = result && getEntryList() .equals(other.getEntryList()); + result = result && (hasReplicationClusterId() == other.hasReplicationClusterId()); + if (hasReplicationClusterId()) { + result = result && getReplicationClusterId() + .equals(other.getReplicationClusterId()); + } + result = result && (hasSourceBaseNamespaceDirPath() == other.hasSourceBaseNamespaceDirPath()); + if (hasSourceBaseNamespaceDirPath()) { + result = result && getSourceBaseNamespaceDirPath() + .equals(other.getSourceBaseNamespaceDirPath()); + } + result = result && (hasSourceHFileArchiveDirPath() == other.hasSourceHFileArchiveDirPath()); + if (hasSourceHFileArchiveDirPath()) { + result = result && getSourceHFileArchiveDirPath() + .equals(other.getSourceHFileArchiveDirPath()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -17120,6 +17349,18 @@ public final class AdminProtos { hash = (37 * hash) + ENTRY_FIELD_NUMBER; hash = (53 * hash) + getEntryList().hashCode(); } + if (hasReplicationClusterId()) { + hash = (37 * hash) + REPLICATIONCLUSTERID_FIELD_NUMBER; + hash = (53 * hash) + getReplicationClusterId().hashCode(); + } + if (hasSourceBaseNamespaceDirPath()) { + hash = (37 * 
hash) + SOURCEBASENAMESPACEDIRPATH_FIELD_NUMBER; + hash = (53 * hash) + getSourceBaseNamespaceDirPath().hashCode(); + } + if (hasSourceHFileArchiveDirPath()) { + hash = (37 * hash) + SOURCEHFILEARCHIVEDIRPATH_FIELD_NUMBER; + hash = (53 * hash) + getSourceHFileArchiveDirPath().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -17243,6 +17484,12 @@ public final class AdminProtos { } else { entryBuilder_.clear(); } + replicationClusterId_ = ""; + bitField0_ = (bitField0_ & ~0x00000002); + sourceBaseNamespaceDirPath_ = ""; + bitField0_ = (bitField0_ & ~0x00000004); + sourceHFileArchiveDirPath_ = ""; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -17270,6 +17517,7 @@ public final class AdminProtos { public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest buildPartial() { org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest result = new org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest(this); int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; if (entryBuilder_ == null) { if (((bitField0_ & 0x00000001) == 0x00000001)) { entry_ = java.util.Collections.unmodifiableList(entry_); @@ -17279,6 +17527,19 @@ public final class AdminProtos { } else { result.entry_ = entryBuilder_.build(); } + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000001; + } + result.replicationClusterId_ = replicationClusterId_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000002; + } + result.sourceBaseNamespaceDirPath_ = sourceBaseNamespaceDirPath_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000004; + } + result.sourceHFileArchiveDirPath_ = sourceHFileArchiveDirPath_; + result.bitField0_ = to_bitField0_; onBuilt(); return result; } @@ -17320,6 +17581,21 @@ public final class AdminProtos { } } } + if (other.hasReplicationClusterId()) { + bitField0_ |= 0x00000002; + replicationClusterId_ = other.replicationClusterId_; + onChanged(); + } + if (other.hasSourceBaseNamespaceDirPath()) { + bitField0_ |= 0x00000004; + sourceBaseNamespaceDirPath_ = other.sourceBaseNamespaceDirPath_; + onChanged(); + } + if (other.hasSourceHFileArchiveDirPath()) { + bitField0_ |= 0x00000008; + sourceHFileArchiveDirPath_ = other.sourceHFileArchiveDirPath_; + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -17593,6 +17869,228 @@ public final class AdminProtos { return entryBuilder_; } + // optional string replicationClusterId = 2; + private java.lang.Object replicationClusterId_ = ""; + /** + * <code>optional string replicationClusterId = 2;</code> + */ + public boolean hasReplicationClusterId() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * <code>optional string replicationClusterId = 2;</code> + */ + public java.lang.String getReplicationClusterId() { + java.lang.Object ref = replicationClusterId_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + replicationClusterId_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * <code>optional string replicationClusterId = 2;</code> + */ + public com.google.protobuf.ByteString + getReplicationClusterIdBytes() { + java.lang.Object ref = replicationClusterId_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + 
(java.lang.String) ref); + replicationClusterId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * <code>optional string replicationClusterId = 2;</code> + */ + public Builder setReplicationClusterId( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + replicationClusterId_ = value; + onChanged(); + return this; + } + /** + * <code>optional string replicationClusterId = 2;</code> + */ + public Builder clearReplicationClusterId() { + bitField0_ = (bitField0_ & ~0x00000002); + replicationClusterId_ = getDefaultInstance().getReplicationClusterId(); + onChanged(); + return this; + } + /** + * <code>optional string replicationClusterId = 2;</code> + */ + public Builder setReplicationClusterIdBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + replicationClusterId_ = value; + onChanged(); + return this; + } + + // optional string sourceBaseNamespaceDirPath = 3; + private java.lang.Object sourceBaseNamespaceDirPath_ = ""; + /** + * <code>optional string sourceBaseNamespaceDirPath = 3;</code> + */ + public boolean hasSourceBaseNamespaceDirPath() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * <code>optional string sourceBaseNamespaceDirPath = 3;</code> + */ + public java.lang.String getSourceBaseNamespaceDirPath() { + java.lang.Object ref = sourceBaseNamespaceDirPath_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + sourceBaseNamespaceDirPath_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * <code>optional string sourceBaseNamespaceDirPath = 3;</code> + */ + public com.google.protobuf.ByteString + getSourceBaseNamespaceDirPathBytes() { + java.lang.Object ref = sourceBaseNamespaceDirPath_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + sourceBaseNamespaceDirPath_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * <code>optional string sourceBaseNamespaceDirPath = 3;</code> + */ + public Builder setSourceBaseNamespaceDirPath( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + sourceBaseNamespaceDirPath_ = value; + onChanged(); + return this; + } + /** + * <code>optional string sourceBaseNamespaceDirPath = 3;</code> + */ + public Builder clearSourceBaseNamespaceDirPath() { + bitField0_ = (bitField0_ & ~0x00000004); + sourceBaseNamespaceDirPath_ = getDefaultInstance().getSourceBaseNamespaceDirPath(); + onChanged(); + return this; + } + /** + * <code>optional string sourceBaseNamespaceDirPath = 3;</code> + */ + public Builder setSourceBaseNamespaceDirPathBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + sourceBaseNamespaceDirPath_ = value; + onChanged(); + return this; + } + + // optional string sourceHFileArchiveDirPath = 4; + private java.lang.Object sourceHFileArchiveDirPath_ = ""; + /** + * <code>optional string sourceHFileArchiveDirPath = 4;</code> + */ + public boolean hasSourceHFileArchiveDirPath() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * <code>optional string sourceHFileArchiveDirPath = 4;</code> + */ + public java.lang.String 
getSourceHFileArchiveDirPath() { + java.lang.Object ref = sourceHFileArchiveDirPath_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + sourceHFileArchiveDirPath_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * <code>optional string sourceHFileArchiveDirPath = 4;</code> + */ + public com.google.protobuf.ByteString + getSourceHFileArchiveDirPathBytes() { + java.lang.Object ref = sourceHFileArchiveDirPath_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + sourceHFileArchiveDirPath_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * <code>optional string sourceHFileArchiveDirPath = 4;</code> + */ + public Builder setSourceHFileArchiveDirPath( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000008; + sourceHFileArchiveDirPath_ = value; + onChanged(); + return this; + } + /** + * <code>optional string sourceHFileArchiveDirPath = 4;</code> + */ + public Builder clearSourceHFileArchiveDirPath() { + bitField0_ = (bitField0_ & ~0x00000008); + sourceHFileArchiveDirPath_ = getDefaultInstance().getSourceHFileArchiveDirPath(); + onChanged(); + return this; + } + /** + * <code>optional string sourceHFileArchiveDirPath = 4;</code> + */ + public Builder setSourceHFileArchiveDirPathBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000008; + sourceHFileArchiveDirPath_ = value; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.ReplicateWALEntryRequest) } @@ -23539,56 +24037,58 @@ public final class AdminProtos { "ster_system_time\030\004 \001(\004\"\026\n\024MergeRegionsRe" + "sponse\"a\n\010WALEntry\022\035\n\003key\030\001 \002(\0132\020.hbase." + "pb.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025as", - "sociated_cell_count\030\003 \001(\005\"=\n\030ReplicateWA" + - "LEntryRequest\022!\n\005entry\030\001 \003(\0132\022.hbase.pb." + - "WALEntry\"\033\n\031ReplicateWALEntryResponse\"\026\n" + - "\024RollWALWriterRequest\"0\n\025RollWALWriterRe" + - "sponse\022\027\n\017region_to_flush\030\001 \003(\014\"#\n\021StopS" + - "erverRequest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServ" + - "erResponse\"\026\n\024GetServerInfoRequest\"K\n\nSe" + - "rverInfo\022)\n\013server_name\030\001 \002(\0132\024.hbase.pb" + - ".ServerName\022\022\n\nwebui_port\030\002 \001(\r\"B\n\025GetSe" + - "rverInfoResponse\022)\n\013server_info\030\001 \002(\0132\024.", - "hbase.pb.ServerInfo\"\034\n\032UpdateConfigurati" + - "onRequest\"\035\n\033UpdateConfigurationResponse" + - "2\207\013\n\014AdminService\022P\n\rGetRegionInfo\022\036.hba" + - "se.pb.GetRegionInfoRequest\032\037.hbase.pb.Ge" + - "tRegionInfoResponse\022M\n\014GetStoreFile\022\035.hb" + - "ase.pb.GetStoreFileRequest\032\036.hbase.pb.Ge" + - "tStoreFileResponse\022V\n\017GetOnlineRegion\022 ." + - "hbase.pb.GetOnlineRegionRequest\032!.hbase." 
+ - "pb.GetOnlineRegionResponse\022G\n\nOpenRegion" + - "\022\033.hbase.pb.OpenRegionRequest\032\034.hbase.pb", - ".OpenRegionResponse\022M\n\014WarmupRegion\022\035.hb" + - "ase.pb.WarmupRegionRequest\032\036.hbase.pb.Wa" + - "rmupRegionResponse\022J\n\013CloseRegion\022\034.hbas" + - "e.pb.CloseRegionRequest\032\035.hbase.pb.Close" + - "RegionResponse\022J\n\013FlushRegion\022\034.hbase.pb" + - ".FlushRegionRequest\032\035.hbase.pb.FlushRegi" + - "onResponse\022J\n\013SplitRegion\022\034.hbase.pb.Spl" + - "itRegionRequest\032\035.hbase.pb.SplitRegionRe" + - "sponse\022P\n\rCompactRegion\022\036.hbase.pb.Compa" + - "ctRegionRequest\032\037.hbase.pb.CompactRegion", - "Response\022M\n\014MergeRegions\022\035.hbase.pb.Merg" + - "eRegionsRequest\032\036.hbase.pb.MergeRegionsR" + - "esponse\022\\\n\021ReplicateWALEntry\022\".hbase.pb." + - "ReplicateWALEntryRequest\032#.hbase.pb.Repl" + - "icateWALEntryResponse\022Q\n\006Replay\022\".hbase." + - "pb.ReplicateWALEntryRequest\032#.hbase.pb.R" + - "eplicateWALEntryResponse\022P\n\rRollWALWrite" + - "r\022\036.hbase.pb.RollWALWriterRequest\032\037.hbas" + - "e.pb.RollWALWriterResponse\022P\n\rGetServerI" + - "nfo\022\036.hbase.pb.GetServerInfoRequest\032\037.hb", - "ase.pb.GetServerInfoResponse\022G\n\nStopServ" + - "er\022\033.hbase.pb.StopServerRequest\032\034.hbase." + - "pb.StopServerResponse\022_\n\022UpdateFavoredNo" + - "des\022#.hbase.pb.UpdateFavoredNodesRequest" + - "\032$.hbase.pb.UpdateFavoredNodesResponse\022b" + - "\n\023UpdateConfiguration\022$.hbase.pb.UpdateC" + - "onfigurationRequest\032%.hbase.pb.UpdateCon" + - "figurationResponseBA\n*org.apache.hadoop." + - "hbase.protobuf.generatedB\013AdminProtosH\001\210" + - "\001\001\240\001\001" + "sociated_cell_count\030\003 \001(\005\"\242\001\n\030ReplicateW" + + "ALEntryRequest\022!\n\005entry\030\001 \003(\0132\022.hbase.pb" + + ".WALEntry\022\034\n\024replicationClusterId\030\002 \001(\t\022" + + "\"\n\032sourceBaseNamespaceDirPath\030\003 \001(\t\022!\n\031s" + + "ourceHFileArchiveDirPath\030\004 \001(\t\"\033\n\031Replic" + + "ateWALEntryResponse\"\026\n\024RollWALWriterRequ" + + "est\"0\n\025RollWALWriterResponse\022\027\n\017region_t" + + "o_flush\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006re" + + "ason\030\001 \002(\t\"\024\n\022StopServerResponse\"\026\n\024GetS" + + "erverInfoRequest\"K\n\nServerInfo\022)\n\013server", + "_name\030\001 \002(\0132\024.hbase.pb.ServerName\022\022\n\nweb" + + "ui_port\030\002 \001(\r\"B\n\025GetServerInfoResponse\022)" + + "\n\013server_info\030\001 \002(\0132\024.hbase.pb.ServerInf" + + "o\"\034\n\032UpdateConfigurationRequest\"\035\n\033Updat" + + "eConfigurationResponse2\207\013\n\014AdminService\022" + + "P\n\rGetRegionInfo\022\036.hbase.pb.GetRegionInf" + + "oRequest\032\037.hbase.pb.GetRegionInfoRespons" + + "e\022M\n\014GetStoreFile\022\035.hbase.pb.GetStoreFil" + + "eRequest\032\036.hbase.pb.GetStoreFileResponse" + + "\022V\n\017GetOnlineRegion\022 .hbase.pb.GetOnline", + "RegionRequest\032!.hbase.pb.GetOnlineRegion" + + "Response\022G\n\nOpenRegion\022\033.hbase.pb.OpenRe" + + "gionRequest\032\034.hbase.pb.OpenRegionRespons" + + "e\022M\n\014WarmupRegion\022\035.hbase.pb.WarmupRegio" + + "nRequest\032\036.hbase.pb.WarmupRegionResponse" + + "\022J\n\013CloseRegion\022\034.hbase.pb.CloseRegionRe" + + "quest\032\035.hbase.pb.CloseRegionResponse\022J\n\013" + + "FlushRegion\022\034.hbase.pb.FlushRegionReques" + + "t\032\035.hbase.pb.FlushRegionResponse\022J\n\013Spli" + + "tRegion\022\034.hbase.pb.SplitRegionRequest\032\035.", 
+ "hbase.pb.SplitRegionResponse\022P\n\rCompactR" + + "egion\022\036.hbase.pb.CompactRegionRequest\032\037." + + "hbase.pb.CompactRegionResponse\022M\n\014MergeR" + + "egions\022\035.hbase.pb.MergeRegionsRequest\032\036." + + "hbase.pb.MergeRegionsResponse\022\\\n\021Replica" + + "teWALEntry\022\".hbase.pb.ReplicateWALEntryR" + + "equest\032#.hbase.pb.ReplicateWALEntryRespo" + + "nse\022Q\n\006Replay\022\".hbase.pb.ReplicateWALEnt" + + "ryRequest\032#.hbase.pb.ReplicateWALEntryRe" + + "sponse\022P\n\rRollWALWriter\022\036.hbase.pb.RollW", + "ALWriterRequest\032\037.hbase.pb.RollWALWriter" + + "Response\022P\n\rGetServerInfo\022\036.hbase.pb.Get" + + "ServerInfoRequest\032\037.hbase.pb.GetServerIn" + + "foResponse\022G\n\nStopServer\022\033.hbase.pb.Stop" + + "ServerRequest\032\034.hbase.pb.StopServerRespo" + + "nse\022_\n\022UpdateFavoredNodes\022#.hbase.pb.Upd" + + "ateFavoredNodesRequest\032$.hbase.pb.Update" + + "FavoredNodesResponse\022b\n\023UpdateConfigurat" + + "ion\022$.hbase.pb.UpdateConfigurationReques" + + "t\032%.hbase.pb.UpdateConfigurationResponse", + "BA\n*org.apache.hadoop.hbase.protobuf.gen" + + "eratedB\013AdminProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -23750,7 +24250,7 @@ public final class AdminProtos { internal_static_hbase_pb_ReplicateWALEntryRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_ReplicateWALEntryRequest_descriptor, - new java.lang.String[] { "Entry", }); + new java.lang.String[] { "Entry", "ReplicationClusterId", "SourceBaseNamespaceDirPath", "SourceHFileArchiveDirPath", }); internal_static_hbase_pb_ReplicateWALEntryResponse_descriptor = getDescriptor().getMessageTypes().get(24); internal_static_hbase_pb_ReplicateWALEntryResponse_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-protocol/src/main/protobuf/Admin.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto index f7787f5..a1905a4 100644 --- a/hbase-protocol/src/main/protobuf/Admin.proto +++ b/hbase-protocol/src/main/protobuf/Admin.proto @@ -211,6 +211,9 @@ message WALEntry { */ message ReplicateWALEntryRequest { repeated WALEntry entry = 1; + optional string replicationClusterId = 2; + optional string sourceBaseNamespaceDirPath = 3; + optional string sourceHFileArchiveDirPath = 4; } message ReplicateWALEntryResponse { http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 44be2d3..369ae90 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.security.UserProvider; +import 
org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint; import org.apache.hadoop.hbase.security.token.FsDelegationToken; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSHDFSUtils; @@ -125,6 +126,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { private FsDelegationToken fsDelegationToken; private String bulkToken; private UserProvider userProvider; + private int nrThreads; private LoadIncrementalHFiles() {} @@ -146,6 +148,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer"); assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true); maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32); + nrThreads = conf.getInt("hbase.loadincremental.threads.max", + Runtime.getRuntime().availableProcessors()); initalized = true; } @@ -246,7 +250,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * region boundary, and each part is added back into the queue. * The import process finishes when the queue is empty. */ - static class LoadQueueItem { + public static class LoadQueueItem { final byte[] family; final Path hfilePath; @@ -313,7 +317,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * @param table the table to load into * @throws TableNotFoundException if table does not yet exist */ - @SuppressWarnings("deprecation") public void doBulkLoad(Path hfofDir, final Admin admin, Table table, RegionLocator regionLocator) throws TableNotFoundException, IOException { @@ -321,16 +324,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { throw new TableNotFoundException("Table " + table.getName() + " is not currently available."); } - // initialize thread pools - int nrThreads = getConf().getInt("hbase.loadincremental.threads.max", - Runtime.getRuntime().availableProcessors()); - ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); - builder.setNameFormat("LoadIncrementalHFiles-%1$d"); - ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads, - 60, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - builder.build()); - ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true); + ExecutorService pool = createExecutorService(); // LQI queue does not need to be threadsafe -- all operations on this queue // happen in this thread @@ -347,30 +341,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { "option, consider removing the files and bulkload again without this option. 
" + "See HBASE-13985"); } - discoverLoadQueue(queue, hfofDir, validateHFile); - // check whether there is invalid family name in HFiles to be bulkloaded - Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies(); - ArrayList<String> familyNames = new ArrayList<String>(families.size()); - for (HColumnDescriptor family : families) { - familyNames.add(family.getNameAsString()); - } - ArrayList<String> unmatchedFamilies = new ArrayList<String>(); - Iterator<LoadQueueItem> queueIter = queue.iterator(); - while (queueIter.hasNext()) { - LoadQueueItem lqi = queueIter.next(); - String familyNameInHFile = Bytes.toString(lqi.family); - if (!familyNames.contains(familyNameInHFile)) { - unmatchedFamilies.add(familyNameInHFile); - } - } - if (unmatchedFamilies.size() > 0) { - String msg = - "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " - + unmatchedFamilies + "; valid family names of table " - + table.getName() + " are: " + familyNames; - LOG.error(msg); - throw new IOException(msg); - } + prepareHFileQueue(hfofDir, table, queue, validateHFile); + int count = 0; if (queue.isEmpty()) { @@ -397,7 +369,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { + count + " with " + queue.size() + " files remaining to group or split"); } - int maxRetries = getConf().getInt("hbase.bulkload.retries.number", 10); + int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10); maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1); if (maxRetries != 0 && count >= maxRetries) { throw new IOException("Retry attempted " + count + @@ -447,6 +419,85 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } /** + * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the + * passed directory and validates whether the prepared queue has all the valid table column + * families in it. + * @param hfilesDir directory containing list of hfiles to be loaded into the table + * @param table table to which hfiles should be loaded + * @param queue queue which needs to be loaded into the table + * @throws IOException If any I/O or network error occurred + */ + public void prepareHFileQueue(Path hfofDir, Table table, Deque<LoadQueueItem> queue, + boolean validateHFile) throws IOException { + discoverLoadQueue(queue, hfofDir, validateHFile); + validateFamiliesInHFiles(table, queue); + } + + // Initialize a thread pool + private ExecutorService createExecutorService() { + ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); + builder.setNameFormat("LoadIncrementalHFiles-%1$d"); + ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), builder.build()); + ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); + return pool; + } + + /** + * Checks whether there is any invalid family name in HFiles to be bulk loaded. 
+ */ + private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue) + throws IOException { + Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies(); + List<String> familyNames = new ArrayList<String>(families.size()); + for (HColumnDescriptor family : families) { + familyNames.add(family.getNameAsString()); + } + List<String> unmatchedFamilies = new ArrayList<String>(); + Iterator<LoadQueueItem> queueIter = queue.iterator(); + while (queueIter.hasNext()) { + LoadQueueItem lqi = queueIter.next(); + String familyNameInHFile = Bytes.toString(lqi.family); + if (!familyNames.contains(familyNameInHFile)) { + unmatchedFamilies.add(familyNameInHFile); + } + } + if (unmatchedFamilies.size() > 0) { + String msg = + "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " + + unmatchedFamilies + "; valid family names of table " + table.getName() + " are: " + + familyNames; + LOG.error(msg); + throw new IOException(msg); + } + } + + /** + * Used by the replication sink to load the hfiles from the source cluster. It runs in two + * phases: first + * {@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}, then + * {@link LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)}. + * @param table Table into which these hfiles should be loaded + * @param conn Connection to use + * @param queue queue of {@link LoadQueueItem}s holding the hfiles yet to be loaded + * @param startEndKeys starting and ending row keys of the table's regions + * @throws IOException If any I/O or network error occurred + */ + public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue, + Pair<byte[][], byte[][]> startEndKeys) throws IOException { + ExecutorService pool = null; + try { + pool = createExecutorService(); + Multimap<ByteBuffer, LoadQueueItem> regionGroups = + groupOrSplitPhase(table, pool, queue, startEndKeys); + bulkLoadPhase(table, conn, pool, queue, regionGroups); + } finally { + if (pool != null) { + pool.shutdown(); + } + } + } + + /** + * This takes the LQI's grouped by likely regions and attempts to bulk load + * them. Any failures are re-queued for another pass with the + * groupOrSplitPhase. 
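As an aside, the two new public entry points above are designed to be called back to back. Below is a minimal sketch of a caller that discovers, validates, and loads a directory of hfiles this way; the table name, source path, and class name are illustrative assumptions, not part of this commit:

    import java.util.ArrayDeque;
    import java.util.Deque;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;

    public class LoadHFileQueueSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        TableName tableName = TableName.valueOf("t1");   // illustrative table name
        Path hfofDir = new Path("/tmp/hfiles");          // illustrative source directory
        try (Connection conn = ConnectionFactory.createConnection(conf);
            Table table = conn.getTable(tableName);
            RegionLocator locator = conn.getRegionLocator(tableName)) {
          LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
          // Discover the hfiles under hfofDir and check their families against the table.
          Deque<LoadQueueItem> queue = new ArrayDeque<LoadQueueItem>();
          loader.prepareHFileQueue(hfofDir, table, queue, false);
          // Group/split against the current region boundaries, then bulk load.
          loader.loadHFileQueue(table, conn, queue, locator.getStartEndKeys());
        }
      }
    }

Keeping the two calls separate lets a caller such as the replication sink build and reuse the queue itself, instead of going through doBulkLoad, which always rediscovers the files from a directory.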
@@ -592,10 +643,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool { String uniqueName = getUniqueName(); HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family); + Path botOut = new Path(tmpDir, uniqueName + ".bottom"); Path topOut = new Path(tmpDir, uniqueName + ".top"); - splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, - botOut, topOut); + splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut); FileSystem fs = tmpDir.getFileSystem(getConf()); fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx")); @@ -626,6 +677,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool { final Pair<byte[][], byte[][]> startEndKeys) throws IOException { final Path hfilePath = item.hfilePath; + // fs is the source filesystem + if (fs == null) { + fs = hfilePath.getFileSystem(getConf()); + } HFile.Reader hfr = HFile.createReader(fs, hfilePath, new CacheConfig(getConf()), getConf()); final byte[] first, last; @@ -712,7 +767,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * failure */ protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn, - final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis) + final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis) throws IOException { final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(lqis.size()); @@ -747,6 +802,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool { //in user directory if(secureClient != null && !success) { FileSystem targetFs = FileSystem.get(getConf()); + // fs is the source filesystem + if(fs == null) { + fs = lqis.iterator().next().hfilePath.getFileSystem(getConf()); + } // Check to see if the source and target filesystems are the same // If they are the same filesystem, we will try move the files back // because previously we moved them to the staging directory. @@ -1000,4 +1059,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool { System.exit(ret); } + /** + * Called from the replication sink, which manages the bulkToken (staging directory) by itself. + * This is used only when {@link SecureBulkLoadEndpoint} is configured in the + * hbase.coprocessor.region.classes property. The staging directory is a temporary directory + * into which all files are initially copied/moved from the user-given directory and given the + * required file permissions; from there they are finally loaded into the table. Set this only + * when the caller wants to manage the staging directory itself; otherwise this tool handles it. 
+ * @param stagingDir staging directory path + */ + public void setBulkToken(String stagingDir) { + this.bulkToken = stagingDir; + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index d6a120b..91185af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -28,22 +28,23 @@ import java.util.Map; import java.util.NavigableMap; import java.util.UUID; -import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.SizedCellScanner; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos; -import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALKey; import com.google.protobuf.ServiceException; @@ -51,15 +52,20 @@ import com.google.protobuf.ServiceException; public class ReplicationProtbufUtil { /** * A helper to replicate a list of WAL entries using admin protocol. 
- * - * @param admin - * @param entries + * @param admin Admin service + * @param entries Array of WAL entries to be replicated + * @param replicationClusterId Id which will uniquely identify source cluster FS client + * configurations in the replication configuration directory + * @param sourceBaseNamespaceDir Path to source cluster base namespace directory + * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory * @throws java.io.IOException */ public static void replicateWALEntry(final AdminService.BlockingInterface admin, - final Entry[] entries) throws IOException { + final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir, + Path sourceHFileArchiveDir) throws IOException { Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = - buildReplicateWALEntryRequest(entries, null); + buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir, + sourceHFileArchiveDir); PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond()); try { admin.replicateWALEntry(controller, p.getFirst()); @@ -77,19 +83,22 @@ public class ReplicationProtbufUtil { */ public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> buildReplicateWALEntryRequest(final Entry[] entries) { - return buildReplicateWALEntryRequest(entries, null); + return buildReplicateWALEntryRequest(entries, null, null, null, null); } /** * Create a new ReplicateWALEntryRequest from a list of WAL entries - * * @param entries the WAL entries to be replicated * @param encodedRegionName alternative region name to use if not null - * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values - * found. + * @param replicationClusterId Id which will uniquely identify source cluster FS client + * configurations in the replication configuration directory + * @param sourceBaseNamespaceDir Path to source cluster base namespace directory + * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory + * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found. */ public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> - buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName) { + buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName, + String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) { // Accumulate all the Cells seen in here. List<List<? extends Cell>> allCells = new ArrayList<List<? 
extends Cell>>(entries.length); int size = 0; @@ -146,6 +155,17 @@ public class ReplicationProtbufUtil { entryBuilder.setAssociatedCellCount(cells.size()); builder.addEntry(entryBuilder.build()); } + + if (replicationClusterId != null) { + builder.setReplicationClusterId(replicationClusterId); + } + if (sourceBaseNamespaceDir != null) { + builder.setSourceBaseNamespaceDirPath(sourceBaseNamespaceDir.toString()); + } + if (sourceHFileArchiveDir != null) { + builder.setSourceHFileArchiveDirPath(sourceHFileArchiveDir.toString()); + } + return new Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>(builder.build(), getCellScanner(allCells, size)); } http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index d94e11c..0c9b0e6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1800,7 +1800,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, List<WALEntry> entries = request.getEntryList(); CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner(); regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner); - regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner); + regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner, + request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), + request.getSourceHFileArchiveDirPath()); regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner); return ReplicateWALEntryResponse.newBuilder().build(); } else { http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java index 5f96bf7..836d3aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java @@ -36,7 +36,13 @@ public interface ReplicationSinkService extends ReplicationService { * Carry on the list of log entries down to the sink * @param entries list of WALEntries to replicate * @param cells Cells that the WALEntries refer to (if cells is non-null) + * @param replicationClusterId Id which will uniquely identify source cluster FS client + * configurations in the replication configuration directory + * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace + * directory required for replicating hfiles + * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive directory * @throws IOException */ - void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException; + void replicateLogEntries(List<WALEntry> entries, CellScanner cells, String replicationClusterId, + String sourceBaseNamespaceDirPath, 
String sourceHFileArchiveDirPath) throws IOException; }
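The three new fields ride along on the existing ReplicateWALEntryRequest as optionals, so an older source that omits them still interoperates with a newer sink. Below is a small sketch of building such a request through the widened ReplicationProtbufUtil helper; the peer id, hdfs:// paths, and class name are illustrative assumptions, not part of this commit:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.CellScanner;
    import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
    import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.hadoop.hbase.wal.WAL.Entry;

    public class BuildReplicateRequestSketch {
      // Builds a request carrying the new bulk-load context alongside the WAL entries.
      static ReplicateWALEntryRequest build(Entry[] entries) {
        Pair<ReplicateWALEntryRequest, CellScanner> p =
            ReplicationProtbufUtil.buildReplicateWALEntryRequest(entries, null,
                "1",                                      // replicationClusterId (illustrative)
                new Path("hdfs://source/hbase/data"),     // source base namespace dir
                new Path("hdfs://source/hbase/archive")); // source hfile archive dir
        ReplicateWALEntryRequest request = p.getFirst();
        // On the sink side, RSRpcServices reads these same fields back out of the
        // request and hands them to ReplicationSinkService.replicateLogEntries(...).
        assert request.hasReplicationClusterId();
        return request;
      }
    }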
