Author: jdcryans Date: Thu Sep 20 18:59:05 2012 New Revision: 1388160 URL: http://svn.apache.org/viewvc?rev=1388160&view=rev Log: HBASE-6847 HBASE-6649 broke replication (Devaraj Das via JD)
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1388160&r1=1388159&r2=1388160&view=diff ============================================================================== --- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original) +++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Thu Sep 20 18:59:05 2012 @@ -338,10 +338,6 @@ public class ReplicationSource extends T } } finally { try { - // if current path is null, it means we processEndOfFile hence - if (this.currentPath != null && !gotIOE) { - this.position = this.reader.getPosition(); - } if (this.reader != null) { this.reader.close(); } @@ -391,7 +387,8 @@ public class ReplicationSource extends T if (this.position != 0) { this.reader.seek(this.position); } - HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]); + long startPosition = this.position; + HLog.Entry entry = readNextAndSetPosition(); while (entry != null) { WALEdit edit = entry.getEdit(); this.metrics.logEditsReadRate.inc(1); @@ -420,13 +417,13 @@ public class ReplicationSource extends T } } // Stop if too many entries or too big - if ((this.reader.getPosition() - this.position) + if ((this.reader.getPosition() - startPosition) >= this.replicationQueueSizeCapacity || currentNbEntries >= this.replicationQueueNbCapacity) { break; } try { - entry = this.reader.next(entriesArray[currentNbEntries]); + entry = readNextAndSetPosition(); } catch (IOException ie) { LOG.debug("Break on IOE: " + ie.getMessage()); break; @@ -434,12 +431,22 @@ public class ReplicationSource extends T } LOG.debug("currentNbOperations:" + currentNbOperations + " and seenEntries:" + seenEntries + - " and size: " + (this.reader.getPosition() - this.position)); + " and size: " + (this.reader.getPosition() - startPosition)); // If we didn't get anything and the queue has an object, it means we // hit the end of the file for sure return seenEntries == 0 && processEndOfFile(); } + private HLog.Entry readNextAndSetPosition() throws IOException { + HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]); + // Store the position so that in the future the reader can start + // reading from here. If the above call to next() throws an + // exception, the position won't be changed and retry will happen + // from the last known good position + this.position = this.reader.getPosition(); + return entry; + } + private void connectToPeers() { // Connect to peer cluster first, unless we have to stop while (this.isActive() && this.currentPeers.size() == 0) {