HBASE-19926 Use a separated class to implement the WALActionListener for Replication
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3b603d2c Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3b603d2c Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3b603d2c Branch: refs/heads/HBASE-19397-branch-2 Commit: 3b603d2c08c1f1905a589597737412b43970a304 Parents: 0ca7a2e Author: zhangduo <zhang...@apache.org> Authored: Sun Feb 4 10:42:33 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Sun Feb 4 20:32:14 2018 +0800 ---------------------------------------------------------------------- .../replication/regionserver/Replication.java | 22 +---- .../regionserver/ReplicationSourceManager.java | 47 +--------- .../ReplicationSourceWALActionListener.java | 98 ++++++++++++++++++++ .../TestReplicationSourceManager.java | 30 ++---- 4 files changed, 108 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/3b603d2c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 0274b0a..ad12c66 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; @@ -44,8 +43,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.yetus.audience.InterfaceAudience; @@ -130,23 +127,8 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty()); if (walProvider != null) { - walProvider.addWALActionsListener(new WALActionsListener() { - - @Override - public void preLogRoll(Path oldPath, Path newPath) throws IOException { - replicationManager.preLogRoll(newPath); - } - - @Override - public void postLogRoll(Path oldPath, Path newPath) throws IOException { - replicationManager.postLogRoll(newPath); - } - - @Override - public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { - replicationManager.scopeWALEdits(logKey, logEdit); - } - }); + walProvider + .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager)); } this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); http://git-wip-us.apache.org/repos/asf/hbase/blob/3b603d2c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 8543896..cbbfca0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -40,12 +40,9 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.Server; @@ -64,21 +61,16 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationTracker; -import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; -import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; - /** * This class is responsible to manage all the replication * sources. There are two classes of sources: @@ -471,43 +463,6 @@ public class ReplicationSourceManager implements ReplicationListener { return totalBufferUsed; } - void scopeWALEdits(WALKey logKey, WALEdit logEdit) throws IOException { - scopeWALEdits(logKey, logEdit, this.conf); - } - - /** - * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from - * compaction WAL edits and if the scope is local. - * @param logKey Key that may get scoped according to its edits - * @param logEdit Edits used to lookup the scopes - * @throws IOException If failed to parse the WALEdit - */ - @VisibleForTesting - static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) throws IOException { - boolean replicationForBulkLoadEnabled = - ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf); - boolean foundOtherEdits = false; - for (Cell cell : logEdit.getCells()) { - if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { - foundOtherEdits = true; - break; - } - } - - if (!foundOtherEdits && logEdit.getCells().size() > 0) { - WALProtos.RegionEventDescriptor maybeEvent = - WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0)); - if (maybeEvent != null && - (maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) { - // In serially replication, we use scopes when reading close marker. - foundOtherEdits = true; - } - } - if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) { - ((WALKeyImpl) logKey).serializeReplicationScope(false); - } - } - /** * Factory method to create a replication source * @param conf the configuration to use http://git-wip-us.apache.org/repos/asf/hbase/blob/3b603d2c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java new file mode 100644 index 0000000..eb12614 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + +/** + * Used to receive new wals. + */ +@InterfaceAudience.Private +class ReplicationSourceWALActionListener implements WALActionsListener { + + private final Configuration conf; + + private final ReplicationSourceManager manager; + + public ReplicationSourceWALActionListener(Configuration conf, ReplicationSourceManager manager) { + this.conf = conf; + this.manager = manager; + } + + @Override + public void preLogRoll(Path oldPath, Path newPath) throws IOException { + manager.preLogRoll(newPath); + } + + @Override + public void postLogRoll(Path oldPath, Path newPath) throws IOException { + manager.postLogRoll(newPath); + } + + @Override + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { + scopeWALEdits(logKey, logEdit, conf); + } + + /** + * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from + * compaction WAL edits and if the scope is local. + * @param logKey Key that may get scoped according to its edits + * @param logEdit Edits used to lookup the scopes + * @throws IOException If failed to parse the WALEdit + */ + @VisibleForTesting + static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) throws IOException { + boolean replicationForBulkLoadEnabled = + ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf); + boolean foundOtherEdits = false; + for (Cell cell : logEdit.getCells()) { + if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { + foundOtherEdits = true; + break; + } + } + + if (!foundOtherEdits && logEdit.getCells().size() > 0) { + WALProtos.RegionEventDescriptor maybeEvent = + WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0)); + if (maybeEvent != null && + (maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) { + // In serially replication, we use scopes when reading close marker. + foundOtherEdits = true; + } + } + if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) { + ((WALKeyImpl) logKey).serializeReplicationScope(false); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3b603d2c/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 647e12d..eec8e8a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -40,7 +40,6 @@ import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -65,7 +64,6 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -84,7 +82,6 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; @@ -100,8 +97,10 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.collect.Sets; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; @@ -270,23 +269,8 @@ public abstract class TestReplicationSourceManager { WALFactory wals = new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8")); ReplicationSourceManager replicationManager = replication.getReplicationManager(); - wals.getWALProvider().addWALActionsListener(new WALActionsListener() { - - @Override - public void preLogRoll(Path oldPath, Path newPath) throws IOException { - replicationManager.preLogRoll(newPath); - } - - @Override - public void postLogRoll(Path oldPath, Path newPath) throws IOException { - replicationManager.postLogRoll(newPath); - } - - @Override - public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { - replicationManager.scopeWALEdits(logKey, logEdit); - } - }); + wals.getWALProvider() + .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager)); final WAL wal = wals.getWAL(hri); manager.init(); HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame")); @@ -459,7 +443,7 @@ public abstract class TestReplicationSourceManager { RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW) .setEndKey(HConstants.EMPTY_END_ROW).build(); WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor); - ReplicationSourceManager.scopeWALEdits(new WALKeyImpl(), edit, conf); + ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf); } @Test @@ -471,7 +455,7 @@ public abstract class TestReplicationSourceManager { WALKeyImpl logKey = new WALKeyImpl(scope); // 3. Get the scopes for the key - ReplicationSourceManager.scopeWALEdits(logKey, logEdit, conf); + ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf); // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled assertNull("No bulk load entries scope should be added if bulk load replication is disabled.", @@ -490,7 +474,7 @@ public abstract class TestReplicationSourceManager { bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); // 4. Get the scopes for the key - ReplicationSourceManager.scopeWALEdits(logKey, logEdit, bulkLoadConf); + ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf); NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes(); // Assert family with replication scope global is present in the key scopes