Repository: hbase
Updated Branches:
  refs/heads/0.98 568b3f7dd -> c95f214cc


HBASE-15984 Handle premature EOF treatment of WALs in replication.

In some particular deployments, the Replication code believes it has
reached EOF for a WAL before successfully parsing all bytes known to
exist in a cleanly closed file.

This failure consistently happens due to an InvalidProtocolBufferException
after some number of seeks during our attempts to tail the in-progress
RegionServer WAL. As a work-around, this patch treats cleanly closed
files differently than other execution paths. If an EOF is detected due
to parsing or other errors while there are still unparsed bytes before
the end-of-file trailer, we now reset the WAL to the very beginning and
attempt a clean read-through.
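
A minimal Java sketch of the restart check described above. It assumes the reader can report the trailer size, or -1 when no trailer was parsed. `WalRestartPolicy` and `shouldRestart` are hypothetical names used only for illustration and are not HBase APIs. Treating an unknown trailer size as "do not restart" is also an assumption.

```java
/**
 * Hypothetical sketch of the end-of-file decision; not the HBase implementation.
 */
final class WalRestartPolicy {

  private WalRestartPolicy() {}

  /**
   * @param position    offset the reader had reached when it reported EOF
   * @param fileLength  length of the (presumed cleanly closed) WAL file
   * @param trailerSize size of the footer, or -1 if no trailer was parsed
   * @return true if unparsed bytes remain before the trailer, meaning
   *         reading should restart from offset 0
   */
  static boolean shouldRestart(long position, long fileLength, long trailerSize) {
    if (trailerSize < 0) {
      // Assumption: without a trailer we cannot tell where the edits area ends.
      return false;
    }
    long editsEnd = fileLength - trailerSize;
    return position < editsEnd;
  }
}
```

Take a 1000-byte file with a 50-byte trailer as an example. A reader that stops at offset 100 would restart from the beginning. A reader that stops at offset 950 has already consumed the entire edits area.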

In current testing, a single such reset is sufficient to work around
observed data loss. However, the above change will retry a given WAL file
indefinitely. On each such attempt, a log message like the one below will
be emitted at the WARN level:

  Processing end of WAL file '{}'. At position {}, which is too far away
  from reported file length {}. Restarting WAL reading (see HBASE-15983
  for details).

Additionally, this patch adds log detail at the TRACE
level about file offsets seen while handling recoverable errors. It also
adds metrics that measure the use of this recovery mechanism.

Conflicts:
        hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
        hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
        hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
        hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
        hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
        hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cf192c96
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cf192c96
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cf192c96

Branch: refs/heads/0.98
Commit: cf192c96b0ef221f7e309bc2174f0d38dc44d96d
Parents: 568b3f7
Author: Sean Busbey <[email protected]>
Authored: Tue Jun 7 16:00:46 2016 -0500
Committer: Andrew Purtell <[email protected]>
Committed: Tue Oct 4 11:53:36 2016 -0700

----------------------------------------------------------------------
 .../MetricsReplicationSourceSource.java         | 17 +++++
 .../MetricsReplicationGlobalSourceSource.java   | 50 +++++++++++++
 .../MetricsReplicationSourceSourceImpl.java     | 78 +++++++++++++++++++
 .../MetricsReplicationGlobalSourceSource.java   | 50 +++++++++++++
 .../MetricsReplicationSourceSourceImpl.java     | 79 ++++++++++++++++++++
 .../regionserver/wal/ProtobufLogReader.java     | 45 +++++++++--
 .../replication/regionserver/MetricsSource.java | 39 +++++++++-
 .../ReplicationHLogReaderManager.java           | 10 +++
 .../regionserver/ReplicationSource.java         | 40 ++++++++--
 src/main/asciidoc/_chapters/ops_mgt.adoc        | 24 +++++-
 10 files changed, 414 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/cf192c96/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index ea0ae20..06033ae 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -37,6 +37,16 @@ public interface MetricsReplicationSourceSource extends BaseSource {
 
   public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered";
 
+  public static final String SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH =
+      "source.closedLogsWithUnknownFileLength";
+  public static final String SOURCE_UNCLEANLY_CLOSED_LOGS = "source.uncleanlyClosedLogs";
+  public static final String SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES =
+      "source.ignoredUncleanlyClosedLogContentsInBytes";
+  public static final String SOURCE_RESTARTED_LOG_READING = "source.restartedLogReading";
+  public static final String SOURCE_REPEATED_LOG_FILE_BYTES = "source.repeatedLogFileBytes";
+  public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
+  public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
+
   void setLastShippedAge(long age);
   void setSizeOfLogQueue(int size);
   void incrSizeOfLogQueue(int size);
@@ -49,4 +59,11 @@ public interface MetricsReplicationSourceSource extends BaseSource {
   void incrLogReadInEdits(long size);
   void clear();
   long getLastShippedAge();
+  void incrUnknownFileLengthForClosedWAL();
+  void incrUncleanlyClosedWALs();
+  void incrBytesSkippedInUncleanlyClosedWALs(final long bytes);
+  void incrRestartedWALReading();
+  void incrRepeatedFileBytes(final long bytes);
+  void incrCompletedWAL();
+  void incrCompletedRecoveryQueue();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/cf192c96/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index ee59b1c..cedabbc 100644
--- a/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -34,6 +34,13 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   @Deprecated
   private final MetricMutableCounterLong shippedKBsCounter;
   private final MetricMutableCounterLong logReadInBytesCounter;
+  private final MetricMutableCounterLong unknownFileLengthForClosedWAL;
+  private final MetricMutableCounterLong uncleanlyClosedWAL;
+  private final MetricMutableCounterLong uncleanlyClosedSkippedBytes;
+  private final MetricMutableCounterLong restartWALReading;
+  private final MetricMutableCounterLong repeatedFileBytes;
+  private final MetricMutableCounterLong completedWAL;
+  private final MetricMutableCounterLong completedRecoveryQueue;
 
   public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
     this.rms = rms;
@@ -54,6 +61,14 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
     logReadInEditsCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
 
     logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
+
+    unknownFileLengthForClosedWAL = rms.getMetricsRegistry().getLongCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L);
+    uncleanlyClosedWAL = rms.getMetricsRegistry().getLongCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L);
+    uncleanlyClosedSkippedBytes = rms.getMetricsRegistry().getLongCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L);
+    restartWALReading = rms.getMetricsRegistry().getLongCounter(SOURCE_RESTARTED_LOG_READING, 0L);
+    repeatedFileBytes = rms.getMetricsRegistry().getLongCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L);
+    completedWAL = rms.getMetricsRegistry().getLongCounter(SOURCE_COMPLETED_LOGS, 0L);
+    completedRecoveryQueue = rms.getMetricsRegistry().getLongCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -117,6 +132,41 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   }
 
   @Override
+  public void incrUnknownFileLengthForClosedWAL() {
+    unknownFileLengthForClosedWAL.incr(1L);
+  }
+
+  @Override
+  public void incrUncleanlyClosedWALs() {
+    uncleanlyClosedWAL.incr(1L);
+  }
+
+  @Override
+  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
+    uncleanlyClosedSkippedBytes.incr(bytes);
+  }
+
+  @Override
+  public void incrRestartedWALReading() {
+    restartWALReading.incr(1L);
+  }
+
+  @Override
+  public void incrRepeatedFileBytes(final long bytes) {
+    repeatedFileBytes.incr(bytes);
+  }
+
+  @Override
+  public void incrCompletedWAL() {
+    completedWAL.incr(1L);
+  }
+
+  @Override
+  public void incrCompletedRecoveryQueue() {
+    completedRecoveryQueue.incr(1L);
+  }
+
+  @Override
   public void init() {
     rms.init();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/cf192c96/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index e15af46..4cd207e 100644
--- a/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -49,6 +49,21 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final MetricMutableCounterLong shippedBytesCounter;
   private final MetricMutableCounterLong logReadInBytesCounter;
 
+  private final String unknownFileLengthKey;
+  private final String uncleanlyClosedKey;
+  private final String uncleanlySkippedBytesKey;
+  private final String restartedKey;
+  private final String repeatedBytesKey;
+  private final String completedLogsKey;
+  private final String completedRecoveryKey;
+  private final MetricMutableCounterLong unknownFileLengthForClosedWAL;
+  private final MetricMutableCounterLong uncleanlyClosedWAL;
+  private final MetricMutableCounterLong uncleanlyClosedSkippedBytes;
+  private final MetricMutableCounterLong restartWALReading;
+  private final MetricMutableCounterLong repeatedFileBytes;
+  private final MetricMutableCounterLong completedWAL;
+  private final MetricMutableCounterLong completedRecoveryQueue;
+
   public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
     this.rms = rms;
     this.id = id;
@@ -80,6 +95,27 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
 
     logEditsFilteredKey = this.keyPrefix + ".logEditsFiltered";
     logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(logEditsFilteredKey, 0L);
+
+    unknownFileLengthKey = this.keyPrefix + "closedLogsWithUnknownFileLength";
+    unknownFileLengthForClosedWAL = rms.getMetricsRegistry().getLongCounter(unknownFileLengthKey, 0L);
+
+    uncleanlyClosedKey = this.keyPrefix + "uncleanlyClosedLogs";
+    uncleanlyClosedWAL = rms.getMetricsRegistry().getLongCounter(uncleanlyClosedKey, 0L);
+
+    uncleanlySkippedBytesKey = this.keyPrefix + "ignoredUncleanlyClosedLogContentsInBytes";
+    uncleanlyClosedSkippedBytes = rms.getMetricsRegistry().getLongCounter(uncleanlySkippedBytesKey, 0L);
+
+    restartedKey = this.keyPrefix + "restartedLogReading";
+    restartWALReading = rms.getMetricsRegistry().getLongCounter(restartedKey, 0L);
+
+    repeatedBytesKey = this.keyPrefix + "repeatedLogFileBytes";
+    repeatedFileBytes = rms.getMetricsRegistry().getLongCounter(repeatedBytesKey, 0L);
+
+    completedLogsKey = this.keyPrefix + "completedLogs";
+    completedWAL = rms.getMetricsRegistry().getLongCounter(completedLogsKey, 0L);
+
+    completedRecoveryKey = this.keyPrefix + "completedRecoverQueues";
+    completedRecoveryQueue = rms.getMetricsRegistry().getLongCounter(completedRecoveryKey, 0L);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -137,6 +173,14 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
     rms.removeMetric(logReadInEditsKey);
 
     rms.removeMetric(logEditsFilteredKey);
+
+    rms.removeMetric(unknownFileLengthKey);
+    rms.removeMetric(uncleanlyClosedKey);
+    rms.removeMetric(uncleanlySkippedBytesKey);
+    rms.removeMetric(restartedKey);
+    rms.removeMetric(repeatedBytesKey);
+    rms.removeMetric(completedLogsKey);
+    rms.removeMetric(completedRecoveryKey);
   }
 
   @Override
@@ -144,6 +188,40 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
     return ageOfLastShipped;
   }
 
+  @Override
+  public void incrUnknownFileLengthForClosedWAL() {
+    unknownFileLengthForClosedWAL.incr(1L);
+  }
+
+  @Override
+  public void incrUncleanlyClosedWALs() {
+    uncleanlyClosedWAL.incr(1L);
+  }
+
+  @Override
+  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
+    uncleanlyClosedSkippedBytes.incr(bytes);
+  }
+
+  @Override
+  public void incrRestartedWALReading() {
+    restartWALReading.incr(1L);
+  }
+
+  @Override
+  public void incrRepeatedFileBytes(final long bytes) {
+    repeatedFileBytes.incr(bytes);
+  }
+
+  @Override
+  public void incrCompletedWAL() {
+    completedWAL.incr(1L);
+  }
+
+  @Override
+  public void incrCompletedRecoveryQueue() {
+    completedRecoveryQueue.incr(1L);
+  }
 
   @Override
   public void init() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/cf192c96/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index da1bcf4..d17ba02 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -33,6 +33,13 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   @Deprecated
   private final MutableCounterLong shippedKBsCounter;
   private final MutableCounterLong logReadInBytesCounter;
+  private final MutableCounterLong unknownFileLengthForClosedWAL;
+  private final MutableCounterLong uncleanlyClosedWAL;
+  private final MutableCounterLong uncleanlyClosedSkippedBytes;
+  private final MutableCounterLong restartWALReading;
+  private final MutableCounterLong repeatedFileBytes;
+  private final MutableCounterLong completedWAL;
+  private final MutableCounterLong completedRecoveryQueue;
 
   public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
     this.rms = rms;
@@ -53,6 +60,14 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
     logReadInEditsCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
 
     logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
+
+    unknownFileLengthForClosedWAL = rms.getMetricsRegistry().getLongCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L);
+    uncleanlyClosedWAL = rms.getMetricsRegistry().getLongCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L);
+    uncleanlyClosedSkippedBytes = rms.getMetricsRegistry().getLongCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L);
+    restartWALReading = rms.getMetricsRegistry().getLongCounter(SOURCE_RESTARTED_LOG_READING, 0L);
+    repeatedFileBytes = rms.getMetricsRegistry().getLongCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L);
+    completedWAL = rms.getMetricsRegistry().getLongCounter(SOURCE_COMPLETED_LOGS, 0L);
+    completedRecoveryQueue = rms.getMetricsRegistry().getLongCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -120,6 +135,41 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   }
 
   @Override
+  public void incrUnknownFileLengthForClosedWAL() {
+    unknownFileLengthForClosedWAL.incr(1L);
+  }
+
+  @Override
+  public void incrUncleanlyClosedWALs() {
+    uncleanlyClosedWAL.incr(1L);
+  }
+
+  @Override
+  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
+    uncleanlyClosedSkippedBytes.incr(bytes);
+  }
+
+  @Override
+  public void incrRestartedWALReading() {
+    restartWALReading.incr(1L);
+  }
+
+  @Override
+  public void incrRepeatedFileBytes(final long bytes) {
+    repeatedFileBytes.incr(bytes);
+  }
+
+  @Override
+  public void incrCompletedWAL() {
+    completedWAL.incr(1L);
+  }
+
+  @Override
+  public void incrCompletedRecoveryQueue() {
+    completedRecoveryQueue.incr(1L);
+  }
+
+  @Override
   public void init() {
     rms.init();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/cf192c96/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 55c9b05..f466302 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -48,6 +48,21 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final MutableCounterLong shippedBytesCounter;
   private final MutableCounterLong logReadInBytesCounter;
 
+  private final String unknownFileLengthKey;
+  private final String uncleanlyClosedKey;
+  private final String uncleanlySkippedBytesKey;
+  private final String restartedKey;
+  private final String repeatedBytesKey;
+  private final String completedLogsKey;
+  private final String completedRecoveryKey;
+  private final MutableCounterLong unknownFileLengthForClosedWAL;
+  private final MutableCounterLong uncleanlyClosedWAL;
+  private final MutableCounterLong uncleanlyClosedSkippedBytes;
+  private final MutableCounterLong restartWALReading;
+  private final MutableCounterLong repeatedFileBytes;
+  private final MutableCounterLong completedWAL;
+  private final MutableCounterLong completedRecoveryQueue;
+
   public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
     this.rms = rms;
     this.id = id;
@@ -79,6 +94,27 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
 
     logEditsFilteredKey = this.keyPrefix + "logEditsFiltered";
     logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(logEditsFilteredKey, 0L);
+
+    unknownFileLengthKey = this.keyPrefix + "closedLogsWithUnknownFileLength";
+    unknownFileLengthForClosedWAL = rms.getMetricsRegistry().getLongCounter(unknownFileLengthKey, 0L);
+
+    uncleanlyClosedKey = this.keyPrefix + "uncleanlyClosedLogs";
+    uncleanlyClosedWAL = rms.getMetricsRegistry().getLongCounter(uncleanlyClosedKey, 0L);
+
+    uncleanlySkippedBytesKey = this.keyPrefix + "ignoredUncleanlyClosedLogContentsInBytes";
+    uncleanlyClosedSkippedBytes = rms.getMetricsRegistry().getLongCounter(uncleanlySkippedBytesKey, 0L);
+
+    restartedKey = this.keyPrefix + "restartedLogReading";
+    restartWALReading = rms.getMetricsRegistry().getLongCounter(restartedKey, 0L);
+
+    repeatedBytesKey = this.keyPrefix + "repeatedLogFileBytes";
+    repeatedFileBytes = rms.getMetricsRegistry().getLongCounter(repeatedBytesKey, 0L);
+
+    completedLogsKey = this.keyPrefix + "completedLogs";
+    completedWAL = rms.getMetricsRegistry().getLongCounter(completedLogsKey, 0L);
+
+    completedRecoveryKey = this.keyPrefix + "completedRecoverQueues";
+    completedRecoveryQueue = rms.getMetricsRegistry().getLongCounter(completedRecoveryKey, 0L);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -136,6 +172,14 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
     rms.removeMetric(logReadInEditsKey);
 
     rms.removeMetric(logEditsFilteredKey);
+
+    rms.removeMetric(unknownFileLengthKey);
+    rms.removeMetric(uncleanlyClosedKey);
+    rms.removeMetric(uncleanlySkippedBytesKey);
+    rms.removeMetric(restartedKey);
+    rms.removeMetric(repeatedBytesKey);
+    rms.removeMetric(completedLogsKey);
+    rms.removeMetric(completedRecoveryKey);
   }
 
   @Override
@@ -144,6 +188,41 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   }
 
   @Override
+  public void incrUnknownFileLengthForClosedWAL() {
+    unknownFileLengthForClosedWAL.incr(1L);
+  }
+
+  @Override
+  public void incrUncleanlyClosedWALs() {
+    uncleanlyClosedWAL.incr(1L);
+  }
+
+  @Override
+  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
+    uncleanlyClosedSkippedBytes.incr(bytes);
+  }
+
+  @Override
+  public void incrRestartedWALReading() {
+    restartWALReading.incr(1L);
+  }
+
+  @Override
+  public void incrRepeatedFileBytes(final long bytes) {
+    repeatedFileBytes.incr(bytes);
+  }
+
+  @Override
+  public void incrCompletedWAL() {
+    completedWAL.incr(1L);
+  }
+
+  @Override
+  public void incrCompletedRecoveryQueue() {
+    completedRecoveryQueue.incr(1L);
+  }
+
+  @Override
   public void init() {
     rms.init();
   }
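
The per-peer counters above are registered under keys built from a shared prefix and removed again in `clear()`, so a removed peer leaves no stale metrics behind. Below is a rough Java sketch of that pattern. `SimpleRegistry` and `PeerSourceCounters` are hypothetical stand-ins, not the Hadoop metrics2 API. The `source.<peerId>.` prefix format is also an assumption, because `keyPrefix` is defined outside this hunk.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical map-backed stand-in for a shared metrics registry. */
final class SimpleRegistry {
  private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

  /** Returns the existing counter for the key, creating it at 0 if absent. */
  AtomicLong getLongCounter(String key) {
    return counters.computeIfAbsent(key, k -> new AtomicLong());
  }

  void removeMetric(String key) {
    counters.remove(key);
  }

  boolean contains(String key) {
    return counters.containsKey(key);
  }
}

/** Per-peer counters whose keys share an assumed "source.<peerId>." prefix. */
final class PeerSourceCounters {
  private final SimpleRegistry registry;
  private final String restartedKey;
  private final String repeatedBytesKey;
  private final AtomicLong restartWALReading;
  private final AtomicLong repeatedFileBytes;

  PeerSourceCounters(SimpleRegistry registry, String peerId) {
    this.registry = registry;
    String keyPrefix = "source." + peerId + ".";
    restartedKey = keyPrefix + "restartedLogReading";
    repeatedBytesKey = keyPrefix + "repeatedLogFileBytes";
    restartWALReading = registry.getLongCounter(restartedKey);
    repeatedFileBytes = registry.getLongCounter(repeatedBytesKey);
  }

  void incrRestartedWALReading() {
    restartWALReading.incrementAndGet();
  }

  void incrRepeatedFileBytes(long bytes) {
    repeatedFileBytes.addAndGet(bytes);
  }

  /** Drops this peer's keys from the shared registry. */
  void clear() {
    registry.removeMetric(restartedKey);
    registry.removeMetric(repeatedBytesKey);
  }
}
```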

http://git-wip-us.apache.org/repos/asf/hbase/blob/cf192c96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index a1377cf..6284d2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -74,6 +74,21 @@ public class ProtobufLogReader extends ReaderBase {
     writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
   }
 
+  @InterfaceAudience.Private
+  public long trailerSize() {
+    if (trailerPresent) {
+      // sizeof PB_WAL_COMPLETE_MAGIC + sizeof trailerSize + trailer
+      final long calculatedSize = PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT + trailer.getSerializedSize();
+      final long expectedSize = fileLength - walEditsStopOffset;
+      if (expectedSize != calculatedSize) {
+        LOG.warn("After parsing the trailer, we expect the total footer to be "+ expectedSize +" bytes, but we calculate it as being " + calculatedSize);
+      }
+      return expectedSize;
+    } else {
+      return -1L;
+    }
+  }
+
   enum WALHdrResult {
     EOF,                   // stream is at EOF when method starts
     SUCCESS,
@@ -180,7 +195,7 @@ public class ProtobufLogReader extends ReaderBase {
     this.seekOnFs(currentPosition);
     if (LOG.isTraceEnabled()) {
       LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
-          + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + trailerPresent);
+          + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + (trailerPresent ? "true, size: " + trailer.getSerializedSize() : "false") + ", currentPosition: " + currentPosition);
     }
     return hdrCtxt.getCellCodecClsName();
   }
@@ -273,6 +288,9 @@ public class ProtobufLogReader extends ReaderBase {
       // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
       long originalPosition = this.inputStream.getPos();
       if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Reached end of expected edits area at offset " + originalPosition);
+        }
         return false;
       }
       WALKey.Builder builder = WALKey.newBuilder();
@@ -282,7 +300,7 @@ public class ProtobufLogReader extends ReaderBase {
         try {
           int firstByte = this.inputStream.read();
           if (firstByte == -1) {
-            throw new EOFException("First byte is negative");
+            throw new EOFException("First byte is negative at offset " + originalPosition);
           }
           size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
           // available may be < 0 on local fs for instance.  If so, can't depend on it.
@@ -290,7 +308,7 @@ public class ProtobufLogReader extends ReaderBase {
           if (available > 0 && available < size) {
             throw new EOFException("Available stream not enough for edit, " +
                 "inputStream.available()= " + this.inputStream.available() + ", " +
-                "entry size= " + size);
+                "entry size= " + size + " at offset = " + this.inputStream.getPos());
           }
           ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size),
             (int)size);
@@ -303,12 +321,14 @@ public class ProtobufLogReader extends ReaderBase {
           // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
           //       If we can get the KV count, we could, theoretically, try to get next record.
           throw new EOFException("Partial PB while reading WAL, " +
-              "probably an unexpected EOF, ignoring");
+              "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
         }
         WALKey walKey = builder.build();
         entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
         if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
-          LOG.trace("WALKey has no KVs that follow it; trying the next one");
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" + this.inputStream.getPos());
+          }
           continue;
         }
         int expectedCells = walKey.getFollowingKvCount();
@@ -323,7 +343,9 @@ public class ProtobufLogReader extends ReaderBase {
           try {
             posAfterStr = this.inputStream.getPos() + "";
           } catch (Throwable t) {
-            LOG.trace("Error getting pos for error message - ignoring", t);
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Error getting pos for error message - ignoring", t);
+            }
           }
           String message = " while reading " + expectedCells + " WAL KVs; started reading at "
               + posBefore + " and read up to " + posAfterStr;
@@ -338,11 +360,18 @@ public class ProtobufLogReader extends ReaderBase {
           throw new EOFException("Read WALTrailer while reading WALEdits");
         }
       } catch (EOFException eof) {
-        LOG.trace("Encountered a malformed edit, seeking back to last good position in file", eof);
         // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
-        if (originalPosition < 0) throw eof;
+        if (originalPosition < 0) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Encountered a malformed edit, but can't seek back to last good position because originalPosition is negative. last offset=" + this.inputStream.getPos(), eof);
+          }
+          throw eof;
+        }
         // Else restore our position to original location in hope that next time through we will
         // read successfully.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Encountered a malformed edit, seeking back to last good position in file, from "+ inputStream.getPos()+" to " + originalPosition, eof);
+        }
         seekOnFs(originalPosition);
         return false;
       }
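
The new `trailerSize()` compares two views of the footer. One is the size implied by the file layout (`fileLength - walEditsStopOffset`). The other is the size computed from the footer's parts. Below is a hedged Java sketch of that arithmetic. It assumes a 4-byte completion magic (the `"LAWP"` value is an assumption here) followed by a 4-byte int length field. `WalFooter` is a hypothetical name. Like the patch, the sketch returns the layout-derived value and only reports a mismatch.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of the footer-size arithmetic; not the HBase class. */
final class WalFooter {
  // Assumed value of the completion magic; the real constant lives in HBase.
  static final byte[] COMPLETE_MAGIC = "LAWP".getBytes(StandardCharsets.US_ASCII);
  static final int SIZEOF_INT = Integer.BYTES;

  private WalFooter() {}

  /** Footer size from its parts: magic + int length field + serialized trailer. */
  static long calculatedSize(int trailerSerializedSize) {
    return COMPLETE_MAGIC.length + SIZEOF_INT + trailerSerializedSize;
  }

  /** Footer size implied by the file layout, or -1 when no trailer was found. */
  static long expectedSize(boolean trailerPresent, long fileLength,
      long walEditsStopOffset, int trailerSerializedSize) {
    if (!trailerPresent) {
      return -1L;
    }
    long expected = fileLength - walEditsStopOffset;
    long calculated = calculatedSize(trailerSerializedSize);
    if (expected != calculated) {
      // Mirrors the WARN in trailerSize(): report, but trust the layout.
      System.err.println("footer mismatch: expected " + expected
          + " bytes, calculated " + calculated);
    }
    return expected;
  }
}
```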

http://git-wip-us.apache.org/repos/asf/hbase/blob/cf192c96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 5824d83..134477d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -1,5 +1,5 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
+hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java<<<
 * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -210,7 +210,42 @@ public class MetricsSource implements BaseSource {
   public String getPeerID() {
     return id;
   }
-  
+
+  public void incrUnknownFileLengthForClosedWAL() {
+    singleSourceSource.incrUnknownFileLengthForClosedWAL();
+    globalSourceSource.incrUnknownFileLengthForClosedWAL();
+  }
+
+  public void incrUncleanlyClosedWALs() {
+    singleSourceSource.incrUncleanlyClosedWALs();
+    globalSourceSource.incrUncleanlyClosedWALs();
+  }
+
+  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
+    singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
+    globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
+  }
+
+  public void incrRestartedWALReading() {
+    singleSourceSource.incrRestartedWALReading();
+    globalSourceSource.incrRestartedWALReading();
+  }
+
+  public void incrRepeatedFileBytes(final long bytes) {
+    singleSourceSource.incrRepeatedFileBytes(bytes);
+    globalSourceSource.incrRepeatedFileBytes(bytes);
+  }
+
+  public void incrCompletedWAL() {
+    singleSourceSource.incrCompletedWAL();
+    globalSourceSource.incrCompletedWAL();
+  }
+
+  public void incrCompletedRecoveryQueue() {
+    singleSourceSource.incrCompletedRecoveryQueue();
+    globalSourceSource.incrCompletedRecoveryQueue();
+  }
+
   @Override
   public void init() {
     singleSourceSource.init();
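
`MetricsSource` forwards every new event to both the per-peer source and the global source, so the global counters aggregate across all peers. Below is a small Java sketch of that fan-out. `CounterSink` and `FanOutSourceMetrics` are hypothetical classes that stand in for the real metrics sources.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical minimal counter holder standing in for a metrics source. */
final class CounterSink {
  final AtomicLong completedWAL = new AtomicLong();
  final AtomicLong restartedWALReading = new AtomicLong();
}

/** Forwards every event to a per-peer sink and a shared global sink. */
final class FanOutSourceMetrics {
  private final CounterSink single;
  private final CounterSink global;

  FanOutSourceMetrics(CounterSink single, CounterSink global) {
    this.single = single;
    this.global = global;
  }

  void incrCompletedWAL() {
    single.completedWAL.incrementAndGet();
    global.completedWAL.incrementAndGet();
  }

  void incrRestartedWALReading() {
    single.restartedWALReading.incrementAndGet();
    global.restartedWALReading.incrementAndGet();
  }
}
```

When two peers share one global sink, each peer sees only its own events, and the global sink sees the sum of both.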

http://git-wip-us.apache.org/repos/asf/hbase/blob/cf192c96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
index ccae169..2a07e18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 
 import java.io.IOException;
 
@@ -118,6 +119,15 @@ public class ReplicationHLogReaderManager {
     this.position = pos;
   }
 
+  public long currentTrailerSize() {
+    long size = -1L;
+    if (reader instanceof ProtobufLogReader) {
+      final ProtobufLogReader pblr = (ProtobufLogReader)reader;
+      size = pblr.trailerSize();
+    }
+    return size;
+  }
+
   /**
    * Close the current reader
    * @throws IOException
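`currentTrailerSize()` returns the sentinel `-1` unless the current reader is a `ProtobufLogReader`. `processEndOfFile()` treats any negative value as meaning the WAL was not closed cleanly. The sketch below shows that sentinel pattern. `WalReader`, `TrailerAwareReader`, `PlainReader` and `TrailerSizes` are hypothetical stand-ins for the real reader types. One behaviour is an assumption: a trailer-aware reader also reports -1 when no trailer was found. It is taken from the "-1 means the wal wasn't closed cleanly" comment in `ReplicationSource`, not from `ProtobufLogReader` itself.

```java
// Hypothetical stand-in for the generic WAL reader interface.
interface WalReader {}

// Hypothetical reader that, like ProtobufLogReader, can report a trailer size.
// Assumption: it reports -1 when the file had no trailer (not closed cleanly).
class TrailerAwareReader implements WalReader {
  private final long trailerSize;

  TrailerAwareReader(long trailerSize) {
    this.trailerSize = trailerSize;
  }

  long trailerSize() {
    return trailerSize;
  }
}

// Hypothetical reader type that knows nothing about trailers.
class PlainReader implements WalReader {}

class TrailerSizes {
  // Same shape as ReplicationHLogReaderManager.currentTrailerSize():
  // default to -1 and only ask readers that can answer.
  static long currentTrailerSize(WalReader reader) {
    long size = -1L;
    if (reader instanceof TrailerAwareReader) {
      size = ((TrailerAwareReader) reader).trailerSize();
    }
    return size;
  }

  // processEndOfFile() takes the "not closed cleanly" branch when the size is negative.
  static boolean closedCleanly(WalReader reader) {
    return currentTrailerSize(reader) >= 0;
  }
}
```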

http://git-wip-us.apache.org/repos/asf/hbase/blob/cf192c96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index f545129..9b0dfff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -758,23 +758,49 @@ public class ReplicationSource extends Thread
    * continue trying to read from it
    */
   protected boolean processEndOfFile() {
+    // We presume this means the file we're reading is closed.
     if (this.queue.size() != 0) {
+      // -1 means the wal wasn't closed cleanly.
+      final long trailerSize = this.repLogReader.currentTrailerSize();
+      final long currentPosition = this.repLogReader.getPosition();
+      FileStatus stat = null;
+      try {
+        stat = fs.getFileStatus(this.currentPath);
+      } catch (IOException exception) {
+        LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it " + (trailerSize < 0 ? "was not" : "was") + " closed cleanly"
+            + ", stats: " + getStats());
+        metrics.incrUnknownFileLengthForClosedWAL();
+      }
+      if (stat != null) {
+        if (trailerSize < 0) {
+          if (currentPosition < stat.getLen()) {
+            final long skippedBytes = stat.getLen() - currentPosition;
+            LOG.info("Reached the end of WAL file '" + currentPath + "'. It was not closed cleanly, so we did not parse " + skippedBytes + " bytes of data.");
+            metrics.incrUncleanlyClosedWALs();
+            metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
+          }
+        } else if (currentPosition + trailerSize < stat.getLen()){
+          LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition + ", which is too far away from reported file length " + stat.getLen() +
+            ". Restarting WAL reading (see HBASE-15983 for details). stats: " + getStats());
+          repLogReader.setPosition(0);
+          metrics.incrRestartedWALReading();
+          metrics.incrRepeatedFileBytes(currentPosition);
+          return false;
+        }
+      }
       if (LOG.isTraceEnabled()) {
-        String filesize = "N/A";
-        try {
-          FileStatus stat = this.fs.getFileStatus(this.currentPath);
-          filesize = stat.getLen()+"";
-        } catch (IOException ex) {}
-        LOG.trace("Reached the end of a log, stats: " + getStats() +
-            ", and the length of the file is " + filesize);
+        LOG.trace("Reached the end of a log, stats: " + getStats()
+          + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
       }
       this.currentPath = null;
       this.repLogReader.finishCurrentFile();
       this.reader = null;
+      metrics.incrCompletedWAL();
       return true;
     } else if (this.replicationQueueInfo.isQueueRecovered()) {
       this.manager.closeRecoveredQueue(this);
       LOG.info("Finished recovering the queue with the following stats " + getStats());
+      metrics.incrCompletedRecoveryQueue();
       this.running = false;
       return true;
     }
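The new `processEndOfFile()` code runs only when more WALs remain in the queue. Its branches can be written as a pure decision function, as in the sketch below. The sketch rests on some assumptions:

- `EndOfWalCheck`, `Outcome` and `Decision` are hypothetical names.
- An empty `OptionalLong` stands in for a failed `getFileStatus()` call.
- Metric updates and the `setPosition(0)` call are left to the caller.

```java
import java.util.OptionalLong;

final class EndOfWalCheck {
  // Hypothetical labels for the branches in processEndOfFile().
  enum Outcome {
    COMPLETED,                // move on to the next WAL
    COMPLETED_UNKNOWN_LENGTH, // file length unavailable; move on anyway
    COMPLETED_SKIPPING_BYTES, // no trailer; unparsed tail bytes are skipped
    RESTART                   // trailer present but bytes left unparsed; reread from 0
  }

  static final class Decision {
    final Outcome outcome;
    final long bytes; // skipped bytes, repeated bytes, or 0

    Decision(Outcome outcome, long bytes) {
      this.outcome = outcome;
      this.bytes = bytes;
    }
  }

  // trailerSize: -1 when the WAL was not closed cleanly.
  // position:    offset the reader had reached.
  // fileLength:  empty when the file length could not be determined.
  static Decision decide(long trailerSize, long position, OptionalLong fileLength) {
    if (!fileLength.isPresent()) {
      return new Decision(Outcome.COMPLETED_UNKNOWN_LENGTH, 0L);
    }
    long length = fileLength.getAsLong();
    if (trailerSize < 0) {
      if (position < length) {
        return new Decision(Outcome.COMPLETED_SKIPPING_BYTES, length - position);
      }
      return new Decision(Outcome.COMPLETED, 0L);
    }
    if (position + trailerSize < length) {
      // Bytes already read before the restart are reported as repeated.
      return new Decision(Outcome.RESTART, position);
    }
    return new Decision(Outcome.COMPLETED, 0L);
  }
}
```

In this model, a restart reports the current position as the repeated byte count, matching `metrics.incrRepeatedFileBytes(currentPosition)`. Every other outcome corresponds to the path that ends in `metrics.incrCompletedWAL()`.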

http://git-wip-us.apache.org/repos/asf/hbase/blob/cf192c96/src/main/asciidoc/_chapters/ops_mgt.adoc
----------------------------------------------------------------------
diff --git a/src/main/asciidoc/_chapters/ops_mgt.adoc b/src/main/asciidoc/_chapters/ops_mgt.adoc
index fa31c8c..898b992 100644
--- a/src/main/asciidoc/_chapters/ops_mgt.adoc
+++ b/src/main/asciidoc/_chapters/ops_mgt.adoc
@@ -1577,7 +1577,7 @@ The new layout will be:
 
 === Replication Metrics
 
-The following metrics are exposed at the global region server level and (since HBase 0.95) at the peer level:
+The following metrics are exposed at the global region server level and at the peer level:
 
 `source.sizeOfLogQueue`::
  number of WALs to process (excludes the one which is being processed) at the Replication source
@@ -1591,6 +1591,28 @@ The following metrics are exposed at the global region server level and (since H
 `source.ageOfLastShippedOp`::
   age of last batch that was shipped by the replication source
 
+`source.completedLogs`::
+  The number of write-ahead-log files whose contents have been sent to, and acknowledged by, the peer associated with this source. Increments to this metric are a part of normal operation of HBase replication.
+
+`source.completedRecoverQueues`::
+  The number of recovery queues this source has completed sending to the associated peer. Increments to this metric are a part of normal recovery of HBase replication in the face of failed Region Servers.
+
+`source.uncleanlyClosedLogs`::
+  The number of write-ahead-log files the replication system considered completed after reaching the end of readable entries in the face of an uncleanly closed file.
+
+`source.ignoredUncleanlyClosedLogContentsInBytes`::
+  When a write-ahead-log file is not closed cleanly, there will likely be some entry that has been partially serialized. This metric counts the bytes the HBase replication system believes remained unparsed at the end of such files, which it skips. Those bytes should either be in a different file or represent a client write that was not acknowledged.
+
+`source.restartedLogReading`::
+  The number of times the HBase replication system detected that it failed to correctly parse a cleanly closed write-ahead-log file. In this circumstance, the system replays the entire log from the beginning, ensuring that no edits fail to be acknowledged by the associated peer. Increments to this metric indicate that the HBase replication system is having difficulty correctly handling failures in the underlying distributed storage system. No data loss should occur, but you should check Region Server log files for details of the failures.
+
+`source.repeatedLogFileBytes`::
+  When the HBase replication system determines that it needs to replay a given write-ahead-log file, this metric is incremented by the number of bytes the replication system believes had already been acknowledged by the associated peer prior to starting over.
+
+`source.closedLogsWithUnknownFileLength`::
+  Incremented when the HBase replication system believes it is at the end of a write-ahead-log file but cannot determine the length of that file in the underlying distributed storage system. This could indicate data loss, since the replication system is unable to determine whether the end of readable entries lines up with the expected end of the file. You should check Region Server log files for details of the failures.
+
+
 === Replication Configuration Options
 
 [cols="1,1,1", options="header"]
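The documentation above separates the new metrics into two groups. `source.completedLogs` and `source.completedRecoverQueues` grow during normal operation and recovery. Increments to `source.restartedLogReading` and `source.closedLogsWithUnknownFileLength` call for checking Region Server logs. The sketch below is a minimal monitoring helper built on that distinction, with two assumptions:

- Counter snapshots are already available as a map keyed by the metric names exactly as documented. Real keys may differ, for example by carrying a per-peer qualifier.
- `ReplicationMetricTriage` is a hypothetical helper, not part of HBase.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

final class ReplicationMetricTriage {
  // Metrics whose documentation says to check Region Server logs when they
  // increase. Keys are assumed to match the documented names exactly.
  private static final List<String> CHECK_LOGS_ON_INCREASE = Arrays.asList(
      "source.restartedLogReading",
      "source.closedLogsWithUnknownFileLength");

  // Compares two counter snapshots and returns the flagged metrics that grew.
  // A metric missing from a snapshot is treated as zero.
  static List<String> metricsNeedingAttention(Map<String, Long> before, Map<String, Long> after) {
    List<String> flagged = new ArrayList<>();
    for (String name : CHECK_LOGS_ON_INCREASE) {
      long delta = after.getOrDefault(name, 0L) - before.getOrDefault(name, 0L);
      if (delta > 0) {
        flagged.add(name);
      }
    }
    return flagged;
  }
}
```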
