This is an automated email from the ASF dual-hosted git repository.
tianjy pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new a8af0e5 HBASE-21505 - proposal for a more consistent report on status
a8af0e5 is described below
commit a8af0e58942314ad4fd7837a5fafae440259eace
Author: Wellington Chevreuil <[email protected]>
AuthorDate: Tue Feb 19 14:47:18 2019 +0000
HBASE-21505 - proposal for a more consistent report on status
Signed-off-by: Jingyun Tian <[email protected]>
---
.../java/org/apache/hadoop/hbase/ServerLoad.java | 9 +
.../org/apache/hadoop/hbase/ServerMetrics.java | 6 +
.../apache/hadoop/hbase/ServerMetricsBuilder.java | 15 +-
.../hbase/replication/ReplicationLoadSource.java | 142 +++++++++++++-
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 23 ++-
.../MetricsReplicationSourceSource.java | 3 +
.../MetricsReplicationGlobalSourceSource.java | 21 ++-
.../MetricsReplicationSourceSourceImpl.java | 18 +-
.../src/main/protobuf/ClusterStatus.proto | 7 +
.../hadoop/hbase/regionserver/HRegionServer.java | 4 +-
.../HBaseInterClusterReplicationEndpoint.java | 12 +-
.../replication/regionserver/MetricsSource.java | 75 ++++++--
.../replication/regionserver/Replication.java | 20 +-
.../replication/regionserver/ReplicationLoad.java | 85 +++------
.../regionserver/ReplicationSource.java | 38 +++-
.../regionserver/ReplicationSourceManager.java | 17 +-
.../regionserver/ReplicationSourceShipper.java | 23 +--
.../regionserver/ReplicationSourceWALReader.java | 8 +-
.../replication/regionserver/WALEntryStream.java | 2 +
.../hbase/master/TestGetReplicationLoad.java | 48 ++++-
.../hbase/replication/TestReplicationBase.java | 27 ++-
.../hbase/replication/TestReplicationStatus.java | 203 ++++++++++++++++++++-
hbase-shell/src/main/ruby/hbase/admin.rb | 118 +++++++++---
23 files changed, 735 insertions(+), 189 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java
index 7a1019a..15c8e63 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java
@@ -391,6 +391,15 @@ public class ServerLoad implements ServerMetrics {
/**
* Call directly from client such as hbase shell
+ * @return a map of ReplicationLoadSource list per peer id
+ */
+ @Override
+ public Map<String, List<ReplicationLoadSource>>
getReplicationLoadSourceMap() {
+ return metrics.getReplicationLoadSourceMap();
+ }
+
+ /**
+ * Call directly from client such as hbase shell
* @return ReplicationLoadSink
*/
@Override
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java
index 1e1d395..391e62f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java
@@ -77,6 +77,12 @@ public interface ServerMetrics {
/**
* Call directly from client such as hbase shell
+ * @return a map of ReplicationLoadSource list per peer id
+ */
+ Map<String, List<ReplicationLoadSource>> getReplicationLoadSourceMap();
+
+ /**
+ * Call directly from client such as hbase shell
* @return ReplicationLoadSink
*/
@Nullable
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java
index 2a487d7..a22907b 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java
@@ -18,8 +18,11 @@
package org.apache.hadoop.hbase;
import edu.umd.cs.findbugs.annotations.Nullable;
+
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -75,7 +78,7 @@ public final class ServerMetricsBuilder {
.setRegionMetrics(serverLoadPB.getRegionLoadsList().stream()
.map(RegionMetricsBuilder::toRegionMetrics).collect(Collectors.toList()))
.setReplicationLoadSources(serverLoadPB.getReplLoadSourceList().stream()
-
.map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
+
.map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
.setReplicationLoadSink(serverLoadPB.hasReplLoadSink()
? ProtobufUtil.toReplicationLoadSink(serverLoadPB.getReplLoadSink())
: null)
@@ -302,6 +305,16 @@ public final class ServerMetricsBuilder {
}
@Override
+ public Map<String, List<ReplicationLoadSource>>
getReplicationLoadSourceMap(){
+ Map<String,List<ReplicationLoadSource>> sourcesMap = new HashMap<>();
+ for(ReplicationLoadSource loadSource : sources){
+ sourcesMap.computeIfAbsent(loadSource.getPeerID(),
+ peerId -> new ArrayList()).add(loadSource);
+ }
+ return sourcesMap;
+ }
+
+ @Override
public ReplicationLoadSink getReplicationLoadSink() {
return sink;
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java
index 9e24e22..8ee22195 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java
@@ -16,21 +16,36 @@ import org.apache.yetus.audience.InterfaceAudience;
* A HBase ReplicationLoad to present MetricsSource information
*/
@InterfaceAudience.Public
-public class ReplicationLoadSource {
+public final class ReplicationLoadSource {
private final String peerID;
private final long ageOfLastShippedOp;
private final int sizeOfLogQueue;
private final long timestampOfLastShippedOp;
private final long replicationLag;
+ private long timeStampOfNextToReplicate;
+ private String queueId;
+ private boolean recovered;
+ private boolean running;
+ private boolean editsSinceRestart;
+ private long editsRead;
+ private long oPsShipped;
- // TODO: add the builder for this class
@InterfaceAudience.Private
- public ReplicationLoadSource(String id, long age, int size, long timestamp,
long lag) {
+ private ReplicationLoadSource(String id, long age, int size, long timestamp,
+ long timeStampOfNextToReplicate, long lag, String queueId, boolean
recovered, boolean running,
+ boolean editsSinceRestart, long editsRead, long oPsShipped) {
this.peerID = id;
this.ageOfLastShippedOp = age;
this.sizeOfLogQueue = size;
this.timestampOfLastShippedOp = timestamp;
this.replicationLag = lag;
+ this.timeStampOfNextToReplicate = timeStampOfNextToReplicate;
+ this.queueId = queueId;
+ this.recovered = recovered;
+ this.running = running;
+ this.editsSinceRestart = editsSinceRestart;
+ this.editsRead = editsRead;
+ this.oPsShipped = oPsShipped;
}
public String getPeerID() {
@@ -61,4 +76,123 @@ public class ReplicationLoadSource {
public long getReplicationLag() {
return this.replicationLag;
}
-}
+
+ public long getTimeStampOfNextToReplicate() {
+ return this.timeStampOfNextToReplicate;
+ }
+
+ public String getQueueId() {
+ return queueId;
+ }
+
+ public boolean isRecovered() {
+ return recovered;
+ }
+
+ public boolean isRunning() {
+ return running;
+ }
+
+ public boolean hasEditsSinceRestart() {
+ return editsSinceRestart;
+ }
+
+ public long getEditsRead() {
+ return editsRead;
+ }
+
+ public long getOPsShipped() {
+ return oPsShipped;
+ }
+
+ public static ReplicationLoadSourceBuilder newBuilder(){
+ return new ReplicationLoadSourceBuilder();
+ }
+
+ public static final class ReplicationLoadSourceBuilder {
+
+ private String peerID;
+ private long ageOfLastShippedOp;
+ private int sizeOfLogQueue;
+ private long timestampOfLastShippedOp;
+ private long replicationLag;
+ private long timeStampOfNextToReplicate;
+ private String queueId;
+ private boolean recovered;
+ private boolean running;
+ private boolean editsSinceRestart;
+ private long editsRead;
+ private long oPsShipped;
+
+ private ReplicationLoadSourceBuilder(){
+
+ }
+
+ public ReplicationLoadSourceBuilder setTimeStampOfNextToReplicate(
+ long timeStampOfNextToReplicate) {
+ this.timeStampOfNextToReplicate = timeStampOfNextToReplicate;
+ return this;
+ }
+
+ public ReplicationLoadSourceBuilder setPeerID(String peerID) {
+ this.peerID = peerID;
+ return this;
+ }
+
+ public ReplicationLoadSourceBuilder setAgeOfLastShippedOp(long
ageOfLastShippedOp) {
+ this.ageOfLastShippedOp = ageOfLastShippedOp;
+ return this;
+ }
+
+ public ReplicationLoadSourceBuilder setSizeOfLogQueue(int sizeOfLogQueue) {
+ this.sizeOfLogQueue = sizeOfLogQueue;
+ return this;
+ }
+
+ public ReplicationLoadSourceBuilder setTimestampOfLastShippedOp(long
timestampOfLastShippedOp) {
+ this.timestampOfLastShippedOp = timestampOfLastShippedOp;
+ return this;
+ }
+
+ public ReplicationLoadSourceBuilder setReplicationLag(long replicationLag)
{
+ this.replicationLag = replicationLag;
+ return this;
+ }
+
+ public ReplicationLoadSourceBuilder setQueueId(String queueId) {
+ this.queueId = queueId;
+ return this;
+ }
+
+ public ReplicationLoadSourceBuilder setRecovered(boolean recovered) {
+ this.recovered = recovered;
+ return this;
+ }
+
+ public ReplicationLoadSourceBuilder setRunning(boolean running) {
+ this.running = running;
+ return this;
+ }
+
+ public ReplicationLoadSourceBuilder setEditsSinceRestart(boolean
editsSinceRestart) {
+ this.editsSinceRestart = editsSinceRestart;
+ return this;
+ }
+
+ public ReplicationLoadSourceBuilder setEditsRead(long editsRead) {
+ this.editsRead = editsRead;
+ return this;
+ }
+
+ public ReplicationLoadSourceBuilder setoPsShipped(long oPsShipped) {
+ this.oPsShipped = oPsShipped;
+ return this;
+ }
+
+ public ReplicationLoadSource build(){
+ return new ReplicationLoadSource(peerID, ageOfLastShippedOp,
sizeOfLogQueue,
+ timestampOfLastShippedOp, timeStampOfNextToReplicate,
replicationLag, queueId, recovered,
+ running, editsSinceRestart, editsRead, oPsShipped);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 3039392..bc97b01 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2717,8 +2717,20 @@ public final class ProtobufUtil {
public static ReplicationLoadSource toReplicationLoadSource(
ClusterStatusProtos.ReplicationLoadSource rls) {
- return new ReplicationLoadSource(rls.getPeerID(),
rls.getAgeOfLastShippedOp(),
- rls.getSizeOfLogQueue(), rls.getTimeStampOfLastShippedOp(),
rls.getReplicationLag());
+ ReplicationLoadSource.ReplicationLoadSourceBuilder builder =
ReplicationLoadSource.newBuilder();
+ builder.setPeerID(rls.getPeerID()).
+ setAgeOfLastShippedOp(rls.getAgeOfLastShippedOp()).
+ setSizeOfLogQueue(rls.getSizeOfLogQueue()).
+ setTimestampOfLastShippedOp(rls.getTimeStampOfLastShippedOp()).
+ setTimeStampOfNextToReplicate(rls.getTimeStampOfNextToReplicate()).
+ setReplicationLag(rls.getReplicationLag()).
+ setQueueId(rls.getQueueId()).
+ setRecovered(rls.getRecovered()).
+ setRunning(rls.getRunning()).
+ setEditsSinceRestart(rls.getEditsSinceRestart()).
+ setEditsRead(rls.getEditsRead()).
+ setoPsShipped(rls.getOPsShipped());
+ return builder.build();
}
/**
@@ -3227,6 +3239,13 @@ public final class ProtobufUtil {
.setSizeOfLogQueue((int) rls.getSizeOfLogQueue())
.setTimeStampOfLastShippedOp(rls.getTimestampOfLastShippedOp())
.setReplicationLag(rls.getReplicationLag())
+ .setQueueId(rls.getQueueId())
+ .setRecovered(rls.isRecovered())
+ .setRunning(rls.isRunning())
+ .setEditsSinceRestart(rls.hasEditsSinceRestart())
+ .setTimeStampOfNextToReplicate(rls.getTimeStampOfNextToReplicate())
+ .setOPsShipped(rls.getOPsShipped())
+ .setEditsRead(rls.getEditsRead())
.build();
}
diff --git
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 1a17f37..3fd5ac6 100644
---
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -76,4 +76,7 @@ public interface MetricsReplicationSourceSource extends
BaseSource {
void incrCompletedWAL();
void incrCompletedRecoveryQueue();
void incrFailedRecoveryQueue();
+ long getWALEditsRead();
+ long getShippedOps();
+ long getEditsFiltered();
}
diff --git
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 4e8c810..8942182 100644
---
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -32,7 +32,7 @@ public class MetricsReplicationGlobalSourceSource implements
MetricsReplicationS
private final MutableHistogram ageOfLastShippedOpHist;
private final MutableGaugeLong sizeOfLogQueueGauge;
private final MutableFastCounter logReadInEditsCounter;
- private final MutableFastCounter logEditsFilteredCounter;
+ private final MutableFastCounter walEditsFilteredCounter;
private final MutableFastCounter shippedBatchesCounter;
private final MutableFastCounter shippedOpsCounter;
private final MutableFastCounter shippedBytesCounter;
@@ -73,7 +73,7 @@ public class MetricsReplicationGlobalSourceSource implements
MetricsReplicationS
logReadInEditsCounter =
rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
- logEditsFilteredCounter =
rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
+ walEditsFilteredCounter =
rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
shippedHFilesCounter =
rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L);
@@ -111,7 +111,7 @@ public class MetricsReplicationGlobalSourceSource
implements MetricsReplicationS
}
@Override public void incrLogEditsFiltered(long size) {
- logEditsFilteredCounter.incr(size);
+ walEditsFilteredCounter.incr(size);
}
@Override public void incrBatchesShipped(int batches) {
@@ -260,4 +260,19 @@ public class MetricsReplicationGlobalSourceSource
implements MetricsReplicationS
public String getMetricsName() {
return rms.getMetricsName();
}
+
+ @Override
+ public long getWALEditsRead() {
+ return this.logReadInEditsCounter.value();
+ }
+
+ @Override
+ public long getShippedOps() {
+ return this.shippedOpsCounter.value();
+ }
+
+ @Override
+ public long getEditsFiltered() {
+ return this.walEditsFilteredCounter.value();
+ }
}
diff --git
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 0ad5052..ec9271e 100644
---
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -48,7 +48,7 @@ public class MetricsReplicationSourceSourceImpl implements
MetricsReplicationSou
private final MutableHistogram ageOfLastShippedOpHist;
private final MutableGaugeLong sizeOfLogQueueGauge;
private final MutableFastCounter logReadInEditsCounter;
- private final MutableFastCounter logEditsFilteredCounter;
+ private final MutableFastCounter walEditsFilteredCounter;
private final MutableFastCounter shippedBatchesCounter;
private final MutableFastCounter shippedOpsCounter;
private final MutableFastCounter shippedKBsCounter;
@@ -102,7 +102,7 @@ public class MetricsReplicationSourceSourceImpl implements
MetricsReplicationSou
logReadInEditsCounter =
rms.getMetricsRegistry().getCounter(logReadInEditsKey, 0L);
logEditsFilteredKey = this.keyPrefix + "logEditsFiltered";
- logEditsFilteredCounter =
rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L);
+ walEditsFilteredCounter =
rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L);
shippedHFilesKey = this.keyPrefix + "shippedHFiles";
shippedHFilesCounter =
rms.getMetricsRegistry().getCounter(shippedHFilesKey, 0L);
@@ -149,7 +149,7 @@ public class MetricsReplicationSourceSourceImpl implements
MetricsReplicationSou
}
@Override public void incrLogEditsFiltered(long size) {
- logEditsFilteredCounter.incr(size);
+ walEditsFilteredCounter.incr(size);
}
@Override public void incrBatchesShipped(int batches) {
@@ -314,4 +314,16 @@ public class MetricsReplicationSourceSourceImpl implements
MetricsReplicationSou
public String getMetricsName() {
return rms.getMetricsName();
}
+
+ @Override public long getWALEditsRead() {
+ return this.logReadInEditsCounter.value();
+ }
+
+ @Override public long getShippedOps() {
+ return this.shippedOpsCounter.value();
+ }
+
+ @Override public long getEditsFiltered() {
+ return this.walEditsFilteredCounter.value();
+ }
}
diff --git a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
index c3fe19d..563db9f 100644
--- a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
@@ -158,6 +158,13 @@ message ReplicationLoadSource {
required uint32 sizeOfLogQueue = 3;
required uint64 timeStampOfLastShippedOp = 4;
required uint64 replicationLag = 5;
+ optional uint64 timeStampOfNextToReplicate=6;
+ optional string queueId = 7;
+ optional bool recovered = 8;
+ optional bool running = 9;
+ optional bool editsSinceRestart = 10;
+ optional uint64 editsRead = 11;
+ optional uint64 oPsShipped = 12;
}
message ServerLoad {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 6ef6021..1e871b7 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1372,9 +1372,11 @@ public class HRegionServer extends HasThread implements
ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
if (rLoad != null) {
serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
- for (ClusterStatusProtos.ReplicationLoadSource rLS :
rLoad.getReplicationLoadSourceList()) {
+ for (ClusterStatusProtos.ReplicationLoadSource rLS :
+ rLoad.getReplicationLoadSourceEntries()) {
serverLoad.addReplLoadSource(rLS);
}
+
}
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 7db53aa..57301fc 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -344,7 +344,6 @@ public class HBaseInterClusterReplicationEndpoint extends
HBaseReplicationEndpoi
@Override
public boolean replicate(ReplicateContext replicateContext) {
CompletionService<Integer> pool = new
ExecutorCompletionService<>(this.exec);
- String walGroupId = replicateContext.getWalGroupId();
int sleepMultiplier = 1;
if (!peersSelected && this.isRunning()) {
@@ -371,19 +370,10 @@ public class HBaseInterClusterReplicationEndpoint extends
HBaseReplicationEndpoi
reconnectToPeerCluster();
}
try {
- long lastWriteTime;
-
// replicate the batches to sink side.
- lastWriteTime = parallelReplicate(pool, replicateContext, batches);
-
- // update metrics
- if (lastWriteTime > 0) {
- this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
- }
+ parallelReplicate(pool, replicateContext, batches);
return true;
} catch (IOException ioe) {
- // Didn't ship anything, but must still age the last time we did
- this.metrics.refreshAgeOfLastShippedOp(walGroupId);
if (ioe instanceof RemoteException) {
ioe = ((RemoteException) ioe).unwrapRemoteException();
LOG.warn("Can't replicate because of an error on the remote cluster:
", ioe);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 830ebe1..92ab070 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -41,10 +41,11 @@ public class MetricsSource implements BaseSource {
private static final Logger LOG =
LoggerFactory.getLogger(MetricsSource.class);
// tracks last shipped timestamp for each wal group
- private Map<String, Long> lastTimestamps = new HashMap<>();
+ private Map<String, Long> lastShippedTimeStamps = new HashMap<String,
Long>();
private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
private long lastHFileRefsQueueSize = 0;
private String id;
+ private long timeStampNextToReplicate;
private final MetricsReplicationSourceSource singleSourceSource;
private final MetricsReplicationSourceSource globalSourceSource;
@@ -81,7 +82,7 @@ public class MetricsSource implements BaseSource {
/**
* Set the age of the last edit that was shipped
- * @param timestamp write time of the edit
+ * @param timestamp target write time of the edit
* @param walGroup which group we are setting
*/
public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
@@ -89,7 +90,7 @@ public class MetricsSource implements BaseSource {
singleSourceSource.setLastShippedAge(age);
globalSourceSource.setLastShippedAge(age);
this.ageOfLastShippedOp.put(walGroup, age);
- this.lastTimestamps.put(walGroup, timestamp);
+ this.lastShippedTimeStamps.put(walGroup, timestamp);
}
/**
@@ -106,15 +107,6 @@ public class MetricsSource implements BaseSource {
}
/**
- * get the last timestamp of given wal group. If the walGroup is null,
return 0.
- * @param walGroup which group we are getting
- * @return timeStamp
- */
- public long getLastTimeStampOfWalGroup(String walGroup) {
- return this.lastTimestamps.get(walGroup) == null ? 0 :
lastTimestamps.get(walGroup);
- }
-
- /**
* get age of last shipped op of given wal group. If the walGroup is null,
return 0
* @param walGroup which group we are getting
* @return age
@@ -129,9 +121,9 @@ public class MetricsSource implements BaseSource {
* @param walGroupId id of the group to update
*/
public void refreshAgeOfLastShippedOp(String walGroupId) {
- Long lastTimestamp = this.lastTimestamps.get(walGroupId);
+ Long lastTimestamp = this.lastShippedTimeStamps.get(walGroupId);
if (lastTimestamp == null) {
- this.lastTimestamps.put(walGroupId, 0L);
+ this.lastShippedTimeStamps.put(walGroupId, 0L);
lastTimestamp = 0L;
}
if (lastTimestamp > 0) {
@@ -199,6 +191,30 @@ public class MetricsSource implements BaseSource {
}
/**
+ * Gets the number of edits not eligible for replication that this source has filtered from its
queue logs so far.
+ * @return logEditsFiltered non-replicable edits filtered from this queue's
logs.
+ */
+ public long getEditsFiltered(){
+ return this.singleSourceSource.getEditsFiltered();
+ }
+
+ /**
+ * Gets the number of edits eligible for replication read from this source
queue logs so far.
+ * @return replicableEdits total number of replicable edits read from this
queue logs.
+ */
+ public long getReplicableEdits(){
+ return this.singleSourceSource.getWALEditsRead() -
this.singleSourceSource.getEditsFiltered();
+ }
+
+ /**
+ * Gets the number of OPs shipped by this source queue to target cluster.
+ * @return oPsShipped total number of OPs shipped by this source.
+ */
+ public long getOpsShipped() {
+ return this.singleSourceSource.getShippedOps();
+ }
+
+ /**
* Convenience method to apply changes to metrics due to shipping a batch of
logs.
*
* @param batchSize the size of the batch that was shipped to sinks.
@@ -223,8 +239,9 @@ public class MetricsSource implements BaseSource {
singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
singleSourceSource.clear();
globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
- lastTimestamps.clear();
+ lastShippedTimeStamps.clear();
lastHFileRefsQueueSize = 0;
+ timeStampNextToReplicate = 0;
}
/**
@@ -260,7 +277,7 @@ public class MetricsSource implements BaseSource {
*/
public long getTimestampOfLastShippedOp() {
long lastTimestamp = 0L;
- for (long ts : lastTimestamps.values()) {
+ for (long ts : lastShippedTimeStamps.values()) {
if (ts > lastTimestamp) {
lastTimestamp = ts;
}
@@ -269,6 +286,32 @@ public class MetricsSource implements BaseSource {
}
/**
+ * TimeStamp of next edit to be replicated.
+ * @return timeStampNextToReplicate - TimeStamp of next edit to be
replicated.
+ */
+ public long getTimeStampNextToReplicate() {
+ return timeStampNextToReplicate;
+ }
+
+ /**
+ * TimeStamp of next edit targeted for replication. Used for calculating lag:
+ * if this timestamp is greater than the timestamp of the last shipped edit,
+ * at least one edit is still pending replication.
+ * @param timeStampNextToReplicate timestamp of next edit in the queue that
should be replicated.
+ */
+ public void setTimeStampNextToReplicate(long timeStampNextToReplicate) {
+ this.timeStampNextToReplicate = timeStampNextToReplicate;
+ }
+
+ public long getReplicationDelay() {
+ if(getTimestampOfLastShippedOp()>=timeStampNextToReplicate){
+ return 0;
+ }else{
+ return EnvironmentEdgeManager.currentTime() - timeStampNextToReplicate;
+ }
+ }
+
+ /**
* Get the slave peer ID
* @return peerID
*/
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 6c46a85..7a25c64 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -249,24 +249,12 @@ public class Replication implements
ReplicationSourceService, ReplicationSinkSer
}
private void buildReplicationLoad() {
- List<MetricsSource> sourceMetricsList = new ArrayList<>();
-
- // get source
- List<ReplicationSourceInterface> sources =
this.replicationManager.getSources();
- for (ReplicationSourceInterface source : sources) {
- sourceMetricsList.add(source.getSourceMetrics());
- }
-
- // get old source
- List<ReplicationSourceInterface> oldSources =
this.replicationManager.getOldSources();
- for (ReplicationSourceInterface source : oldSources) {
- if (source instanceof ReplicationSource) {
- sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
- }
- }
+ List<ReplicationSourceInterface> allSources = new ArrayList<>();
+ allSources.addAll(this.replicationManager.getSources());
+ allSources.addAll(this.replicationManager.getOldSources());
// get sink
MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
- this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
+ this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics);
}
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
index 53e560b..fe4086b 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
@@ -19,15 +19,14 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.Date;
-import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
-import java.util.Map;
+
+import org.apache.hadoop.hbase.util.Strings;
import org.apache.yetus.audience.InterfaceAudience;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Strings;
/**
* This class is used for exporting some of the info from replication metrics
@@ -37,11 +36,9 @@ public class ReplicationLoad {
// Empty load instance.
public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new
ReplicationLoad();
-
- private List<MetricsSource> sourceMetricsList;
private MetricsSink sinkMetrics;
- private List<ClusterStatusProtos.ReplicationLoadSource>
replicationLoadSourceList;
+ private List<ClusterStatusProtos.ReplicationLoadSource>
replicationLoadSourceEntries;
private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink;
/** default constructor */
@@ -51,13 +48,12 @@ public class ReplicationLoad {
/**
* buildReplicationLoad
- * @param srMetricsList
+ * @param sources List of ReplicationSource instances for which metrics
should be reported
* @param skMetrics
*/
- public void buildReplicationLoad(final List<MetricsSource> srMetricsList,
+ public void buildReplicationLoad(final List<ReplicationSourceInterface>
sources,
final MetricsSink skMetrics) {
- this.sourceMetricsList = srMetricsList;
this.sinkMetrics = skMetrics;
// build the SinkLoad
@@ -67,10 +63,9 @@ public class ReplicationLoad {
rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
this.replicationLoadSink = rLoadSinkBuild.build();
- // build the SourceLoad List
- Map<String, ClusterStatusProtos.ReplicationLoadSource>
replicationLoadSourceMap =
- new HashMap<>();
- for (MetricsSource sm : this.sourceMetricsList) {
+ this.replicationLoadSourceEntries = new ArrayList<>();
+ for (ReplicationSourceInterface source : sources) {
+ MetricsSource sm = source.getSourceMetrics();
// Get the actual peer id
String peerId = sm.getPeerID();
String[] parts = peerId.split("-", 2);
@@ -78,18 +73,11 @@ public class ReplicationLoad {
long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
int sizeOfLogQueue = sm.getSizeOfLogQueue();
+ long editsRead = sm.getReplicableEdits();
+ long oPsShipped = sm.getOpsShipped();
long timeStampOfLastShippedOp = sm.getTimestampOfLastShippedOp();
- long replicationLag =
- calculateReplicationDelay(ageOfLastShippedOp,
timeStampOfLastShippedOp, sizeOfLogQueue);
-
- ClusterStatusProtos.ReplicationLoadSource rLoadSource =
replicationLoadSourceMap.get(peerId);
- if (rLoadSource != null) {
- ageOfLastShippedOp = Math.max(rLoadSource.getAgeOfLastShippedOp(),
ageOfLastShippedOp);
- sizeOfLogQueue += rLoadSource.getSizeOfLogQueue();
- timeStampOfLastShippedOp =
Math.min(rLoadSource.getTimeStampOfLastShippedOp(),
- timeStampOfLastShippedOp);
- replicationLag = Math.max(rLoadSource.getReplicationLag(),
replicationLag);
- }
+ long timeStampOfNextToReplicate = sm.getTimeStampNextToReplicate();
+ long replicationLag = sm.getReplicationDelay();
ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild =
ClusterStatusProtos.ReplicationLoadSource.newBuilder();
rLoadSourceBuild.setPeerID(peerId);
@@ -97,33 +85,19 @@ public class ReplicationLoad {
rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue);
rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp);
rLoadSourceBuild.setReplicationLag(replicationLag);
+
rLoadSourceBuild.setTimeStampOfNextToReplicate(timeStampOfNextToReplicate);
+ rLoadSourceBuild.setEditsRead(editsRead);
+ rLoadSourceBuild.setOPsShipped(oPsShipped);
+ if (source instanceof ReplicationSource){
+ ReplicationSource replSource = (ReplicationSource)source;
+
rLoadSourceBuild.setRecovered(replSource.getReplicationQueueInfo().isQueueRecovered());
+
rLoadSourceBuild.setQueueId(replSource.getReplicationQueueInfo().getQueueId());
+ rLoadSourceBuild.setRunning(replSource.isWorkerRunning());
+ rLoadSourceBuild.setEditsSinceRestart(timeStampOfNextToReplicate>0);
+ }
- replicationLoadSourceMap.put(peerId, rLoadSourceBuild.build());
- }
- this.replicationLoadSourceList = new
ArrayList<>(replicationLoadSourceMap.values());
- }
-
- static long calculateReplicationDelay(long ageOfLastShippedOp,
- long timeStampOfLastShippedOp, int sizeOfLogQueue) {
- long replicationLag;
- long timePassedAfterLastShippedOp;
- if (timeStampOfLastShippedOp == 0) { //replication not start yet, set to
Long.MAX_VALUE
- return Long.MAX_VALUE;
- } else {
- timePassedAfterLastShippedOp =
- EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
- }
- if (sizeOfLogQueue > 1) {
- // err on the large side
- replicationLag = Math.max(ageOfLastShippedOp,
timePassedAfterLastShippedOp);
- } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
- replicationLag = ageOfLastShippedOp; // last shipped happen recently
- } else {
- // last shipped may happen last night,
- // so NO real lag although ageOfLastShippedOp is non-zero
- replicationLag = 0;
+ this.replicationLoadSourceEntries.add(rLoadSourceBuild.build());
}
- return replicationLag;
}
/**
@@ -131,18 +105,17 @@ public class ReplicationLoad {
* @return a string contains sourceReplicationLoad information
*/
public String sourceToString() {
- if (this.sourceMetricsList == null) return null;
-
StringBuilder sb = new StringBuilder();
- for (ClusterStatusProtos.ReplicationLoadSource rls :
this.replicationLoadSourceList) {
+ for (ClusterStatusProtos.ReplicationLoadSource rls :
+ this.replicationLoadSourceEntries) {
sb = Strings.appendKeyValue(sb, "\n PeerID", rls.getPeerID());
sb = Strings.appendKeyValue(sb, "AgeOfLastShippedOp",
rls.getAgeOfLastShippedOp());
sb = Strings.appendKeyValue(sb, "SizeOfLogQueue",
rls.getSizeOfLogQueue());
sb =
Strings.appendKeyValue(sb, "TimestampsOfLastShippedOp",
- (new Date(rls.getTimeStampOfLastShippedOp()).toString()));
+ (new Date(rls.getTimeStampOfLastShippedOp()).toString()));
sb = Strings.appendKeyValue(sb, "Replication Lag",
rls.getReplicationLag());
}
@@ -171,8 +144,8 @@ public class ReplicationLoad {
return this.replicationLoadSink;
}
- public List<ClusterStatusProtos.ReplicationLoadSource>
getReplicationLoadSourceList() {
- return this.replicationLoadSourceList;
+ public List<ClusterStatusProtos.ReplicationLoadSource>
getReplicationLoadSourceEntries() {
+ return this.replicationLoadSourceEntries;
}
/**
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 8dd8568..894ebed 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -181,8 +181,8 @@ public class ReplicationSource implements
ReplicationSourceInterface {
this.throttler = new ReplicationThrottler((double) currentBandwidth /
10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
this.walFileLengthProvider = walFileLengthProvider;
- LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId
- + ", currentBandwidth=" + this.currentBandwidth);
+ LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
+ replicationPeer.getId(), this.currentBandwidth);
}
private void decorateConf() {
@@ -207,6 +207,8 @@ public class ReplicationSource implements
ReplicationSourceInterface {
}
}
queue.put(log);
+ LOG.trace("Added log file {} to queue of source {}.", logPrefix,
+ this.replicationQueueInfo.getQueueId());
this.metrics.incrSizeOfLogQueue();
// This will log a warning for each new log that gets created above the
warn threshold
int queueSize = queue.size();
@@ -315,15 +317,13 @@ public class ReplicationSource implements
ReplicationSourceInterface {
@Override
public Map<String, ReplicationStatus> getWalGroupStatus() {
Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
- long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize;
+ long ageOfLastShippedOp, replicationDelay, fileSize;
for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper :
workerThreads.entrySet()) {
String walGroupId = walGroupShipper.getKey();
ReplicationSourceShipper shipper = walGroupShipper.getValue();
- lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
int queueSize = queues.get(walGroupId).size();
- replicationDelay =
- ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp,
lastTimeStamp, queueSize);
+ replicationDelay = metrics.getReplicationDelay();
Path currentPath = shipper.getCurrentPath();
fileSize = -1;
if (currentPath != null) {
@@ -486,6 +486,8 @@ public class ReplicationSource implements
ReplicationSourceInterface {
for (;;) {
peerClusterId = replicationEndpoint.getPeerUUID();
if (this.isSourceActive() && peerClusterId == null) {
+ LOG.debug("Could not connect to Peer ZK. Sleeping for "
+ + (this.sleepForRetries * sleepMultiplier) + " millis.");
if (sleepForRetries("Cannot contact the peer's zk ensemble",
sleepMultiplier)) {
sleepMultiplier++;
}
@@ -503,7 +505,8 @@ public class ReplicationSource implements
ReplicationSourceInterface {
this.manager.removeSource(this);
return;
}
- LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
+ LOG.info("Source: {}, is now replicating from cluster: {}; to peer
cluster: {};",
+ this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
initializeWALEntryFilter(peerClusterId);
// start workers
@@ -552,7 +555,9 @@ public class ReplicationSource implements
ReplicationSourceInterface {
Collection<ReplicationSourceShipper> workers = workerThreads.values();
for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
- worker.entryReader.setReaderRunning(false);
+ if(worker.entryReader != null) {
+ worker.entryReader.setReaderRunning(false);
+ }
}
for (ReplicationSourceShipper worker : workers) {
@@ -622,6 +627,10 @@ public class ReplicationSource implements
ReplicationSourceInterface {
return !this.server.isStopped() && this.sourceRunning;
}
+ public UUID getPeerClusterUUID(){
+ return this.clusterId; // NOTE(review): logged at startup as the source ("from") cluster, not the peer - confirm
+ }
+
/**
* Comparator used to compare logs together based on their start time
*/
@@ -644,6 +653,19 @@ public class ReplicationSource implements
ReplicationSourceInterface {
}
}
+ public ReplicationQueueInfo getReplicationQueueInfo() { // read by ReplicationLoad for queue id / recovered flag
+ return replicationQueueInfo;
+ }
+
+ public boolean isWorkerRunning(){ // true when at least one shipper worker of this source is active
+ for(ReplicationSourceShipper worker : this.workerThreads.values()){
+ if(worker.isActive()){
+ return true; // do not query again: the worker may stop between two isActive() calls
+ }
+ }
+ return false;
+ }
+
@Override
public String getStats() {
StringBuilder sb = new StringBuilder();
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 5d4f034..36cfdd7 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -383,7 +383,13 @@ public class ReplicationSourceManager implements
ReplicationListener {
toRemove.terminate(terminateMessage);
}
for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) {
- walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
+ walsByGroup.forEach(wal -> {
+ Path walPath = new Path(this.logDir, wal);
+ src.enqueueLog(walPath);
+ LOG.trace("Enqueued {} to source {} during source creation.",
+ walPath, src.getQueueId());
+ });
+
}
}
LOG.info("Startup replication source for " + src.getPeerId());
@@ -403,8 +409,13 @@ public class ReplicationSourceManager implements
ReplicationListener {
for (String queueId : previousQueueIds) {
ReplicationSourceInterface replicationSource = createSource(queueId,
peer);
this.oldsources.add(replicationSource);
+ LOG.trace("Added source for recovered queue: " + src.getQueueId());
for (SortedSet<String> walsByGroup :
walsByIdRecoveredQueues.get(queueId).values()) {
- walsByGroup.forEach(wal -> src.enqueueLog(new Path(wal)));
+ walsByGroup.forEach(wal -> {
+ LOG.trace("Enqueueing log from recovered queue for source: {}",
+ src.getQueueId());
+ src.enqueueLog(new Path(wal));
+ });
}
toStartup.add(replicationSource);
}
@@ -613,6 +624,8 @@ public class ReplicationSourceManager implements
ReplicationListener {
// This only updates the sources we own, not the recovered ones
for (ReplicationSourceInterface source : this.sources.values()) {
source.enqueueLog(newLog);
+ LOG.trace("Enqueued {} to source {} while performing postLogRoll
operation.",
+ newLog, source.getQueueId());
}
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index edd2c4b..5fee659 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -90,6 +89,7 @@ public class ReplicationSourceShipper extends Thread {
@Override
public final void run() {
setWorkerState(WorkerState.RUNNING);
+ LOG.info("Running ReplicationSourceShipper Thread for wal group: {}",
this.walGroupId);
// Loop until we close down
while (isActive()) {
// Sleep until replication is enabled again
@@ -101,10 +101,9 @@ public class ReplicationSourceShipper extends Thread {
}
try {
WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
+ LOG.debug("Shipper from source {} got entry batch from reader: {}",
+ source.getQueueId(), entryBatch);
if (entryBatch == null) {
- // since there is no logs need to replicate, we refresh the
ageOfLastShippedOp
-
source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
- walGroupId);
continue;
}
// the NO_MORE_DATA instance has no path so do not call shipEdits
@@ -153,16 +152,13 @@ public class ReplicationSourceShipper extends Thread {
List<Entry> entries = entryBatch.getWalEntries();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
- if (updateLogPosition(entryBatch)) {
- // if there was nothing to ship and it's not an error
- // set "ageOfLastShippedOp" to <now> to indicate that we're current
-
source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
- walGroupId);
- }
+ updateLogPosition(entryBatch);
return;
}
int currentSize = (int) entryBatch.getHeapSize();
int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
+
source.getSourceMetrics().setTimeStampNextToReplicate(entries.get(entries.size()
- 1)
+ .getKey().getWriteTime());
while (isActive()) {
try {
try {
@@ -174,7 +170,6 @@ public class ReplicationSourceShipper extends Thread {
// directly go back to while() for confirm this
continue;
}
-
// create replicateContext here, so the entries can be GC'd upon
return from this call
// stack
ReplicationEndpoint.ReplicateContext replicateContext =
@@ -195,7 +190,7 @@ public class ReplicationSourceShipper extends Thread {
// Clean up hfile references
for (Entry entry : entries) {
cleanUpHFileRefs(entry.getEdit());
-
+ LOG.trace("shipped entry {}: ", entry);
TableName tableName = entry.getKey().getTableName();
source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(),
tableName.getNameAsString());
@@ -214,7 +209,7 @@ public class ReplicationSourceShipper extends Thread {
source.getSourceMetrics().setAgeOfLastShippedOp(
entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
if (LOG.isTraceEnabled()) {
- LOG.trace("Replicated {} entries or {} operations in {} ms",
+ LOG.debug("Replicated {} entries or {} operations in {} ms",
entries.size(), entryBatch.getNbOperations(), (endTimeNs -
startTimeNs) / 1000000);
}
break;
@@ -299,7 +294,7 @@ public class ReplicationSourceShipper extends Thread {
return 0;
}
- private boolean isActive() {
+ protected boolean isActive() { // widened from private so ReplicationSource.isWorkerRunning() can call it
return source.isSourceActive() && state == WorkerState.RUNNING &&
!isInterrupted();
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 4483d7a..1552c47 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -140,7 +140,7 @@ class ReplicationSourceWALReader extends Thread {
if (batch != null) {
// need to propagate the batch even it has no entries since it may
carry the last
// sequence id information for serial replication.
- LOG.trace("Read {} WAL entries eligible for replication",
batch.getNbEntries());
+ LOG.debug("Read {} WAL entries eligible for replication",
batch.getNbEntries());
entryBatchQueue.put(batch);
sleepMultiplier = 1;
} else { // got no entries and didn't advance position in WAL
@@ -168,8 +168,11 @@ class ReplicationSourceWALReader extends Thread {
protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
WALEdit edit = entry.getEdit();
if (edit == null || edit.isEmpty()) {
+ LOG.debug("Edit null or empty for entry {} ", entry);
return false;
}
+ LOG.debug("updating TimeStampOfLastAttempted to {}, from entry {}, for
source queue: {}",
+ entry.getKey().getWriteTime(), entry.getKey(),
this.source.getQueueId());
long entrySize = getEntrySizeIncludeBulkLoad(entry);
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
batch.addEntry(entry);
@@ -284,7 +287,8 @@ class ReplicationSourceWALReader extends Thread {
protected final Entry filterEntry(Entry entry) {
Entry filtered = filter.filter(entry);
- if (entry != null && filtered == null) {
+ if (entry != null && (filtered == null || filtered.getEdit().size() == 0))
{
+ LOG.debug("Filtered entry for replication: {}", entry);
source.getSourceMetrics().incrLogEditsFiltered();
}
return filtered;
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index b2c199e..93f76f8 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -173,6 +173,7 @@ class WALEntryStream implements Closeable {
private void tryAdvanceEntry() throws IOException {
if (checkReader()) {
boolean beingWritten = readNextEntryAndRecordReaderPosition();
+ LOG.trace("reading wal file {}. Current open for write: {}",
this.currentPath, beingWritten);
if (currentEntry == null && !beingWritten) {
// no more entries in this log file, and the file is already closed,
i.e, rolled
// Before dequeueing, we should always get one more attempt at reading.
@@ -272,6 +273,7 @@ class WALEntryStream implements Closeable {
return true;
}
if (readEntry != null) {
+ LOG.trace("reading entry: {} ", readEntry);
metrics.incrLogEditsRead();
metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java
index bf72383..ac9ae39 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java
@@ -86,22 +86,52 @@ public class TestGetReplicationLoad {
@Test
public void testGetReplicationMetrics() throws Exception {
- String peer1 = "test1", peer2 = "test2";
- long ageOfLastShippedOp = 2, replicationLag = 3, timeStampOfLastShippedOp
= 4;
- int sizeOfLogQueue = 5;
+ String peer1 = "test1", peer2 = "test2", queueId="1";
+ long ageOfLastShippedOp = 2,
+ replicationLag = 3,
+ timeStampOfLastShippedOp = 4,
+ timeStampOfNextToReplicate=5,
+ editsRead=6,
+ oPsShipped=7;
+ int sizeOfLogQueue = 8;
+ boolean recovered=false,
+ running=false,
+ editsSinceRestart=false;
RegionServerStatusProtos.RegionServerReportRequest.Builder request =
RegionServerStatusProtos.RegionServerReportRequest.newBuilder();
ServerName serverName = cluster.getMaster(0).getServerName();
request.setServer(ProtobufUtil.toServerName(serverName));
ClusterStatusProtos.ReplicationLoadSource rload1 =
ClusterStatusProtos.ReplicationLoadSource
-
.newBuilder().setPeerID(peer1).setAgeOfLastShippedOp(ageOfLastShippedOp)
-
.setReplicationLag(replicationLag).setTimeStampOfLastShippedOp(timeStampOfLastShippedOp)
- .setSizeOfLogQueue(sizeOfLogQueue).build();
+ .newBuilder().setPeerID(peer1)
+ .setAgeOfLastShippedOp(ageOfLastShippedOp)
+ .setReplicationLag(replicationLag)
+ .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp)
+ .setSizeOfLogQueue(sizeOfLogQueue)
+ .setTimeStampOfNextToReplicate(timeStampOfNextToReplicate)
+ .setQueueId(queueId)
+ .setEditsRead(editsRead)
+ .setOPsShipped(oPsShipped)
+ .setRunning(running)
+ .setRecovered(recovered)
+ .setEditsSinceRestart(editsSinceRestart)
+ .build();
ClusterStatusProtos.ReplicationLoadSource rload2 =
- ClusterStatusProtos.ReplicationLoadSource.newBuilder().setPeerID(peer2)
- .setAgeOfLastShippedOp(ageOfLastShippedOp +
1).setReplicationLag(replicationLag + 1)
+ ClusterStatusProtos.ReplicationLoadSource
+ .newBuilder()
+ .setPeerID(peer2)
+ .setAgeOfLastShippedOp(ageOfLastShippedOp + 1)
+ .setReplicationLag(replicationLag + 1)
.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp + 1)
- .setSizeOfLogQueue(sizeOfLogQueue + 1).build();
+ .setSizeOfLogQueue(sizeOfLogQueue + 1)
+ .setTimeStampOfNextToReplicate(timeStampOfNextToReplicate+1)
+ .setQueueId(queueId)
+ .setEditsRead(editsRead+1)
+ .setOPsShipped(oPsShipped+1)
+ .setRunning(running)
+ .setRecovered(recovered)
+ .setEditsSinceRestart(editsSinceRestart)
+ .build();
+
ClusterStatusProtos.ServerLoad sl =
ClusterStatusProtos.ServerLoad.newBuilder()
.addReplLoadSource(rload1).addReplLoadSource(rload2).build();
request.setLoad(sl);
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index f96dbe5..11a9455 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -165,8 +165,7 @@ public class TestReplicationBase {
htable1.put(puts);
}
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
+ protected static void configureClusters(){
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
// We don't want too many edits per batch sent to the ReplicationEndpoint
to trigger
// sufficient number of events. But we don't want to go too low because
@@ -188,6 +187,17 @@ public class TestReplicationBase {
conf1.setLong("hbase.serial.replication.waiting.ms", 100);
utility1 = new HBaseTestingUtility(conf1);
+
+ // Base conf2 on conf1 so it gets the right zk cluster.
+ conf2 = HBaseConfiguration.create(conf1);
+ conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+ conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+ conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+
+ utility2 = new HBaseTestingUtility(conf2);
+ }
+
+ protected static void startClusters() throws Exception{
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
// Have to reget conf1 in case zk cluster location different
@@ -197,13 +207,6 @@ public class TestReplicationBase {
admin = new ReplicationAdmin(conf1);
LOG.info("Setup first Zk");
- // Base conf2 on conf1 so it gets the right zk cluster.
- conf2 = HBaseConfiguration.create(conf1);
- conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
- conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
- conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
-
- utility2 = new HBaseTestingUtility(conf2);
utility2.setZkCluster(miniZK);
zkw2 = new ZKWatcher(conf2, "cluster2", null, true);
LOG.info("Setup second Zk");
@@ -238,6 +241,12 @@ public class TestReplicationBase {
htable2 = connection2.getTable(tableName);
}
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ configureClusters(); // builds conf1/conf2 and both testing utilities; nothing is started yet
+ startClusters();
+ }
+
private boolean peerExist(String peerId) throws IOException {
return hbaseAdmin.listReplicationPeers().stream().anyMatch(p ->
peerId.equals(p.getPeerId()));
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
index aaa843e..c778f52 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.EnumSet;
@@ -33,6 +34,10 @@ import
org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -60,7 +65,8 @@ public class TestReplicationStatus extends
TestReplicationBase {
@Test
public void testReplicationStatus() throws Exception {
LOG.info("testReplicationStatus");
-
+ utility2.shutdownMiniHBaseCluster();
+ utility2.startMiniHBaseCluster(1,4);
try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
// disable peer
admin.disablePeer(PEER_ID);
@@ -103,11 +109,204 @@ public class TestReplicationStatus extends
TestReplicationBase {
ServerLoad sl = status.getLoad(server);
List<ReplicationLoadSource> rLoadSourceList =
sl.getReplicationLoadSourceList();
// check SourceList still only has one entry
- assertTrue("failed to get ReplicationLoadSourceList",
(rLoadSourceList.size() == 1));
+ assertTrue("failed to get ReplicationLoadSourceList",
(rLoadSourceList.size() == 2));
assertEquals(PEER_ID, rLoadSourceList.get(0).getPeerID());
} finally {
admin.enablePeer(PEER_ID);
utility1.getHBaseCluster().getRegionServer(1).start();
}
}
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // Each test here needs freshly started clusters, so this method (which hides
+ // TestReplicationBase.setUpBeforeClass()) only configures them once; the clusters are
+ // started per test from setUpBase() via TestReplicationBase.startClusters().
+ TestReplicationBase.configureClusters();
+ }
+
+ @Before
+ @Override
+ public void setUpBase() throws Exception {
+ TestReplicationBase.startClusters(); // clusters are started for every test, before the base per-test setup
+ super.setUpBase();
+ }
+
+ @After
+ @Override
+ public void tearDownBase() throws Exception { // replaces the base teardown (super is not called)
+ utility2.shutdownMiniCluster();
+ utility1.shutdownMiniCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass(){
+ // Intentionally empty: hides the super class teardown; tearDownBase() already stops the clusters.
+ }
+
+ @Test
+ public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception {
+ utility2.shutdownMiniHBaseCluster(); // peer down, source restarted, no new edits written
+ utility1.shutdownMiniHBaseCluster();
+ utility1.startMiniHBaseCluster();
+ try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) { // Admin is closed on exit
+ ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+ Thread.sleep(10000);
+ ClusterStatus status = new ClusterStatus(hbaseAdmin.
+ getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
+ List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics().
+ get(serverName).getReplicationLoadSourceList();
+ assertEquals(1, loadSources.size());
+ ReplicationLoadSource loadSource = loadSources.get(0);
+ assertFalse(loadSource.hasEditsSinceRestart());
+ assertEquals(0, loadSource.getTimestampOfLastShippedOp());
+ assertEquals(0, loadSource.getReplicationLag());
+ assertFalse(loadSource.isRecovered());
+ }
+ }
+
+ @Test
+ public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception {
+ utility2.shutdownMiniHBaseCluster(); // peer down, so the edits written below cannot be shipped
+ utility1.shutdownMiniHBaseCluster();
+ utility1.startMiniHBaseCluster();
+ try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) { // Admin is closed on exit
+ //add some values to source cluster
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+ htable1.put(p);
+ }
+ Thread.sleep(10000);
+ ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+ ClusterStatus status = new ClusterStatus(hbaseAdmin.
+ getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
+ List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics().
+ get(serverName).getReplicationLoadSourceList();
+ assertEquals(1, loadSources.size());
+ ReplicationLoadSource loadSource = loadSources.get(0);
+ assertTrue(loadSource.hasEditsSinceRestart());
+ assertEquals(0, loadSource.getTimestampOfLastShippedOp());
+ assertTrue(loadSource.getReplicationLag()>0);
+ assertFalse(loadSource.isRecovered());
+ }
+ }
+
+ @Test
+ public void testReplicationStatusSourceStartedTargetStoppedWithRecovery() throws Exception {
+ utility2.shutdownMiniHBaseCluster();
+ utility1.shutdownMiniHBaseCluster();
+ utility1.startMiniHBaseCluster();
+ //add some values to cluster 1
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+ htable1.put(p);
+ }
+ Thread.sleep(10000);
+ utility1.shutdownMiniHBaseCluster(); // second restart with unshipped edits pending
+ utility1.startMiniHBaseCluster();
+ try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) { // Admin is closed on exit
+ ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+ Thread.sleep(10000);
+ ClusterStatus status = new ClusterStatus(hbaseAdmin.
+ getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
+ List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics().
+ get(serverName).getReplicationLoadSourceList();
+ assertEquals(2, loadSources.size());
+ boolean foundRecovery = false;
+ boolean foundNormal = false;
+ for(ReplicationLoadSource loadSource : loadSources){
+ if (loadSource.isRecovered()){
+ foundRecovery = true;
+ assertTrue(loadSource.hasEditsSinceRestart());
+ assertEquals(0, loadSource.getTimestampOfLastShippedOp());
+ assertTrue(loadSource.getReplicationLag()>0);
+ } else {
+ foundNormal = true;
+ assertFalse(loadSource.hasEditsSinceRestart());
+ assertEquals(0, loadSource.getTimestampOfLastShippedOp());
+ assertEquals(0, loadSource.getReplicationLag());
+ }
+ }
+ assertTrue("No normal queue found.", foundNormal);
+ assertTrue("No recovery queue found.", foundRecovery);
+ }
+ }
+
+ @Test
+ public void testReplicationStatusBothNormalAndRecoveryLagging() throws Exception {
+ utility2.shutdownMiniHBaseCluster();
+ utility1.shutdownMiniHBaseCluster();
+ utility1.startMiniHBaseCluster();
+ //add some values to cluster 1
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+ htable1.put(p);
+ }
+ Thread.sleep(10000);
+ utility1.shutdownMiniHBaseCluster(); // second restart with unshipped edits pending
+ utility1.startMiniHBaseCluster();
+ try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) { // Admin is closed on exit
+ ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+ Thread.sleep(10000);
+ //add more values to cluster 1, these should cause normal queue to lag
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+ htable1.put(p);
+ }
+ Thread.sleep(10000);
+ ClusterStatus status = new ClusterStatus(hbaseAdmin.
+ getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
+ List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics().
+ get(serverName).getReplicationLoadSourceList();
+ assertEquals(2, loadSources.size());
+ boolean foundRecovery = false;
+ boolean foundNormal = false;
+ for(ReplicationLoadSource loadSource : loadSources){
+ if (loadSource.isRecovered()){
+ foundRecovery = true;
+ } else {
+ foundNormal = true;
+ }
+ assertTrue(loadSource.hasEditsSinceRestart());
+ assertEquals(0, loadSource.getTimestampOfLastShippedOp());
+ assertTrue(loadSource.getReplicationLag()>0);
+ }
+ assertTrue("No normal queue found.", foundNormal);
+ assertTrue("No recovery queue found.", foundRecovery);
+ }
+ }
+
+ @Test
+ public void testReplicationStatusAfterLagging() throws Exception {
+ utility2.shutdownMiniHBaseCluster();
+ utility1.shutdownMiniHBaseCluster();
+ utility1.startMiniHBaseCluster();
+ //add some values to cluster 1
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+ htable1.put(p);
+ }
+ utility2.startMiniHBaseCluster(); // peer comes back; the asserts below expect shipped edits and zero lag
+ Thread.sleep(10000);
+ try(Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
+ ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).
+ getServerName();
+ ClusterStatus status =
+ new
ClusterStatus(hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
+ List<ReplicationLoadSource> loadSources =
status.getLiveServerMetrics().get(serverName).
+ getReplicationLoadSourceList();
+ assertEquals(1, loadSources.size());
+ ReplicationLoadSource loadSource = loadSources.get(0);
+ assertTrue(loadSource.hasEditsSinceRestart());
+ assertTrue(loadSource.getTimestampOfLastShippedOp() > 0);
+ assertEquals(0, loadSource.getReplicationLag());
+ }finally{
+ utility2.shutdownMiniHBaseCluster();
+ }
+ }
}
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb
b/hbase-shell/src/main/ruby/hbase/admin.rb
index f716de9..6b0e7c0 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -785,37 +785,30 @@ module Hbase
puts(format(' %s', server))
end
elsif format == 'replication'
- puts(format('version %s', status.getHBaseVersion))
- puts(format('%d live servers', status.getServersSize))
- for server in status.getServers
- sl = status.getLoad(server)
- rSinkString = ' SINK :'
- rSourceString = ' SOURCE:'
- rLoadSink = sl.getReplicationLoadSink
- next if rLoadSink.nil?
- rSinkString << ' AgeOfLastAppliedOp=' +
rLoadSink.getAgeOfLastAppliedOp.to_s
- rSinkString << ', TimeStampsOfLastAppliedOp=' +
-
java.util.Date.new(rLoadSink.getTimeStampsOfLastAppliedOp).toString
- rLoadSourceList = sl.getReplicationLoadSourceList
- index = 0
- while index < rLoadSourceList.size
- rLoadSource = rLoadSourceList.get(index)
- rSourceString << ' PeerID=' + rLoadSource.getPeerID
- rSourceString << ', AgeOfLastShippedOp=' +
rLoadSource.getAgeOfLastShippedOp.to_s
- rSourceString << ', SizeOfLogQueue=' +
rLoadSource.getSizeOfLogQueue.to_s
- rSourceString << ', TimeStampsOfLastShippedOp=' +
-
java.util.Date.new(rLoadSource.getTimeStampOfLastShippedOp).toString
- rSourceString << ', Replication Lag=' +
rLoadSource.getReplicationLag.to_s
- index += 1
- end
- puts(format(' %s:', server.getHostname))
- if type.casecmp('SOURCE') == 0
- puts(format('%s', rSourceString))
- elsif type.casecmp('SINK') == 0
- puts(format('%s', rSinkString))
+ puts(format('version %<version>s', version: status.getHBaseVersion))
+ puts(format('%<servers>d live servers', servers:
status.getServersSize))
+ status.getServers.each do |server_status|
+ sl = status.getLoad(server_status)
+ r_sink_string = ' SINK:'
+ r_source_string = ' SOURCE:'
+ r_load_sink = sl.getReplicationLoadSink
+ next if r_load_sink.nil?
+
+ r_sink_string << ' AgeOfLastAppliedOp=' +
+ r_load_sink.getAgeOfLastAppliedOp.to_s
+ r_sink_string << ', TimeStampsOfLastAppliedOp=' +
+ java.util.Date.new(r_load_sink
+ .getTimeStampsOfLastAppliedOp).toString
+ r_load_source_map = sl.getReplicationLoadSourceMap
+ build_source_string(r_load_source_map, r_source_string)
+ puts(format(' %<host>s:', host: server_status.getHostname))
+ if type.casecmp('SOURCE').zero?
+ puts(format('%<source>s', source: r_source_string))
+ elsif type.casecmp('SINK').zero?
+ puts(format('%<sink>s', sink: r_sink_string))
else
- puts(format('%s', rSourceString))
- puts(format('%s', rSinkString))
+ puts(format('%<source>s', source: r_source_string))
+ puts(format('%<sink>s', sink: r_sink_string))
end
end
elsif format == 'simple'
@@ -844,6 +837,71 @@ module Hbase
end
end
+ # Appends the replication source report for every peer to r_source_string.
+ #
+ # r_load_source_map is iterated as (peer id, list of source loads) pairs; for each
+ # peer a ' PeerID=<id>' marker is written, followed by the title and statistics of
+ # each of its source loads. r_source_string is mutated in place.
+ def build_source_string(r_load_source_map, r_source_string)
+   r_load_source_map.each do |peer_id, queue_loads|
+     r_source_string << ' PeerID=' + peer_id
+     queue_loads.each do |queue_load|
+       build_queue_title(queue_load, r_source_string)
+       build_running_source_stats(queue_load, r_source_string)
+     end
+   end
+ end
+
+ # Starts a new line in r_source_string naming the queue of source_load:
+ # 'Recovered Queue' when source_load.isRecovered is true, 'Normal Queue'
+ # otherwise, followed by the queue id. Returns r_source_string.
+ def build_queue_title(source_load, r_source_string)
+   queue_label = source_load.isRecovered ? "\n Recovered Queue: " : "\n Normal Queue: "
+   r_source_string << queue_label
+   r_source_string << source_load.getQueueId
+ end
+
+ # Appends the statistics of one replication source to r_source_string.
+ #
+ # When source_load.isRunning is true the shipped-op stats, the general load
+ # stats and the replication lag are appended, in that order. Otherwise a
+ # single line stating that no reader/shipper threads are running is appended.
+ # Returns r_source_string.
+ def build_running_source_stats(source_load, r_source_string)
+   if source_load.isRunning
+     build_shipped_stats(source_load, r_source_string)
+     build_load_general_stats(source_load, r_source_string)
+     r_source_string << ', Replication Lag=' +
+                        source_load.getReplicationLag.to_s
+   else
+     r_source_string << "\n "
+     r_source_string << 'No Reader/Shipper threads running yet.'
+   end
+ end
+
+ # Appends the last-shipped-operation details of source_load to r_source_string.
+ #
+ # A last-shipped timestamp of zero is reported as 'No Ops shipped since last
+ # restart'; otherwise the age of the last shipped op and its timestamp
+ # (rendered through java.util.Date) are appended. Returns r_source_string.
+ def build_shipped_stats(source_load, r_source_string)
+   if source_load.getTimeStampOfLastShippedOp.zero?
+     shipped = "\n " \
+               'No Ops shipped since last restart'
+   else
+     age = source_load.getAgeOfLastShippedOp.to_s
+     shipped_at = java.util.Date.new(source_load
+                                       .getTimeStampOfLastShippedOp).toString
+     shipped = "\n AgeOfLastShippedOp=" + age +
+               ', TimeStampOfLastShippedOp=' + shipped_at
+   end
+   r_source_string << shipped
+ end
+
+ # Appends the queue size, the number of edits read and the number of ops
+ # shipped for source_load to r_source_string, then delegates to
+ # build_edits_for_source for the next-to-replicate information.
+ def build_load_general_stats(source_load, r_source_string)
+   counters = [
+     [', SizeOfLogQueue=', source_load.getSizeOfLogQueue],
+     [', EditsReadFromLogQueue=', source_load.getEditsRead],
+     [', OpsShippedToTarget=', source_load.getOPsShipped]
+   ]
+   counters.each { |label, count| r_source_string << label + count.to_s }
+   build_edits_for_source(source_load, r_source_string)
+ end
+
+ # Appends either the timestamp of the next edit to replicate (rendered
+ # through java.util.Date) or, when source_load.hasEditsSinceRestart is false,
+ # a note that the source has seen no edits since it started.
+ # Returns r_source_string.
+ def build_edits_for_source(source_load, r_source_string)
+   unless source_load.hasEditsSinceRestart
+     r_source_string << ', No edits for this source'
+     return r_source_string << ' since it started'
+   end
+   next_to_replicate = java.util.Date.new(source_load
+                                            .getTimeStampOfNextToReplicate).toString
+   r_source_string << ', TimeStampOfNextToReplicate=' + next_to_replicate
+ end
+
#----------------------------------------------------------------------------------------------
#
# Helper methods