Author: stack
Date: Tue Mar 3 19:51:58 2009
New Revision: 749716
URL: http://svn.apache.org/viewvc?rev=749716&view=rev
Log:
HBASE-1233 Transactional fixes: Overly conservative scan read-set, potential CME
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/package.html
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/filter/RowFilterSet.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=749716&r1=749715&r2=749716&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Tue Mar 3 19:51:58 2009
@@ -29,6 +29,8 @@
HBASE-1217 add new compression and hfile blocksize to HColumnDescriptor
HBASE-859 HStoreKey needs a reworking
HBASE-1211 NPE in retries exhausted exception
+ HBASE-1233 Transactional fixes: Overly conservative scan read-set,
+ potential CME (Clint Morgan via Stack)
IMPROVEMENTS
HBASE-1089 Add count of regions on filesystem to master UI; add percentage
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/package.html
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/package.html?rev=749716&r1=749715&r2=749716&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/package.html (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/package.html Tue Mar 3 19:51:58 2009
@@ -42,18 +42,20 @@
<i>hbase.regionserver.impl </i> to
<i>org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer</i>
+<p>
+The read set claimed by a transactional scanner is determined from the start
+and end keys with which the scanner is opened.
+
<h3> Known Issues </h3>
Recovery in the face of hregion server failure
is not fully implemented. Thus, you cannot rely on the transactional
properties in the face of node failure.
-<p> In order to avoid phantom reads on scanners, scanners currently
-claim a <i>write set</i> for all rows in every regions which they scan
-through. This means that if transaction A writes to a region that
-transaction B is scanning, then there is a conflict (only one
-transacton can be committed). This will occur even if the scanner
-never went over the row that was written.
+
+
</body>
</html>
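
An aside on what this new wording means in practice: a transactional scanner now claims only the row interval it was opened over, rather than every row in the region. The following is a minimal, self-contained sketch of that containment rule. It is not code from this commit: it stands in for the ScanRange.contains() method added to TransactionState below, and uses java.util.Arrays.compare (signed bytes) where HBase's Bytes.compareTo compares unsigned, so treat it as illustrative only.

    import java.util.Arrays;

    // Sketch of a [startRow, endRow] read claim; a null bound means the
    // scan is unbounded on that side.
    class ScanRangeSketch {
      private final byte[] startRow;
      private final byte[] endRow;

      ScanRangeSketch(byte[] startRow, byte[] endRow) {
        this.startRow = startRow;
        this.endRow = endRow;
      }

      // True if rowKey falls inside the claimed interval.
      boolean contains(byte[] rowKey) {
        if (startRow != null && Arrays.compare(rowKey, startRow) < 0) {
          return false;
        }
        if (endRow != null && Arrays.compare(endRow, rowKey) < 0) {
          return false;
        }
        return true;
      }
    }

Under this rule a concurrent write to row "c" conflicts with a scanner opened over ["b", "f"], while a write to "z" in the same region no longer does; under the old behavior both counted as conflicts.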
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/filter/RowFilterSet.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/filter/RowFilterSet.java?rev=749716&r1=749715&r2=749716&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/filter/RowFilterSet.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/filter/RowFilterSet.java Tue Mar 3 19:51:58 2009
@@ -88,6 +88,14 @@
return operator;
}
+ /** Get the filters.
+ *
+ * @return filters
+ */
+ public Set<RowFilterInterface> getFilters() {
+ return filters;
+ }
+
/** Add a filter.
*
* @param filter
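
The accessor above is what lets other code in this commit inspect a composite filter's members; TransactionState.getEndRow(), further down, is its consumer. Below is a hedged sketch of that use, built only from methods visible in this diff (getOperator(), getFilters(), getInternalFilter(), getStopRowKey()); the wrapper class and method name are illustrative, not part of the commit.

    import org.apache.hadoop.hbase.filter.RowFilterInterface;
    import org.apache.hadoop.hbase.filter.RowFilterSet;
    import org.apache.hadoop.hbase.filter.StopRowFilter;
    import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;

    // Sketch: recover a stop row from a MUST_PASS_ALL filter set, the
    // same walk TransactionState.getEndRow() performs in this commit.
    class FilterSetProbe {
      static byte[] stopRowOf(RowFilterSet set) {
        if (!RowFilterSet.Operator.MUST_PASS_ALL.equals(set.getOperator())) {
          return null; // MUST_PASS_ONE members give no usable upper bound
        }
        for (RowFilterInterface f : set.getFilters()) {
          if (f instanceof WhileMatchRowFilter) {
            RowFilterInterface inner =
                ((WhileMatchRowFilter) f).getInternalFilter();
            if (inner instanceof StopRowFilter) {
              return ((StopRowFilter) inner).getStopRowKey();
            }
          }
        }
        return null;
      }
    }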
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java?rev=749716&r1=749715&r2=749716&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java Tue Mar 3 19:51:58 2009
@@ -31,6 +31,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.filter.RowFilterSet;
+import org.apache.hadoop.hbase.filter.StopRowFilter;
+import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
import org.apache.hadoop.hbase.io.BatchOperation;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
@@ -58,40 +63,69 @@
ABORTED
}
+ /**
+ * Simple container for the row range of a scanner we have opened. Used to
+ * check for conflicting writes.
+ */
+ private class ScanRange {
+ private byte[] startRow;
+ private byte[] endRow;
+
+ public ScanRange(byte[] startRow, byte[] endRow) {
+ this.startRow = startRow;
+ this.endRow = endRow;
+ }
+
+ /**
+ * Check if this scan range contains the given key. A null bound means the
+ * range is unbounded on that side.
+ *
+ * @param rowKey row key to check
+ * @return true if the row key falls within this range
+ */
+ public boolean contains(byte[] rowKey) {
+ if (startRow != null && Bytes.compareTo(rowKey, startRow) < 0) {
+ return false;
+ }
+ if (endRow != null && Bytes.compareTo(endRow, rowKey) < 0) {
+ return false;
+ }
+ return true;
+ }
+ }
+
+ private final HRegionInfo regionInfo;
private final long hLogStartSequenceId;
private final long transactionId;
private Status status;
private SortedSet<byte[]> readSet = new TreeSet<byte[]>(
Bytes.BYTES_COMPARATOR);
private List<BatchUpdate> writeSet = new LinkedList<BatchUpdate>();
+ private List<ScanRange> scanSet = new LinkedList<ScanRange>();
private Set<TransactionState> transactionsToCheck = new
HashSet<TransactionState>();
private int startSequenceNumber;
private Integer sequenceNumber;
- boolean hasScan = false;
- //TODO: Why don't these methods and the class itself use default access?
- // They are only referenced from within this package.
-
- public TransactionState(final long transactionId,
- final long rLogStartSequenceId) {
+ TransactionState(final long transactionId, final long rLogStartSequenceId,
+ HRegionInfo regionInfo) {
this.transactionId = transactionId;
this.hLogStartSequenceId = rLogStartSequenceId;
+ this.regionInfo = regionInfo;
this.status = Status.PENDING;
}
- public void addRead(final byte[] rowKey) {
+ void addRead(final byte[] rowKey) {
readSet.add(rowKey);
}
- public Set<byte[]> getReadSet() {
+ Set<byte[]> getReadSet() {
return readSet;
}
- public void addWrite(final BatchUpdate write) {
+ void addWrite(final BatchUpdate write) {
writeSet.add(write);
}
- public List<BatchUpdate> getWriteSet() {
+ List<BatchUpdate> getWriteSet() {
return writeSet;
}
@@ -103,8 +137,8 @@
* @param timestamp
* @return
*/
- public Map<byte[], Cell> localGetFull(final byte[] row,
- final Set<byte[]> columns, final long timestamp) {
+ Map<byte[], Cell> localGetFull(final byte[] row, final Set<byte[]> columns,
+ final long timestamp) {
Map<byte[], Cell> results = new TreeMap<byte[], Cell>(
Bytes.BYTES_COMPARATOR); // Must use the Bytes Conparator because
for (BatchUpdate b : writeSet) {
@@ -133,8 +167,7 @@
* @param timestamp
* @return
*/
- public Cell[] localGet(final byte[] row, final byte[] column,
- final long timestamp) {
+ Cell[] localGet(final byte[] row, final byte[] column, final long timestamp) {
ArrayList<Cell> results = new ArrayList<Cell>();
// Go in reverse order to put newest updates first in list
@@ -158,11 +191,11 @@
.toArray(new Cell[results.size()]);
}
- public void addTransactionToCheck(final TransactionState transaction) {
+ void addTransactionToCheck(final TransactionState transaction) {
transactionsToCheck.add(transaction);
}
- public boolean hasConflict() {
+ boolean hasConflict() {
for (TransactionState transactionState : transactionsToCheck) {
if (hasConflict(transactionState)) {
return true;
@@ -177,18 +210,22 @@
}
for (BatchUpdate otherUpdate : checkAgainst.getWriteSet()) {
- if (this.hasScan) {
- LOG.info("Transaction" + this.toString()
- + " has a scan read. Meanwile a write occured. "
- + "Conservitivly reporting conflict");
- return true;
- }
-
if (this.getReadSet().contains(otherUpdate.getRow())) {
- LOG.trace("Transaction " + this.toString() + " conflicts with "
- + checkAgainst.toString());
+ LOG.debug("Transaction [" + this.toString()
+ + "] has read which conflicts with [" + checkAgainst.toString()
+ + "]: region [" + regionInfo.getRegionNameAsString() + "], row["
+ + Bytes.toString(otherUpdate.getRow()) + "]");
return true;
}
+ for (ScanRange scanRange : this.scanSet) {
+ if (scanRange.contains(otherUpdate.getRow())) {
+ LOG.debug("Transaction [" + this.toString()
+ + "] has scan which conflicts with [" + checkAgainst.toString()
+ + "]: region [" + regionInfo.getRegionNameAsString() + "], row["
+ + Bytes.toString(otherUpdate.getRow()) + "]");
+ return true;
+ }
+ }
}
return false;
}
@@ -198,7 +235,7 @@
*
* @return Return the status.
*/
- public Status getStatus() {
+ Status getStatus() {
return status;
}
@@ -207,7 +244,7 @@
*
* @param status The status to set.
*/
- public void setStatus(final Status status) {
+ void setStatus(final Status status) {
this.status = status;
}
@@ -216,7 +253,7 @@
*
* @return Return the startSequenceNumber.
*/
- public int getStartSequenceNumber() {
+ int getStartSequenceNumber() {
return startSequenceNumber;
}
@@ -225,7 +262,7 @@
*
* @param startSequenceNumber.
*/
- public void setStartSequenceNumber(final int startSequenceNumber) {
+ void setStartSequenceNumber(final int startSequenceNumber) {
this.startSequenceNumber = startSequenceNumber;
}
@@ -234,7 +271,7 @@
*
* @return Return the sequenceNumber.
*/
- public Integer getSequenceNumber() {
+ Integer getSequenceNumber() {
return sequenceNumber;
}
@@ -243,7 +280,7 @@
*
* @param sequenceNumber The sequenceNumber to set.
*/
- public void setSequenceNumber(final Integer sequenceNumber) {
+ void setSequenceNumber(final Integer sequenceNumber) {
this.sequenceNumber = sequenceNumber;
}
@@ -256,6 +293,8 @@
result.append(status.name());
result.append(" read Size: ");
result.append(readSet.size());
+ result.append(" scan Size: ");
+ result.append(scanSet.size());
result.append(" write Size: ");
result.append(writeSet.size());
result.append(" startSQ: ");
@@ -274,7 +313,7 @@
*
* @return Return the transactionId.
*/
- public long getTransactionId() {
+ long getTransactionId() {
return transactionId;
}
@@ -283,17 +322,41 @@
*
* @return Return the startSequenceId.
*/
- public long getHLogStartSequenceId() {
+ long getHLogStartSequenceId() {
return hLogStartSequenceId;
}
- /**
- * Set the hasScan.
- *
- * @param hasScan The hasScan to set.
- */
- public void setHasScan(final boolean hasScan) {
- this.hasScan = hasScan;
+ void addScan(byte[] firstRow, RowFilterInterface filter) {
+ ScanRange scanRange = new ScanRange(firstRow, getEndRow(filter));
+ LOG.trace(String.format(
+ "Adding scan for transcaction [%s], from [%s] to [%s]", transactionId,
+ scanRange.startRow == null ? "null" : Bytes
+ .toString(scanRange.startRow), scanRange.endRow == null ? "null"
+ : Bytes.toString(scanRange.endRow)));
+ scanSet.add(scanRange);
+ }
+
+ private byte[] getEndRow(RowFilterInterface filter) {
+ if (filter instanceof WhileMatchRowFilter) {
+ WhileMatchRowFilter wmrFilter = (WhileMatchRowFilter) filter;
+ if (wmrFilter.getInternalFilter() instanceof StopRowFilter) {
+ StopRowFilter stopFilter = (StopRowFilter) wmrFilter
+ .getInternalFilter();
+ return stopFilter.getStopRowKey();
+ }
+ } else if (filter instanceof RowFilterSet) {
+ RowFilterSet rowFilterSet = (RowFilterSet) filter;
+ if (rowFilterSet.getOperator()
+ .equals(RowFilterSet.Operator.MUST_PASS_ALL)) {
+ for (RowFilterInterface subFilter : rowFilterSet.getFilters()) {
+ byte[] endRow = getEndRow(subFilter);
+ if (endRow != null) {
+ return endRow;
+ }
+ }
+ }
+ }
+ return null;
}
}
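
The semantics that fall out of getEndRow() are worth spelling out: only a WhileMatchRowFilter wrapping a StopRowFilter (possibly nested inside a MUST_PASS_ALL RowFilterSet) yields a finite end row; any other filter leaves endRow null, so the claimed range stays unbounded above and the check remains conservative. Two hypothetical calls illustrate this; state is a TransactionState, the row keys are made up, and the filter constructor signatures are assumed from the era's filter API.

    // Bounded scan: only writes in [row-100, row-500] can now conflict.
    state.addScan(Bytes.toBytes("row-100"),
        new WhileMatchRowFilter(new StopRowFilter(Bytes.toBytes("row-500"))));

    // No recognized stop filter: endRow stays null, so the claim runs from
    // row-100 to the end of the region. Still conservative, never unsound.
    state.addScan(Bytes.toBytes("row-100"), null);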
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java?rev=749716&r1=749715&r2=749716&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java Tue Mar 3 19:51:58 2009
@@ -180,14 +180,16 @@
alias.setStatus(Status.ABORTED);
retireTransaction(alias);
}
+ LOG.error("Existing trasaction with id ["+key+"] in region
["+super.getRegionInfo().getRegionNameAsString()+"]");
throw new IOException("Already exiting transaction id: " + key);
}
TransactionState state = new TransactionState(transactionId, super.getLog()
- .getSequenceNumber());
+ .getSequenceNumber(), super.getRegionInfo());
- // Order is important here
- for (TransactionState commitPending : commitPendingTransactions) {
+ // Order is important here ...
+ List<TransactionState> commitPendingCopy = new LinkedList<TransactionState>(commitPendingTransactions);
+ for (TransactionState commitPending : commitPendingCopy) {
state.addTransactionToCheck(commitPending);
}
state.setStartSequenceNumber(nextSequenceId.get());
@@ -196,6 +198,7 @@
try {
transactionLeases.createLease(key, new TransactionLeaseListener(key));
} catch (LeaseStillHeldException e) {
+ LOG.error("Lease still held for ["+key+"] in region
["+super.getRegionInfo().getRegionNameAsString()+"]");
throw new RuntimeException(e);
}
LOG.debug("Begining transaction " + key + " in region "
@@ -337,6 +340,8 @@
public InternalScanner getScanner(final long transactionId,
final byte[][] cols, final byte[] firstRow, final long timestamp,
final RowFilterInterface filter) throws IOException {
+ TransactionState state = getTransactionState(transactionId);
+ state.addScan(firstRow, filter);
return new ScannerWrapper(transactionId, super.getScanner(cols, firstRow,
timestamp, filter));
}
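
The copy into commitPendingCopy two hunks above is the "potential CME" fix named in the log message: beginTransaction previously iterated commitPendingTransactions directly, and a concurrent commit or abort mutating that set mid-iteration could throw ConcurrentModificationException. Here is a minimal self-contained sketch of the hazard and of the copy-before-iterate pattern the commit adopts; it is plain Java, not HBase code.

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.LinkedList;
    import java.util.Set;

    class CmeSketch {
      static final Set<String> commitPending =
          Collections.synchronizedSet(new HashSet<String>());

      // Hazard: if another thread add()s or remove()s while this loop
      // runs, the iterator throws ConcurrentModificationException.
      static void risky() {
        for (String tx : commitPending) {
          check(tx);
        }
      }

      // The commit's pattern: iterate a point-in-time copy instead.
      static void safer() {
        for (String tx : new LinkedList<String>(commitPending)) {
          check(tx);
        }
      }

      static void check(String tx) { /* conflict-check stand-in */ }
    }

One caveat the sketch makes visible: the copy constructor itself iterates the source set, so with a merely synchronized collection the copy should still be taken under the set's lock (or the set should be a concurrent collection) to close the window completely.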
@@ -578,14 +583,31 @@
numRemoved++;
}
- if (numRemoved > 0) {
- LOG.debug("Removed " + numRemoved
- + " commited transactions with sequence lower than "
- + minStartSeqNumber + ". Still have "
- + commitedTransactionsBySequenceNumber.size() + " left");
- } else if (commitedTransactionsBySequenceNumber.size() > 0) {
- LOG.debug("Could not remove any transactions, and still have "
- + commitedTransactionsBySequenceNumber.size() + " left");
+ if (LOG.isDebugEnabled()) {
+ StringBuilder debugMessage = new StringBuilder();
+ if (numRemoved > 0) {
+ debugMessage.append("Removed ").append(numRemoved).append(
+ " commited transactions");
+
+ if (minStartSeqNumber == Integer.MAX_VALUE) {
+ debugMessage.append("with any sequence number");
+ } else {
+ debugMessage.append("with sequence lower than ").append(
+ minStartSeqNumber).append(".");
+ }
+ if (!commitedTransactionsBySequenceNumber.isEmpty()) {
+ debugMessage.append(" Still have ").append(
+ commitedTransactionsBySequenceNumber.size()).append(" left.");
+ } else {
+ debugMessage.append("None left.");
+ }
+ LOG.debug(debugMessage.toString());
+ } else if (commitedTransactionsBySequenceNumber.size() > 0) {
+ debugMessage.append(
+ "Could not remove any transactions, and still have ").append(
+ commitedTransactionsBySequenceNumber.size()).append(" left");
+ LOG.debug(debugMessage.toString());
+ }
}
}
@@ -647,9 +669,11 @@
/**
* @param transactionId
* @param scanner
+ * @throws UnknownTransactionException
*/
public ScannerWrapper(final long transactionId,
- final InternalScanner scanner) {
+ final InternalScanner scanner) throws UnknownTransactionException {
+
this.transactionId = transactionId;
this.scanner = scanner;
}
@@ -670,10 +694,6 @@
final SortedMap<byte[], Cell> results) throws IOException {
boolean result = scanner.next(key, results);
TransactionState state = getTransactionState(transactionId);
- state.setHasScan(true);
- // FIXME, not using row, just claiming read over the whole region. We are
- // being very conservative on scans to avoid phantom reads.
- state.addRead(key.getRow());
if (result) {
Map<byte[], Cell> localWrites = state.localGetFull(key.getRow(), null,