This is an automated email from the ASF dual-hosted git repository. weichiu pushed a commit to branch revert-2372-HBASE-25003_branch-2.2 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit acf068c28d22c1fc56df594492be04f1580de682 Author: Wei-Chiu Chuang <[email protected]> AuthorDate: Fri Sep 11 10:59:41 2020 -0700 Revert "HBASE-25003 Backport HBASE-24350 and HBASE-24779 to branch-2.2 (#2372)" This reverts commit 7cea31869ee836f051fa2298b053676babeb78dc. --- .../MetricsReplicationGlobalSourceSource.java | 39 ------ .../MetricsReplicationSourceFactory.java | 3 +- .../MetricsReplicationTableSource.java | 32 ----- ...a => MetricsReplicationGlobalSourceSource.java} | 18 +-- .../MetricsReplicationSourceFactoryImpl.java | 8 +- .../MetricsReplicationSourceSourceImpl.java | 2 +- .../MetricsReplicationTableSourceImpl.java | 134 --------------------- .../replication/regionserver/MetricsSource.java | 55 ++------- .../replication/regionserver/Replication.java | 7 +- .../regionserver/ReplicationSource.java | 6 +- .../regionserver/ReplicationSourceManager.java | 25 +--- .../regionserver/ReplicationSourceShipper.java | 10 +- .../regionserver/ReplicationSourceWALReader.java | 17 ++- .../replication/regionserver/WALEntryBatch.java | 24 ++-- .../hbase/replication/TestReplicationEndpoint.java | 97 +++------------ .../regionserver/TestWALEntryStream.java | 5 - 16 files changed, 52 insertions(+), 430 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java deleted file mode 100644 index e373a6c..0000000 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.regionserver; - -import org.apache.yetus.audience.InterfaceAudience; - -@InterfaceAudience.Private -public interface MetricsReplicationGlobalSourceSource extends MetricsReplicationSourceSource { - - public static final String SOURCE_WAL_READER_EDITS_BUFFER = "source.walReaderEditsBufferUsage"; - - /** - * Sets the total usage of memory used by edits in memory read from WALs. The memory represented - * by this usage measure is across peers/sources. For example, we may batch the same WAL edits - * multiple times for the sake of replicating them to multiple peers.. - * @param usage The memory used by edits in bytes - */ - void setWALReaderEditsBufferBytes(long usage); - - /** - * Returns the size, in bytes, of edits held in memory to be replicated across all peers. 
- */ - long getWALReaderEditsBufferBytes(); -} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java index 5e4ad27..6534b11 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java @@ -24,6 +24,5 @@ import org.apache.yetus.audience.InterfaceAudience; public interface MetricsReplicationSourceFactory { public MetricsReplicationSinkSource getSink(); public MetricsReplicationSourceSource getSource(String id); - public MetricsReplicationTableSource getTableSource(String tableName); - public MetricsReplicationGlobalSourceSource getGlobalSource(); + public MetricsReplicationSourceSource getGlobalSource(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java deleted file mode 100644 index faa944a..0000000 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.replication.regionserver; - -import org.apache.hadoop.hbase.metrics.BaseSource; -import org.apache.yetus.audience.InterfaceAudience; - -@InterfaceAudience.Private -public interface MetricsReplicationTableSource extends BaseSource { - - void setLastShippedAge(long age); - void incrShippedBytes(long size); - long getShippedBytes(); - void clear(); - long getLastShippedAge(); -} diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java similarity index 93% rename from hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java rename to hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java index 963abba..4e8c810 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -24,8 +24,7 @@ import org.apache.hadoop.metrics2.lib.MutableHistogram; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private -public class MetricsReplicationGlobalSourceSourceImpl - implements MetricsReplicationGlobalSourceSource { +public class 
MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{ private static final String KEY_PREFIX = "source."; private final MetricsReplicationSourceImpl rms; @@ -54,9 +53,8 @@ public class MetricsReplicationGlobalSourceSourceImpl private final MutableFastCounter completedWAL; private final MutableFastCounter completedRecoveryQueue; private final MutableFastCounter failedRecoveryQueue; - private final MutableGaugeLong walReaderBufferUsageBytes; - public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) { + public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) { this.rms = rms; ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP); @@ -94,8 +92,6 @@ public class MetricsReplicationGlobalSourceSourceImpl .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L); failedRecoveryQueue = rms.getMetricsRegistry() .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L); - walReaderBufferUsageBytes = rms.getMetricsRegistry() - .getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L); } @Override public void setLastShippedAge(long age) { @@ -264,14 +260,4 @@ public class MetricsReplicationGlobalSourceSourceImpl public String getMetricsName() { return rms.getMetricsName(); } - - @Override - public void setWALReaderEditsBufferBytes(long usage) { - this.walReaderBufferUsageBytes.set(usage); - } - - @Override - public long getWALReaderEditsBufferBytes() { - return this.walReaderBufferUsageBytes.value(); - } } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java index c0cd1c7..af310f0 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java +++ 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java @@ -35,11 +35,7 @@ public class MetricsReplicationSourceFactoryImpl implements MetricsReplicationSo return new MetricsReplicationSourceSourceImpl(SourceHolder.INSTANCE.source, id); } - @Override public MetricsReplicationTableSource getTableSource(String tableName) { - return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName); - } - - @Override public MetricsReplicationGlobalSourceSource getGlobalSource() { - return new MetricsReplicationGlobalSourceSourceImpl(SourceHolder.INSTANCE.source); + @Override public MetricsReplicationSourceSource getGlobalSource() { + return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source); } } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index 79e9bc1..0ad5052 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -162,7 +162,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou @Override public void incrShippedBytes(long size) { shippedBytesCounter.incr(size); - MetricsReplicationGlobalSourceSourceImpl + MetricsReplicationGlobalSourceSource .incrementKBsCounter(shippedBytesCounter, shippedKBsCounter); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java deleted file mode 100644 index 7120a73..0000000 
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.regionserver; - -import org.apache.hadoop.metrics2.lib.MutableFastCounter; -import org.apache.hadoop.metrics2.lib.MutableHistogram; -import org.apache.yetus.audience.InterfaceAudience; - -/** - * This is the metric source for table level replication metrics. 
- * We can easy monitor some useful table level replication metrics such as - * ageOfLastShippedOp and shippedBytes - */ -@InterfaceAudience.Private -public class MetricsReplicationTableSourceImpl implements MetricsReplicationTableSource { - - private final MetricsReplicationSourceImpl rms; - private final String tableName; - private final String ageOfLastShippedOpKey; - private String keyPrefix; - - private final String shippedBytesKey; - - private final MutableHistogram ageOfLastShippedOpHist; - private final MutableFastCounter shippedBytesCounter; - - public MetricsReplicationTableSourceImpl(MetricsReplicationSourceImpl rms, String tableName) { - this.rms = rms; - this.tableName = tableName; - this.keyPrefix = "source." + this.tableName + "."; - - ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp"; - ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(ageOfLastShippedOpKey); - - shippedBytesKey = this.keyPrefix + "shippedBytes"; - shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L); - } - - @Override - public void setLastShippedAge(long age) { - ageOfLastShippedOpHist.add(age); - } - - @Override - public void incrShippedBytes(long size) { - shippedBytesCounter.incr(size); - } - - @Override - public void clear() { - rms.removeMetric(ageOfLastShippedOpKey); - rms.removeMetric(shippedBytesKey); - } - - @Override - public long getLastShippedAge() { - return ageOfLastShippedOpHist.getMax(); - } - - @Override - public long getShippedBytes() { - return shippedBytesCounter.value(); - } - - @Override - public void init() { - rms.init(); - } - - @Override - public void setGauge(String gaugeName, long value) { - rms.setGauge(this.keyPrefix + gaugeName, value); - } - - @Override - public void incGauge(String gaugeName, long delta) { - rms.incGauge(this.keyPrefix + gaugeName, delta); - } - - @Override - public void decGauge(String gaugeName, long delta) { - rms.decGauge(this.keyPrefix + gaugeName, delta); - } - - @Override - public void 
removeMetric(String key) { - rms.removeMetric(this.keyPrefix + key); - } - - @Override - public void incCounters(String counterName, long delta) { - rms.incCounters(this.keyPrefix + counterName, delta); - } - - @Override - public void updateHistogram(String name, long value) { - rms.updateHistogram(this.keyPrefix + name, value); - } - - @Override - public String getMetricsContext() { - return rms.getMetricsContext(); - } - - @Override - public String getMetricsDescription() { - return rms.getMetricsDescription(); - } - - @Override - public String getMetricsJmxContext() { - return rms.getMetricsJmxContext(); - } - - @Override - public String getMetricsName() { - return rms.getMetricsName(); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 49ef392..830ebe1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -19,11 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.HashMap; -import java.util.List; import java.util.Map; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,8 +47,8 @@ public class MetricsSource implements BaseSource { private String id; private final MetricsReplicationSourceSource singleSourceSource; - private final MetricsReplicationGlobalSourceSource globalSourceSource; - private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable; + private final MetricsReplicationSourceSource globalSourceSource; + private Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable; /** * Constructor used to register the metrics @@ 
-74,8 +71,8 @@ public class MetricsSource implements BaseSource { * @param globalSourceSource Class to monitor global-scoped metrics */ public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource, - MetricsReplicationGlobalSourceSource globalSourceSource, - Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) { + MetricsReplicationSourceSource globalSourceSource, + Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable) { this.id = id; this.singleSourceSource = singleSourceSource; this.globalSourceSource = globalSourceSource; @@ -96,29 +93,6 @@ public class MetricsSource implements BaseSource { } /** - * Update the table level replication metrics per table - * - * @param walEntries List of pairs of WAL entry and it's size - */ - public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) { - for (Pair<Entry, Long> walEntryWithSize : walEntries) { - Entry entry = walEntryWithSize.getFirst(); - long entrySize = walEntryWithSize.getSecond(); - String tableName = entry.getKey().getTableName().getNameAsString(); - long writeTime = entry.getKey().getWriteTime(); - long age = EnvironmentEdgeManager.currentTime() - writeTime; - - // get the replication metrics source for table at the run time - MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable() - .computeIfAbsent(tableName, - t -> CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) - .getTableSource(t)); - tableSource.setLastShippedAge(age); - tableSource.incrShippedBytes(entrySize); - } - } - - /** * Set the age of the last edit that was shipped group by table * @param timestamp write time of the edit * @param tableName String as group and tableName @@ -127,7 +101,7 @@ public class MetricsSource implements BaseSource { long age = EnvironmentEdgeManager.currentTime() - timestamp; this.getSingleSourceSourceByTable().computeIfAbsent( tableName, t -> CompatibilitySingletonFactory - 
.getInstance(MetricsReplicationSourceFactory.class).getTableSource(t)) + .getInstance(MetricsReplicationSourceFactory.class).getSource(t)) .setLastShippedAge(age); } @@ -145,7 +119,7 @@ public class MetricsSource implements BaseSource { * @param walGroup which group we are getting * @return age */ - public long getAgeOfLastShippedOp(String walGroup) { + public long getAgeofLastShippedOp(String walGroup) { return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup); } @@ -419,22 +393,7 @@ public class MetricsSource implements BaseSource { } @VisibleForTesting - public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() { + public Map<String, MetricsReplicationSourceSource> getSingleSourceSourceByTable() { return singleSourceSourceByTable; } - - /** - * Sets the amount of memory in bytes used in this RegionServer by edits pending replication. - */ - public void setWALReaderEditsBufferUsage(long usageInBytes) { - globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes); - } - - /** - * Returns the amount of memory in bytes used in this RegionServer by edits pending replication. - * @return the amount of memory in bytes used in this RegionServer by edits pending replication. 
- */ - public long getWALReaderEditsBufferUsage() { - return globalSourceSource.getWALReaderEditsBufferBytes(); - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index bec9042..752cfb8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; @@ -73,7 +72,6 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer private int statsThreadPeriod; // ReplicationLoad to access replication metrics private ReplicationLoad replicationLoad; - private MetricsReplicationGlobalSourceSource globalMetricsSource; private PeerProcedureHandler peerProcedureHandler; @@ -121,12 +119,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer } catch (KeeperException ke) { throw new IOException("Could not read cluster id", ke); } - this.globalMetricsSource = CompatibilitySingletonFactory - .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, - walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(), - globalMetricsSource); + walProvider != null ? 
walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty()); if (walProvider != null) { walProvider .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index c5f72c9..4726949 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -329,7 +329,7 @@ public class ReplicationSource implements ReplicationSourceInterface { String walGroupId = walGroupShipper.getKey(); ReplicationSourceShipper shipper = walGroupShipper.getValue(); lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId); - ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId); + ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId); int queueSize = queues.get(walGroupId).size(); replicationDelay = ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize); @@ -706,9 +706,7 @@ public class ReplicationSource implements ReplicationSourceInterface { throttler.addPushSize(batchSize); } totalReplicatedEdits.addAndGet(entries.size()); - long newBufferUsed = totalBufferUsed.addAndGet(-batchSize); - // Record the new buffer usage - this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); + totalBufferUsed.addAndGet(-batchSize); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 2806110..a6fc313 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -155,9 +155,6 @@ public class ReplicationSourceManager implements ReplicationListener { private AtomicLong totalBufferUsed = new AtomicLong(); - // Total buffer size on this RegionServer for holding batched edits to be shipped. - private final long totalBufferLimit; - private final MetricsReplicationGlobalSourceSource globalMetrics; /** * Creates a replication manager and sets the watch on all the other registered region servers @@ -174,8 +171,7 @@ public class ReplicationSourceManager implements ReplicationListener { public ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, - WALFileLengthProvider walFileLengthProvider, - MetricsReplicationGlobalSourceSource globalMetrics) throws IOException { + WALFileLengthProvider walFileLengthProvider) throws IOException { // CopyOnWriteArrayList is thread-safe. // Generally, reading is more than modifying. this.sources = new ConcurrentHashMap<>(); @@ -209,9 +205,6 @@ public class ReplicationSourceManager implements ReplicationListener { this.latestPaths = new HashSet<Path>(); replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); - this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, - HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); - this.globalMetrics = globalMetrics; } /** @@ -869,14 +862,6 @@ public class ReplicationSourceManager implements ReplicationListener { } /** - * Returns the maximum size in bytes of edits held in memory which are pending replication - * across all sources inside this RegionServer. 
- */ - public long getTotalBufferLimit() { - return totalBufferLimit; - } - - /** * Get the directory where wals are archived * @return the directory where wals are archived */ @@ -913,10 +898,6 @@ public class ReplicationSourceManager implements ReplicationListener { */ public String getStats() { StringBuilder stats = new StringBuilder(); - // Print stats that apply across all Replication Sources - stats.append("Global stats: "); - stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=") - .append(getTotalBufferLimit()).append("B\n"); for (ReplicationSourceInterface source : this.sources.values()) { stats.append("Normal source for cluster " + source.getPeerId() + ": "); stats.append(source.getStats() + "\n"); @@ -942,8 +923,4 @@ public class ReplicationSourceManager implements ReplicationListener { int activeFailoverTaskCount() { return executor.getActiveCount(); } - - MetricsReplicationGlobalSourceSource getGlobalMetrics() { - return this.globalMetrics; - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 78e5521..23f736f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; @@ -200,14 +201,17 @@ public class ReplicationSourceShipper extends Thread { // Clean up hfile references for (Entry 
entry : entries) { cleanUpHFileRefs(entry.getEdit()); - LOG.trace("shipped entry {}: ", entry); + + TableName tableName = entry.getKey().getTableName(); + source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(), + tableName.getNameAsString()); } // Log and clean up WAL logs updateLogPosition(entryBatch); //offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size) //this sizeExcludeBulkLoad has to use same calculation that when calling - //acquireBufferQuota() in ReplicationSourceWALReader because they maintain + //acquireBufferQuota() in ReplicatinoSourceWALReader because they maintain //same variable: totalBufferUsed source.postShipEdits(entries, sizeExcludeBulkLoad); // FIXME check relationship between wal group and overall @@ -215,8 +219,6 @@ public class ReplicationSourceShipper extends Thread { entryBatch.getNbHFiles()); source.getSourceMetrics().setAgeOfLastShippedOp( entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId); - source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize()); - if (LOG.isTraceEnabled()) { LOG.trace("Replicated {} entries or {} operations in {} ms", entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 328cb8f..12653f5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; import 
org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -103,7 +104,8 @@ class ReplicationSourceWALReader extends Thread { // the +1 is for the current thread reading before placing onto the queue int batchCount = conf.getInt("replication.source.nb.batches", 1); this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed(); - this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit(); + this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, + HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = @@ -170,7 +172,7 @@ class ReplicationSourceWALReader extends Thread { } long entrySize = getEntrySizeIncludeBulkLoad(entry); long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry); - batch.addEntry(entry, entrySize); + batch.addEntry(entry); updateBatchStats(batch, entry, entrySize); boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad); @@ -270,8 +272,6 @@ class ReplicationSourceWALReader extends Thread { private boolean checkQuota() { // try not to go over total quota if (totalBufferUsed.get() > totalBufferQuota) { - LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B", - this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota); Threads.sleep(sleepForRetries); return false; } @@ -306,7 +306,9 @@ class ReplicationSourceWALReader extends Thread { private long getEntrySizeIncludeBulkLoad(Entry entry) { WALEdit edit = entry.getEdit(); - return getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit); + WALKey key = entry.getKey(); + return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) + + key.estimatedSerializedSizeOf(); } public static long getEntrySizeExcludeBulkLoad(Entry entry) { @@ -399,10 +401,7 @@ class 
ReplicationSourceWALReader extends Thread { * @return true if we should clear buffer and push all */ private boolean acquireBufferQuota(long size) { - long newBufferUsed = totalBufferUsed.addAndGet(size); - // Record the new buffer usage - this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); - return newBufferUsed >= totalBufferQuota; + return totalBufferUsed.addAndGet(size) >= totalBufferQuota; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java index 4f96c96..22b2de7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java @@ -21,9 +21,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; @@ -36,8 +34,7 @@ class WALEntryBatch { // used by recovered replication queue to indicate that all the entries have been read. 
public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null); - private List<Pair<Entry, Long>> walEntriesWithSize; - + private List<Entry> walEntries; // last WAL that was read private Path lastWalPath; // position in WAL of last entry in this batch @@ -57,7 +54,7 @@ class WALEntryBatch { * @param lastWalPath Path of the WAL the last entry in this batch was read from */ WALEntryBatch(int maxNbEntries, Path lastWalPath) { - this.walEntriesWithSize = new ArrayList<>(maxNbEntries); + this.walEntries = new ArrayList<>(maxNbEntries); this.lastWalPath = lastWalPath; } @@ -69,22 +66,15 @@ class WALEntryBatch { return batch; } - public void addEntry(Entry entry, long entrySize) { - walEntriesWithSize.add(new Pair<>(entry, entrySize)); + public void addEntry(Entry entry) { + walEntries.add(entry); } /** * @return the WAL Entries. */ public List<Entry> getWalEntries() { - return walEntriesWithSize.stream().map(Pair::getFirst).collect(Collectors.toList()); - } - - /** - * @return the WAL Entries. 
- */ - public List<Pair<Entry, Long>> getWalEntriesWithSize() { - return walEntriesWithSize; + return walEntries; } /** @@ -106,7 +96,7 @@ class WALEntryBatch { } public int getNbEntries() { - return walEntriesWithSize.size(); + return walEntries.size(); } /** @@ -170,7 +160,7 @@ class WALEntryBatch { @Override public String toString() { - return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath + + return "WALEntryBatch [walEntries=" + walEntries + ", lastWalPath=" + lastWalPath + ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles=" + nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" + endOfFile + "]"; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index cc7e20c..5d7366d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -28,7 +28,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,8 +35,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -47,21 +44,16 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.HRegion; import 
org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource; -import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl; -import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; import org.junit.AfterClass; @@ -312,12 +304,11 @@ public class TestReplicationEndpoint extends TestReplicationBase { } @Test - public void testMetricsSourceBaseSourcePassThrough() { + public void testMetricsSourceBaseSourcePassthrough() { /* - * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl, - * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource, - * so that metrics get written to both namespaces. 
Both of those classes wrap a - * MetricsReplicationSourceImpl that implements BaseSource, which allows + * The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a + * MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both of + * those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which allows * for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on * MetricsSource actually calls down through the two layers of wrapping to the actual * BaseSource. @@ -331,15 +322,14 @@ public class TestReplicationEndpoint extends TestReplicationBase { MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id); - MetricsReplicationGlobalSourceSource globalSourceSource = - new MetricsReplicationGlobalSourceSourceImpl(globalRms); - MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource); + MetricsReplicationSourceSource globalSourceSource = + new MetricsReplicationGlobalSourceSource(globalRms); + MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource); doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue(); - Map<String, MetricsReplicationTableSource> singleSourceSourceByTable = - new HashMap<>(); - MetricsSource source = new MetricsSource(id, singleSourceSource, - spyglobalSourceSource, singleSourceSourceByTable); + Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>(); + MetricsSource source = + new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable); String gaugeName = "gauge"; @@ -388,37 +378,16 @@ public class TestReplicationEndpoint extends TestReplicationBase { boolean containsRandomNewTable = source.getSingleSourceSourceByTable() .containsKey("RandomNewTable"); Assert.assertEquals(false, containsRandomNewTable); - 
source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable")); + source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable"); containsRandomNewTable = source.getSingleSourceSourceByTable() .containsKey("RandomNewTable"); Assert.assertEquals(true, containsRandomNewTable); - MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable() + MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable() .get("RandomNewTable"); - - // age should be greater than zero we created the entry with time in the past + // cannot put more concreate value here to verify because the age is arbitrary. + // as long as it's greater than 0, we see it as correct answer. Assert.assertTrue(msr.getLastShippedAge() > 0); - Assert.assertTrue(msr.getShippedBytes() > 0); - - } - - private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) { - List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>(); - byte[] a = new byte[] { 'a' }; - Entry entry = createEntry(tableName, null, a); - walEntriesWithSize.add(new Pair<>(entry, 10L)); - return walEntriesWithSize; - } - private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) { - WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName), - System.currentTimeMillis() - 1L, - scopes); - WALEdit edit1 = new WALEdit(); - - for (byte[] kv : kvs) { - edit1.add(new KeyValue(kv, kv, kv)); - } - return new Entry(key1, edit1); } private void doPut(byte[] row) throws IOException { @@ -494,44 +463,6 @@ public class TestReplicationEndpoint extends TestReplicationBase { } } - /** - * Not used by unit tests, helpful for manual testing with replication. - * <p> - * Snippet for `hbase shell`: - * <pre> - * create 't', 'f' - * add_peer '1', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.replication.' 
+ \ - * 'TestReplicationEndpoint$SleepingReplicationEndpointForTest' - * alter 't', {NAME=>'f', REPLICATION_SCOPE=>1} - * </pre> - */ - public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest { - private long duration; - public SleepingReplicationEndpointForTest() { - super(); - } - - @Override - public void init(Context context) throws IOException { - super.init(context); - if (this.ctx != null) { - duration = this.ctx.getConfiguration().getLong( - "hbase.test.sleep.replication.endpoint.duration.millis", 5000L); - } - } - - @Override - public boolean replicate(ReplicateContext context) { - try { - Thread.sleep(duration); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return false; - } - return super.replicate(context); - } - } - public static class InterClusterReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 067ab63..b4af38b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -336,8 +336,6 @@ public class TestWALEntryStream { private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) { ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); - when(mockSourceManager.getTotalBufferLimit()).thenReturn( - (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); Server mockServer = Mockito.mock(Server.class); ReplicationSource source = Mockito.mock(ReplicationSource.class); when(source.getSourceManager()).thenReturn(mockSourceManager); @@ -345,9 +343,6 @@ 
public class TestWALEntryStream { when(source.getWALFileLengthProvider()).thenReturn(log); when(source.getServer()).thenReturn(mockServer); when(source.isRecovered()).thenReturn(recovered); - MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock( - MetricsReplicationGlobalSourceSource.class); - when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics); return source; }
