This is an automated email from the ASF dual-hosted git repository. sunxin pushed a commit to branch HBASE-24666 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 0c061bc6164fbb49afc7146095a1cf8b80a997bc Author: Guanghao Zhang <[email protected]> AuthorDate: Wed Jul 8 14:29:08 2020 +0800 HBASE-24682 Refactor ReplicationSource#addHFileRefs method: move it to ReplicationSourceManager (#2020) Signed-off-by: Wellington Chevreuil <[email protected]> --- .../regionserver/ReplicationSource.java | 19 ++-------------- .../regionserver/ReplicationSourceInterface.java | 14 ------------ .../regionserver/ReplicationSourceManager.java | 26 +++++++++++++++++++++- .../hbase/replication/ReplicationSourceDummy.java | 9 +------- 4 files changed, 28 insertions(+), 40 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index d1268fa..a385ead 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -44,27 +45,24 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableDescriptors; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; -import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.collect.Lists; /** @@ -264,19 +262,6 @@ public class ReplicationSource implements ReplicationSourceInterface { return logQueue.getQueues(); } - @Override - public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) - throws ReplicationException { - String peerId = replicationPeer.getId(); - if (replicationPeer.getPeerConfig().needToReplicate(tableName, family)) { - this.queueStorage.addHFileRefs(peerId, pairs); - metrics.incrSizeOfHFileRefsQueue(pairs.size()); - } else { - LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}", - tableName, Bytes.toString(family), peerId); - } - } - private ReplicationEndpoint createReplicationEndpoint() throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException { RegionServerCoprocessorHost rsServerHost = null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 27e4b79..352cdd3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -28,12 +28,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; -import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; @@ -61,17 +58,6 @@ public interface ReplicationSourceInterface { void enqueueLog(Path log); /** - * Add hfile names to the queue to be replicated. - * @param tableName Name of the table these files belongs to - * @param family Name of the family these files belong to - * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which - * will be added in the queue for replication} - * @throws ReplicationException If failed to add hfile references - */ - void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) - throws ReplicationException; - - /** * Start the replication */ ReplicationSourceInterface startup(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 73efcfe..ad7c033 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; @@ -1050,7 +1051,30 @@ public class ReplicationSourceManager { public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) throws IOException { for (ReplicationSourceInterface source : this.sources.values()) { - throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs)); + throwIOExceptionWhenFail(() -> addHFileRefs(source, tableName, family, pairs)); + } + } + + /** + * Add hfile names to the queue to be replicated. + * @param source the replication peer source + * @param tableName Name of the table these files belongs to + * @param family Name of the family these files belong to + * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which + * will be added in the queue for replication} + * @throws ReplicationException If failed to add hfile references + */ + private void addHFileRefs(ReplicationSourceInterface source, TableName tableName, byte[] family, + List<Pair<Path, Path>> pairs) throws ReplicationException { + String peerId = source.getPeerId(); + // Only the normal replication source update here, its peerId is equals to queueId. + ReplicationPeer replicationPeer = replicationPeers.getPeer(peerId); + if (replicationPeer.getPeerConfig().needToReplicate(tableName, family)) { + this.queueStorage.addHFileRefs(peerId, pairs); + source.getSourceMetrics().incrSizeOfHFileRefsQueue(pairs.size()); + } else { + LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}", + tableName, Bytes.toString(family), peerId); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index cab01d6..4f656b1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -21,17 +21,16 @@ import java.io.IOException; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; /** @@ -115,12 +114,6 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { } @Override - public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> files) - throws ReplicationException { - return; - } - - @Override public boolean isPeerEnabled() { return true; }
