http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 1fcb314..bb0d1d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -29,14 +29,9 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.executor.ExecutorService; @@ -92,6 +87,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner private final int minVersions; private final long maxRowSize; private final long cellsPerHeartbeatCheck; + private final TimestampType timestampType; // 1) Collects all the KVHeap that are eagerly getting closed during the // course of a scan @@ -175,8 +171,20 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner int numCol = columns == null ? 0 : columns.size(); explicitColumnQuery = numCol > 0; this.scan = scan; - this.now = EnvironmentEdgeManager.currentTime(); - this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl(); + + if (store != null) { + now = store.getClock().now(); + timestampType = store.getClock().getTimestampType(); + } else { + now = new Clock.System().now(); + timestampType = TimestampType.PHYSICAL; + } + // Convert to milliseconds before subtracting time + long diff = this.timestampType.toEpochTimeMillisFromTimestamp(now) - scanInfo.getTtl(); + // Prevent overflow if diff is negative and timestampType is HYBRID + diff = diff > 0 ? timestampType.fromEpochTimeMillisToTimestamp(diff) : 0L; + this.oldestUnexpiredTS = scan.isRaw() ? 0L : diff; + this.minVersions = scanInfo.getMinVersions(); // We look up row-column Bloom filters for multi-column queries as part of @@ -204,6 +212,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // readType is default if the scan keeps running for a long time. this.scanUsePread = this.readType != Scan.ReadType.STREAM; } + this.preadMaxBytes = scanInfo.getPreadMaxBytes(); this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck(); // Parallel seeking is on if the config allows and more there is more than one store file. @@ -236,8 +245,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner if (columns != null && scan.isRaw()) { throw new DoNotRetryIOException("Cannot specify any column for a raw scan"); } - matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, - store.getCoprocessorHost()); + matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, store + .getCoprocessorHost()); this.store.addChangedReaderObserver(this); @@ -313,12 +322,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // use legacy query matcher since we do not consider the scan object in our code. Only used to // keep compatibility for coprocessor. matcher = LegacyScanQueryMatcher.create(scan, scanInfo, null, scanType, smallestReadPoint, - earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, - store.getCoprocessorHost()); + earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, + dropDeletesToRow, store.getCoprocessorHost()); } else { matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, - earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, - store.getCoprocessorHost()); + earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, + dropDeletesToRow, store.getCoprocessorHost()); } // Filter the list of scanners using Bloom filters, time range, TTL, etc. @@ -342,6 +351,26 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } @VisibleForTesting + StoreScanner(final Store store, final Scan scan, ScanInfo scanInfo, + ScanType scanType, final NavigableSet<byte[]> columns, + final List<KeyValueScanner> scanners) throws IOException { + this(store, scan, scanInfo, scanType, columns, scanners, + HConstants.LATEST_TIMESTAMP, + // 0 is passed as readpoint because the test bypasses Store + 0); + } + + @VisibleForTesting + StoreScanner(final Store store, final Scan scan, ScanInfo scanInfo, + ScanType scanType, final NavigableSet<byte[]> columns, + final List<KeyValueScanner> scanners, long earliestPutTs) + throws IOException { + this(store, scan, scanInfo, scanType, columns, scanners, earliestPutTs, + // 0 is passed as readpoint because the test bypasses Store + 0); + } + + @VisibleForTesting StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType, final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners, long earliestPutTs) @@ -351,14 +380,43 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner 0); } - public StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType, - final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners, long earliestPutTs, + public StoreScanner(final Store store, final Scan scan, ScanInfo scanInfo, ScanType scanType, + final NavigableSet<byte[]> columns, final List<KeyValueScanner> scanners, long earliestPutTs, long readPt) throws IOException { + this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks(), scanType); + if (scanType == ScanType.USER_SCAN) { + this.matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, + null); + } else { + if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0) + || (scan.getStopRow() != null && scan.getStopRow().length > 0) + || !scan.getTimeRange().isAllTime() || columns != null) { + // use legacy query matcher since we do not consider the scan object in our code. Only used + // to keep compatibility for coprocessor. + matcher = LegacyScanQueryMatcher.create(scan, scanInfo, columns, scanType, Long.MAX_VALUE, + earliestPutTs, oldestUnexpiredTS, now, null, null, + store.getCoprocessorHost()); + } else { + this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, + earliestPutTs, oldestUnexpiredTS, now, null, null, + null); + } + } + + // Seek all scanners to the initial key + seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); + addCurrentScanners(scanners); + resetKVHeap(scanners, scanInfo.getComparator()); + } + + public StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType, + final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners, + long earliestPutTs, long readPt) throws IOException { this(null, scan, scanInfo, columns, readPt, scanType == ScanType.USER_SCAN ? scan.getCacheBlocks() : false, scanType); if (scanType == ScanType.USER_SCAN) { this.matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, - null); + null); } else { if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0) || (scan.getStopRow() != null && scan.getStopRow().length > 0) @@ -366,10 +424,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // use legacy query matcher since we do not consider the scan object in our code. Only used // to keep compatibility for coprocessor. matcher = LegacyScanQueryMatcher.create(scan, scanInfo, columns, scanType, Long.MAX_VALUE, - earliestPutTs, oldestUnexpiredTS, now, null, null, store.getCoprocessorHost()); + earliestPutTs, oldestUnexpiredTS, now, null, null, + store.getCoprocessorHost()); } else { this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, - earliestPutTs, oldestUnexpiredTS, now, null, null, null); + earliestPutTs, oldestUnexpiredTS, now, null, null, + null); } } @@ -606,7 +666,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner bytesRead += cellSize; prevCell = cell; topChanged = false; - ScanQueryMatcher.MatchCode qcode = matcher.match(cell); + ScanQueryMatcher.MatchCode qcode = matcher.match(cell, timestampType); switch (qcode) { case INCLUDE: case INCLUDE_AND_SEEK_NEXT_ROW:
http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DropDeletesCompactionScanQueryMatcher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DropDeletesCompactionScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DropDeletesCompactionScanQueryMatcher.java index 89725fe..1100749 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DropDeletesCompactionScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DropDeletesCompactionScanQueryMatcher.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver.querymatcher; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.TimestampType; import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.ScanInfo; @@ -59,12 +60,24 @@ public abstract class DropDeletesCompactionScanQueryMatcher extends CompactionSc this.earliestPutTs = earliestPutTs; } - protected final MatchCode tryDropDelete(Cell cell) { + protected final MatchCode tryDropDelete(Cell cell, TimestampType timestampType) { long timestamp = cell.getTimestamp(); // If it is not the time to drop the delete marker, just return - if (timeToPurgeDeletes > 0 && now - timestamp <= timeToPurgeDeletes) { - return MatchCode.INCLUDE; + if (timeToPurgeDeletes > 0) { + // Assumes now and timestamp should be of same type. It should be the case. + // Else there is something wrong. if it happens in tests, tests should be rewritten. + if (timestampType == TimestampType.HYBRID) { + if (TimestampType.HYBRID.toEpochTimeMillisFromTimestamp(now) - TimestampType.HYBRID + .toEpochTimeMillisFromTimestamp(timestamp) <= timeToPurgeDeletes) { + return MatchCode.INCLUDE; + } + } else { + if (now - timestamp <= timeToPurgeDeletes) { + return MatchCode.INCLUDE; + } + } } + if (keepDeletedCells == KeepDeletedCells.TRUE || (keepDeletedCells == KeepDeletedCells.TTL && timestamp >= oldestUnexpiredTS)) { // If keepDeletedCell is true, or the delete marker is not expired yet, we should include it http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java index 11dd51f..272f272 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeepDeletedCells; +import org.apache.hadoop.hbase.TimestampType; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; @@ -154,7 +155,7 @@ public class LegacyScanQueryMatcher extends ScanQueryMatcher { } @Override - public MatchCode match(Cell cell) throws IOException { + public MatchCode match(Cell cell, TimestampType timestampType) throws IOException { if (filter != null && filter.filterAllRemaining()) { return MatchCode.DONE_SCAN; } http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MajorCompactionScanQueryMatcher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MajorCompactionScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MajorCompactionScanQueryMatcher.java index 67e40ed..af00001 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MajorCompactionScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MajorCompactionScanQueryMatcher.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TimestampType; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.ScanInfo; @@ -36,7 +37,7 @@ public class MajorCompactionScanQueryMatcher extends DropDeletesCompactionScanQu } @Override - public MatchCode match(Cell cell) throws IOException { + public MatchCode match(Cell cell, TimestampType timestampType) throws IOException { MatchCode returnCode = preCheck(cell); if (returnCode != null) { return returnCode; @@ -64,7 +65,7 @@ public class MajorCompactionScanQueryMatcher extends DropDeletesCompactionScanQu return MatchCode.INCLUDE; } trackDelete(cell); - returnCode = tryDropDelete(cell); + returnCode = tryDropDelete(cell, timestampType); if (returnCode != null) { return returnCode; } http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MinorCompactionScanQueryMatcher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MinorCompactionScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MinorCompactionScanQueryMatcher.java index cf36366..dcfba38 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MinorCompactionScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MinorCompactionScanQueryMatcher.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TimestampType; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.ScanInfo; @@ -36,7 +37,7 @@ public class MinorCompactionScanQueryMatcher extends CompactionScanQueryMatcher } @Override - public MatchCode match(Cell cell) throws IOException { + public MatchCode match(Cell cell, TimestampType timestampType) throws IOException { MatchCode returnCode = preCheck(cell); if (returnCode != null) { return returnCode; http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java index d5fda54..c97614c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeepDeletedCells; +import org.apache.hadoop.hbase.TimestampType; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; @@ -57,7 +58,7 @@ public abstract class NormalUserScanQueryMatcher extends UserScanQueryMatcher { } @Override - public MatchCode match(Cell cell) throws IOException { + public MatchCode match(Cell cell, TimestampType timestampType) throws IOException { if (filter != null && filter.filterAllRemaining()) { return MatchCode.DONE_SCAN; } http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/RawScanQueryMatcher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/RawScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/RawScanQueryMatcher.java index b1f20e2..a6e409c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/RawScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/RawScanQueryMatcher.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.querymatcher; import java.io.IOException; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.TimestampType; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.ScanInfo; @@ -36,7 +37,7 @@ public abstract class RawScanQueryMatcher extends UserScanQueryMatcher { } @Override - public MatchCode match(Cell cell) throws IOException { + public MatchCode match(Cell cell, TimestampType timestampType) throws IOException { if (filter != null && filter.filterAllRemaining()) { return MatchCode.DONE_SCAN; } http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java index e508a9a..c38e408 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java @@ -31,12 +31,14 @@ import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.TimestampType; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ShipperListener; import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker.DeleteResult; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Timestamp; import org.apache.hadoop.hbase.util.Bytes; /** @@ -153,8 +155,23 @@ public abstract class ScanQueryMatcher implements ShipperListener { long ts = cell.getTimestamp(); assert t.getValueLength() == Bytes.SIZEOF_LONG; long ttl = TagUtil.getValueAsLong(t); - if (ts + ttl < now) { - return true; + if (TimestampType.HYBRID.isLikelyOfType(ts)) { + if (TimestampType.HYBRID.isLikelyOfType(now)) { + if (TimestampType.HYBRID.toEpochTimeMillisFromTimestamp(ts) + ttl < TimestampType.HYBRID + .toEpochTimeMillisFromTimestamp(now)) { + return true; + } + } + else { + if (TimestampType.HYBRID.toEpochTimeMillisFromTimestamp(ts) + ttl < now) { + return true; + } + } + + } else { + if (ts + ttl < now) { + return true; + } } // Per cell TTLs cannot extend lifetime beyond family settings, so // fall through to check that @@ -226,11 +243,12 @@ public abstract class ScanQueryMatcher implements ShipperListener { * <li>got to the next row (MatchCode.DONE)</li> * </ul> * @param cell KeyValue to check + * @param timestampType type of timestamp for the timestamp in cell * @return The match code instance. * @throws IOException in case there is an internal consistency problem caused by a data * corruption. */ - public abstract MatchCode match(Cell cell) throws IOException; + public abstract MatchCode match(Cell cell, TimestampType timestampType) throws IOException; /** * @return the start key http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/StripeCompactionScanQueryMatcher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/StripeCompactionScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/StripeCompactionScanQueryMatcher.java index 1ba08f7..ed63c48 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/StripeCompactionScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/StripeCompactionScanQueryMatcher.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TimestampType; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.ScanInfo; @@ -49,7 +50,7 @@ public class StripeCompactionScanQueryMatcher extends DropDeletesCompactionScanQ } @Override - public MatchCode match(Cell cell) throws IOException { + public MatchCode match(Cell cell, TimestampType timestampType) throws IOException { MatchCode returnCode = preCheck(cell); if (returnCode != null) { return returnCode; @@ -64,7 +65,7 @@ public class StripeCompactionScanQueryMatcher extends DropDeletesCompactionScanQ if (dropDeletesInOutput == DropDeletesInOutput.IN) { // here we are running like major compaction trackDelete(cell); - returnCode = tryDropDelete(cell); + returnCode = tryDropDelete(cell, timestampType); if (returnCode != null) { return returnCode; } http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index e393804..f58e24f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.ProcedureInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TimestampType; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; @@ -778,7 +779,14 @@ public class AccessController implements MasterObserver, RegionObserver, RegionS // any cells found there inclusively. long latestTs = Math.max(opTs, latestCellTs); if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) { - latestTs = EnvironmentEdgeManager.currentTime(); + if (latestCellTs == HConstants.LATEST_TIMESTAMP || latestCellTs == 0) { + latestTs = HConstants.LATEST_TIMESTAMP - 1; + } else if (TimestampType.HYBRID.isLikelyOfType(latestCellTs)) { + latestTs = TimestampType.HYBRID.fromEpochTimeMillisToTimestamp(EnvironmentEdgeManager + .currentTime()); + } else { + latestTs = EnvironmentEdgeManager.currentTime(); + } } get.setTimeRange(0, latestTs + 1); // In case of Put operation we set to read all versions. This was done to consider the case http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 24a2f9c..e7e7f90 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.compress.Compression; @@ -1872,6 +1873,22 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { return htd; } + public HTableDescriptor createTableDescriptor(final TableName name, + final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted, + ClockType clockType) { + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); + for (byte[] cfName : new byte[][]{ fam1, fam2, fam3 }) { + builder.addColumnFamily(new HColumnDescriptor(cfName) + .setMinVersions(minVersions) + .setMaxVersions(versions) + .setKeepDeletedCells(keepDeleted) + .setBlockCacheEnabled(false) + .setTimeToLive(ttl)); + } + builder.setClockType(clockType); + return new HTableDescriptor(builder.build()); + } + /** * Create a table of name <code>name</code>. * @param name Name to give table. @@ -1882,6 +1899,17 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED); } + /** + * Create a table of name <code>name</code>. + * @param name Name to give table. + * @param clockType clock type of the table + * @return Column descriptor. + */ + public HTableDescriptor createTableDescriptor(final TableName name, ClockType clockType) { + return createTableDescriptor(name, HColumnDescriptor.DEFAULT_MIN_VERSIONS, + MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED, clockType); + } + public HTableDescriptor createTableDescriptor(final TableName tableName, byte[] family) { return createTableDescriptor(tableName, new byte[][] {family}, 1); http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index a99345b..9f568f2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -253,6 +253,11 @@ public class MockRegionServerServices implements RegionServerServices { } @Override + public Clock getRegionServerClock(ClockType clockType) { + return Clock.getDummyClockOfGivenClockType(clockType); + } + + @Override public ExecutorService getExecutorService() { return null; } http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java new file mode 100644 index 0000000..228c5df --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import static org.junit.Assert.*; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.TimestampType; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + + +@Category({MediumTests.class}) +public class TestClockWithCluster { + private static final Log LOG = LogFactory.getLog(TestClockWithCluster.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static Connection connection; + private byte[] columnFamily = Bytes.toBytes("testCF"); + @BeforeClass + public static void setupClass() throws Exception { + UTIL.startMiniCluster(1); + connection = ConnectionFactory.createConnection(UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDownClass() throws Exception { + connection.close(); + UTIL.shutdownMiniCluster(); + } + + private void verifyTimestamps(Table table, final byte[] f, int startRow, int endRow, + TimestampType timestamp) throws IOException { + for (int i = startRow; i < endRow; i++) { + String failMsg = "Failed verification of row :" + i; + byte[] data = Bytes.toBytes(String.valueOf(i)); + Get get = new Get(data); + Result result = table.get(get); + Cell cell = result.getColumnLatestCell(f, null); + assertTrue(failMsg, timestamp.isLikelyOfType(cell.getTimestamp())); + } + } + + @Test + public void testNewTablesAreCreatedWithSystemClock() throws IOException { + Admin admin = connection.getAdmin(); + TableName tableName = TableName.valueOf("TestNewTablesAreSystemByDefault"); + admin.createTable(new HTableDescriptor(tableName).addFamily(new + HColumnDescriptor(columnFamily))); + + Table table = connection.getTable(tableName); + + ClockType clockType = admin.getTableDescriptor(tableName).getClockType(); + assertEquals(ClockType.SYSTEM, clockType); + // write + UTIL.loadNumericRows(table, columnFamily, 0, 1000); + // read , check if the it is same. + UTIL.verifyNumericRows(table, Bytes.toBytes("testCF"), 0, 1000, 0); + + // This check will be useful if Clock type were to be system monotonic or HLC. + verifyTimestamps(table, columnFamily, 0, 1000, TimestampType.PHYSICAL); + } + + @Test + public void testMetaTableClockTypeIsSystem() throws IOException { + Admin admin = connection.getAdmin(); + Table table = connection.getTable(TableName.META_TABLE_NAME); + ClockType clockType = admin.getTableDescriptor(TableName.META_TABLE_NAME).getClockType(); + assertEquals(ClockType.SYSTEM, clockType); + } + + @Test + public void testMetaTableTimestampsAreSystem() throws IOException { + // Checks timestamps of whatever is present in meta table currently. + // ToDo: Include complete meta table sample with all column families to check all paths of + // meta table modification. + Table table = connection.getTable(TableName.META_TABLE_NAME); + Result result = table.getScanner(new Scan()).next(); + for (Cell cell : result.rawCells()) { + assertTrue(TimestampType.PHYSICAL.isLikelyOfType(cell.getTimestamp())); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestIncrementTimeRange.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestIncrementTimeRange.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestIncrementTimeRange.java index 8805337..c5a9efc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestIncrementTimeRange.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestIncrementTimeRange.java @@ -29,9 +29,12 @@ import java.util.List; import java.util.Map; import java.util.NavigableMap; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.ClockType; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; @@ -45,12 +48,17 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import org.apache.hadoop.hbase.util.TestTableName; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; /** * This test runs batch mutation with Increments which have custom TimeRange. @@ -59,14 +67,16 @@ import org.junit.experimental.categories.Category; * See HBASE-15698 */ @Category({CoprocessorTests.class, MediumTests.class}) +@RunWith(Parameterized.class) public class TestIncrementTimeRange { private static final HBaseTestingUtility util = new HBaseTestingUtility(); private static ManualEnvironmentEdge mee = new ManualEnvironmentEdge(); - private static final TableName TEST_TABLE = TableName.valueOf("test"); + @Rule + public TestTableName TEST_TABLE = new TestTableName(); private static final byte[] TEST_FAMILY = Bytes.toBytes("f1"); - + private static final byte[][] TEST_FAMILIES = new byte[][]{TEST_FAMILY}; private static final byte[] ROW_A = Bytes.toBytes("aaa"); private static final byte[] ROW_B = Bytes.toBytes("bbb"); private static final byte[] ROW_C = Bytes.toBytes("ccc"); @@ -80,6 +90,18 @@ public class TestIncrementTimeRange { private Table hTableInterface; private Table table; + private ClockType clockType; + + @Parameters(name = "{0}") + public static Iterable<Object> data() { + return Arrays.asList(new Object[] {ClockType.HLC, ClockType.SYSTEM_MONOTONIC, ClockType + .SYSTEM}); + } + + public TestIncrementTimeRange(ClockType clockType) { + this.clockType = clockType; + } + @BeforeClass public static void setupBeforeClass() throws Exception { util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, @@ -98,7 +120,8 @@ public class TestIncrementTimeRange { @Before public void before() throws Exception { - table = util.createTable(TEST_TABLE, TEST_FAMILY); + HTableDescriptor htd = util.createTableDescriptor(TEST_TABLE.getTableName(), clockType); + table = util.createTable(htd, TEST_FAMILIES, new Configuration(HBaseConfiguration.create())); Put puta = new Put(ROW_A); puta.addColumn(TEST_FAMILY, qualifierCol1, bytes1); @@ -121,7 +144,7 @@ public class TestIncrementTimeRange { } } finally { try { - util.deleteTable(TEST_TABLE); + util.deleteTable(TEST_TABLE.getTableName()); } catch (IOException ioe) { } } @@ -150,7 +173,7 @@ public class TestIncrementTimeRange { @Test public void testHTableInterfaceMethods() throws Exception { - hTableInterface = util.getConnection().getTable(TEST_TABLE); + hTableInterface = util.getConnection().getTable(TEST_TABLE.getTableName()); checkHTableInterfaceMethods(); } @@ -162,7 +185,7 @@ public class TestIncrementTimeRange { time = EnvironmentEdgeManager.currentTime(); mee.setValue(time); - TimeRange range10 = new TimeRange(1, time+10); + TimeRange range10 = new TimeRange(1, time + 10); hTableInterface.increment(new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 10L) .setTimeRange(range10.getMin(), range10.getMax())); checkRowValue(ROW_A, Bytes.toBytes(11L)); @@ -171,7 +194,7 @@ public class TestIncrementTimeRange { time = EnvironmentEdgeManager.currentTime(); mee.setValue(time); - TimeRange range2 = new TimeRange(1, time+20); + TimeRange range2 = new TimeRange(1, time + 20); List<Row> actions = Arrays.asList(new Row[] { new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L) .setTimeRange(range2.getMin(), range2.getMax()), http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java index 0bec03b..2630869 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.mapreduce; +import static org.apache.hadoop.hbase.constraint.CheckConfigurationConstraint.getConfiguration; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -28,12 +29,15 @@ import java.io.PrintStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ClockType; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MapReduceTests; import org.apache.hadoop.hbase.util.Bytes; @@ -189,13 +193,27 @@ public class TestCopyTable { */ @Test public void testRenameFamily() throws Exception { - final TableName sourceTable = TableName.valueOf(name.getMethodName() + "source"); - final TableName targetTable = TableName.valueOf(name.getMethodName() + "-target"); + testRenameFamily(ClockType.SYSTEM); + testRenameFamily(ClockType.SYSTEM_MONOTONIC); + testRenameFamily(ClockType.HLC); + } + public void testRenameFamily(ClockType clockType) throws Exception { + TableName sourceTable = TableName.valueOf("sourceTable"); + TableName targetTable = TableName.valueOf("targetTable"); + HTableDescriptor sourceTableDesc = new HTableDescriptor(TableDescriptorBuilder + .newBuilder(sourceTable) + .setClockType(clockType) + .build()); + HTableDescriptor targetTableDesc = new HTableDescriptor(TableDescriptorBuilder + .newBuilder(targetTable) + .setClockType(clockType) + .build()); byte[][] families = { FAMILY_A, FAMILY_B }; - - Table t = TEST_UTIL.createTable(sourceTable, families); - Table t2 = TEST_UTIL.createTable(targetTable, families); + Table t = TEST_UTIL.createTable(sourceTableDesc, families, (byte[][]) null, new Configuration + (getConfiguration())); + Table t2 = TEST_UTIL.createTable(targetTableDesc, families, (byte[][]) null, new Configuration + (getConfiguration())); Put p = new Put(ROW1); p.addColumn(FAMILY_A, QUALIFIER, Bytes.toBytes("Data11")); p.addColumn(FAMILY_B, QUALIFIER, Bytes.toBytes("Data12")); @@ -225,6 +243,9 @@ public class TestCopyTable { // Data from the family of B is not copied assertNull(b1); + TEST_UTIL.deleteTable(sourceTable); + TEST_UTIL.deleteTable(targetTable); + } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 7ac7571..089cf69 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -34,6 +34,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ClockType; +import org.apache.hadoop.hbase.Clock; import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HRegionInfo; @@ -579,6 +581,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { } @Override + public Clock getRegionServerClock(ClockType clockType) { + return new Clock.System(); + } + + @Override public ExecutorService getExecutorService() { return null; } http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index b0eadb5..e5d4326 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -42,8 +42,12 @@ import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hadoop.hbase.Clock; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * compacted memstore test case @@ -166,27 +170,46 @@ public class TestCompactingMemStore extends TestDefaultMemStore { // Get tests ////////////////////////////////////////////////////////////////////////////// - /** Test getNextRow from memstore + /** Test getNextRow from memstore with using timestamps from a hybrid logical clock + * @throws InterruptedException + */ + @Test + public void testGetNextRowWithHybridLogicalClock() throws Exception { + testGetNextRow(new Clock.HLC()); + } + + /** Test getNextRow from memstore using timestamps from a system monotonic clock * @throws InterruptedException */ - @Override @Test - public void testGetNextRow() throws Exception { - addRows(this.memstore); + public void testGetNextRowWithSystemMonotonicClock() throws Exception { + testGetNextRow(new Clock.SystemMonotonic()); + } + + /** Test getNextRow from memstore using timestamps from a system clock + * @throws InterruptedException + */ + @Test + public void testGetNextRowWithSystemClock() throws Exception { + testGetNextRow(new Clock.System()); + } + + public void testGetNextRow(Clock clock) throws Exception { + addRows(this.memstore, clock); // Add more versions to make it a little more interesting. Thread.sleep(1); - addRows(this.memstore); + addRows(this.memstore, clock); Cell closestToEmpty = ((CompactingMemStore)this.memstore).getNextRow(KeyValue.LOWESTKEY); assertTrue(CellComparator.COMPARATOR.compareRows(closestToEmpty, - new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0); + new KeyValue(Bytes.toBytes(0), clock.now())) == 0); for (int i = 0; i < ROW_COUNT; i++) { Cell nr = ((CompactingMemStore)this.memstore).getNextRow(new KeyValue(Bytes.toBytes(i), - System.currentTimeMillis())); + clock.now())); if (i + 1 == ROW_COUNT) { assertEquals(nr, null); } else { assertTrue(CellComparator.COMPARATOR.compareRows(nr, - new KeyValue(Bytes.toBytes(i + 1), System.currentTimeMillis())) == 0); + new KeyValue(Bytes.toBytes(i + 1), clock.now())) == 0); } } //starting from each row, validate results should contain the starting row @@ -195,9 +218,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator()); ScanType scanType = ScanType.USER_SCAN; - InternalScanner scanner = new StoreScanner(new Scan( - Bytes.toBytes(startRowId)), scanInfo, scanType, null, - memstore.getScanners(0)); + Store mockStore = mock(HStore.class); + when(mockStore.getClock()).thenReturn(Clock.getDummyClockOfGivenClockType(clock.getClockType())); + InternalScanner scanner = new StoreScanner(mockStore, new Scan(Bytes.toBytes(startRowId)), + scanInfo, scanType, null, memstore.getScanners(0)) ; List<Cell> results = new ArrayList<>(); for (int i = 0; scanner.next(results); i++) { int rowId = startRowId + i; http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 439f3d4..06999f9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.CategoryBasedTimeout; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.Clock; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -60,6 +61,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; @@ -575,21 +578,41 @@ public class TestDefaultMemStore { // Get tests ////////////////////////////////////////////////////////////////////////////// - /** Test getNextRow from memstore + /** Test getNextRow from memstore with the hybrid logical clock * @throws InterruptedException */ @Test - public void testGetNextRow() throws Exception { - addRows(this.memstore); + public void testGetNextRowWithHybridLogicalClock() throws Exception { + testGetNextRow(new Clock.HLC()); + } + + /** Test getNextRow from memstore with the system monotonic clock + * @throws InterruptedException + */ + @Test + public void testGetNextRowWithSystemMonotonicClock() throws Exception { + testGetNextRow(new Clock.SystemMonotonic()); + } + + /** Test getNextRow from memstore with the system clock + * @throws InterruptedException + */ + @Test + public void testGetNextRowWithSystemClock() throws Exception { + testGetNextRow(new Clock.System()); + } + + public void testGetNextRow(Clock clock) throws Exception { + addRows(this.memstore, clock); // Add more versions to make it a little more interesting. Thread.sleep(1); - addRows(this.memstore); + addRows(this.memstore, clock); Cell closestToEmpty = ((DefaultMemStore) this.memstore).getNextRow(KeyValue.LOWESTKEY); assertTrue(CellComparator.COMPARATOR.compareRows(closestToEmpty, new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0); for (int i = 0; i < ROW_COUNT; i++) { Cell nr = ((DefaultMemStore) this.memstore).getNextRow(new KeyValue(Bytes.toBytes(i), - System.currentTimeMillis())); + clock.now())); if (i + 1 == ROW_COUNT) { assertEquals(nr, null); } else { @@ -603,9 +626,11 @@ public class TestDefaultMemStore { ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator()); ScanType scanType = ScanType.USER_SCAN; - try (InternalScanner scanner = new StoreScanner(new Scan( - Bytes.toBytes(startRowId)), scanInfo, scanType, null, - memstore.getScanners(0))) { + Store store = mock(HStore.class); + when(store.getClock()).thenReturn(Clock.getDummyClockOfGivenClockType(clock.getClockType())); + + try (InternalScanner scanner = new StoreScanner(store, new Scan(Bytes.toBytes(startRowId)), + scanInfo, scanType, null, memstore.getScanners(0))) { List<Cell> results = new ArrayList<>(); for (int i = 0; scanner.next(results); i++) { int rowId = startRowId + i; @@ -1023,6 +1048,24 @@ public class TestDefaultMemStore { return ROW_COUNT; } + /** + * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT} + * @param hmc Instance to add rows to. + * @return How many rows we added. + * @throws IOException + */ + protected int addRows(final MemStore hmc, Clock clock) { + for (int i = 0; i < ROW_COUNT; i++) { + long timestamp = clock.now(); + for (int ii = 0; ii < QUALIFIER_COUNT; ii++) { + byte [] row = Bytes.toBytes(i); + byte [] qf = makeQualifier(i, ii); + hmc.add(new KeyValue(row, FAMILY, qf, timestamp, qf), null); + } + } + return ROW_COUNT; + } + private long runSnapshot(final AbstractMemStore hmc) throws UnexpectedStateException { // Save off old state. int oldHistorySize = hmc.getSnapshot().getCellsCount(); http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 569f4f2..7350473 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -77,6 +77,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ArrayBackedTag; import org.apache.hadoop.hbase.CategoryBasedTimeout; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.Clock; +import org.apache.hadoop.hbase.ClockType; +import org.apache.hadoop.hbase.TimestampType; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.DroppedSnapshotException; @@ -112,6 +115,7 @@ import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; @@ -6007,20 +6011,34 @@ public class TestHRegion { } @Test - public void testCellTTLs() throws IOException { - IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); - EnvironmentEdgeManager.injectEdge(edge); + public void testCellTTLsWithHybridLogicalClock() throws IOException { + testCellTTLs(ClockType.HLC); + } + @Test + public void testCellTTLsWithSystemMonotonicClock() throws IOException { + testCellTTLs(ClockType.SYSTEM_MONOTONIC); + } + + @Test + public void testCellTTLsWithSystemClock() throws IOException { + testCellTTLs(ClockType.SYSTEM); + } + + public void testCellTTLs(ClockType clockType) throws IOException { final byte[] row = Bytes.toBytes("testRow"); final byte[] q1 = Bytes.toBytes("q1"); final byte[] q2 = Bytes.toBytes("q2"); final byte[] q3 = Bytes.toBytes("q3"); final byte[] q4 = Bytes.toBytes("q4"); - HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); - HColumnDescriptor hcd = new HColumnDescriptor(fam1); - hcd.setTimeToLive(10); // 10 seconds - htd.addFamily(hcd); + HTableDescriptor htd = new HTableDescriptor(TableDescriptorBuilder + .newBuilder(TableName.valueOf(name.getMethodName())) + .addColumnFamily(new HColumnDescriptor(fam1) + .setTimeToLive(10)) // 10 seconds + .setClockType(clockType) + .build()); + TimestampType timestampType = clockType.timestampType(); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); @@ -6029,22 +6047,32 @@ public class TestHRegion { HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY), TEST_UTIL.getDataTestDir(), conf, htd); assertNotNull(region); + + region.setClock(Clock.getDummyClockOfGivenClockType(clockType)); + long now = timestampType.toEpochTimeMillisFromTimestamp(region.getClock().now()); + ManualEnvironmentEdge mee = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(mee); + mee.setValue(now); + try { - long now = EnvironmentEdgeManager.currentTime(); // Add a cell that will expire in 5 seconds via cell TTL - region.put(new Put(row).add(new KeyValue(row, fam1, q1, now, - HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] { + region.put(new Put(row).add(new KeyValue(row, fam1, q1, timestampType + .fromEpochTimeMillisToTimestamp(now), + HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] { // TTL tags specify ts in milliseconds new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } ))); // Add a cell that will expire after 10 seconds via family setting - region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY)); + region.put(new Put(row).addColumn(fam1, q2, timestampType + .fromEpochTimeMillisToTimestamp(now), HConstants.EMPTY_BYTE_ARRAY)); // Add a cell that will expire in 15 seconds via cell TTL - region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1, - HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] { + region.put(new Put(row).add(new KeyValue(row, fam1, q3, timestampType + .fromEpochTimeMillisToTimestamp(now + 10000 - 1), + HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] { // TTL tags specify ts in milliseconds new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } ))); // Add a cell that will expire in 20 seconds via family setting - region.put(new Put(row).addColumn(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY)); + region.put(new Put(row).addColumn(fam1, q4, timestampType.fromEpochTimeMillisToTimestamp + (now + 10000 - 1), HConstants.EMPTY_BYTE_ARRAY)); // Flush so we are sure store scanning gets this right region.flush(true); @@ -6057,7 +6085,7 @@ public class TestHRegion { assertNotNull(r.getValue(fam1, q4)); // Increment time to T+5 seconds - edge.incrementTime(5000); + mee.setValue(now + 5001); r = region.get(new Get(row)); assertNull(r.getValue(fam1, q1)); @@ -6066,7 +6094,7 @@ public class TestHRegion { assertNotNull(r.getValue(fam1, q4)); // Increment time to T+10 seconds - edge.incrementTime(5000); + mee.setValue(now + 10001); r = region.get(new Get(row)); assertNull(r.getValue(fam1, q1)); @@ -6075,7 +6103,7 @@ public class TestHRegion { assertNotNull(r.getValue(fam1, q4)); // Increment time to T+15 seconds - edge.incrementTime(5000); + mee.setValue(now + 15000); r = region.get(new Get(row)); assertNull(r.getValue(fam1, q1)); @@ -6084,7 +6112,7 @@ public class TestHRegion { assertNotNull(r.getValue(fam1, q4)); // Increment time to T+20 seconds - edge.incrementTime(10000); + mee.setValue(now + 20000); r = region.get(new Get(row)); assertNull(r.getValue(fam1, q1)); @@ -6113,7 +6141,13 @@ public class TestHRegion { assertEquals(Bytes.toLong(val), 2L); // Increment time to T+25 seconds - edge.incrementTime(5000); + // For the system and system monotonic clock, the increment operation adds 1 to the timestamp + // so we move must the clock forward an additional millisecond + if (clockType == ClockType.SYSTEM || clockType == ClockType.SYSTEM_MONOTONIC) { + mee.setValue(now + 25002); + } else { + mee.setValue(now + 25001); + } // Value should be back to 1 r = region.get(new Get(row)); @@ -6122,7 +6156,7 @@ public class TestHRegion { assertEquals(Bytes.toLong(val), 1L); // Increment time to T+30 seconds - edge.incrementTime(5000); + mee.setValue(now + 30001); // Original value written at T+20 should be gone now via family TTL r = region.get(new Get(row)); http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index 85ae459..71bd01b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -60,6 +60,8 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Clock; +import org.apache.hadoop.hbase.ClockType; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -171,6 +173,7 @@ public class TestHRegionReplayEvents { when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1)); when(rss.getConfiguration()).thenReturn(CONF); when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF)); + when(rss.getRegionServerClock((ClockType)any())).thenReturn(new Clock.System()); String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER .toString(); ExecutorService es = new ExecutorService(string); http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java index cad060e..777cf2e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java @@ -30,6 +30,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Clock; +import org.apache.hadoop.hbase.ClockType; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.testclassification.RegionServerTests; @@ -106,6 +108,7 @@ public class TestRegionSplitPolicy { final List<Region> regions = new ArrayList<>(); Mockito.when(rss.getOnlineRegions(TABLENAME)).thenReturn(regions); Mockito.when(mockRegion.getRegionServerServices()).thenReturn(rss); + Mockito.when(rss.getRegionServerClock(ClockType.SYSTEM)).thenReturn(new Clock.System()); // Set max size for this 'table'. long maxSplitSize = 1024L; htd.setMaxFileSize(maxSplitSize); @@ -167,6 +170,7 @@ public class TestRegionSplitPolicy { Mockito.when(mockRegion.getRegionServerServices()).thenReturn(rss); Mockito.when(mockRegion.getBlockedRequestsCount()).thenReturn(0L); Mockito.when(mockRegion.getWriteRequestsCount()).thenReturn(0L); + Mockito.when(rss.getRegionServerClock(ClockType.SYSTEM)).thenReturn(new Clock.System()); BusyRegionSplitPolicy policy = http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index 10f00a6..a832c70 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture; import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; @@ -38,11 +40,14 @@ import org.apache.hadoop.hbase.CategoryBasedTimeout; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.Clock; +import org.apache.hadoop.hbase.ClockType; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueTestUtil; +import org.apache.hadoop.hbase.TimestampType; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; @@ -816,16 +821,31 @@ public class TestStoreScanner { */ @Test public void testWildCardTtlScan() throws IOException { + //testWildCardTtlScan(ClockType.SYSTEM); + //testWildCardTtlScan(ClockType.SYSTEM_MONOTONIC); + testWildCardTtlScan(ClockType.HLC); + } + + public void testWildCardTtlScan(ClockType clockType) throws IOException { long now = System.currentTimeMillis(); + TimestampType timestampType = clockType.timestampType(); KeyValue [] kvs = new KeyValue[] { - KeyValueTestUtil.create("R1", "cf", "a", now-1000, KeyValue.Type.Put, "dont-care"), - KeyValueTestUtil.create("R1", "cf", "b", now-10, KeyValue.Type.Put, "dont-care"), - KeyValueTestUtil.create("R1", "cf", "c", now-200, KeyValue.Type.Put, "dont-care"), - KeyValueTestUtil.create("R1", "cf", "d", now-10000, KeyValue.Type.Put, "dont-care"), - KeyValueTestUtil.create("R2", "cf", "a", now, KeyValue.Type.Put, "dont-care"), - KeyValueTestUtil.create("R2", "cf", "b", now-10, KeyValue.Type.Put, "dont-care"), - KeyValueTestUtil.create("R2", "cf", "c", now-200, KeyValue.Type.Put, "dont-care"), - KeyValueTestUtil.create("R2", "cf", "c", now-1000, KeyValue.Type.Put, "dont-care") + KeyValueTestUtil.create("R1", "cf", "a", timestampType.fromEpochTimeMillisToTimestamp + (now-1000), KeyValue.Type.Put, "dont-care"), + KeyValueTestUtil.create("R1", "cf", "b", timestampType.fromEpochTimeMillisToTimestamp + (now-10), KeyValue.Type.Put, "dont-care"), + KeyValueTestUtil.create("R1", "cf", "c", timestampType.fromEpochTimeMillisToTimestamp + (now-200), KeyValue.Type.Put, "dont-care"), + KeyValueTestUtil.create("R1", "cf", "d", timestampType.fromEpochTimeMillisToTimestamp + (now-10000), KeyValue.Type.Put, "dont-care"), + KeyValueTestUtil.create("R2", "cf", "a", timestampType.fromEpochTimeMillisToTimestamp + (now), KeyValue.Type.Put, "dont-care"), + KeyValueTestUtil.create("R2", "cf", "b", timestampType.fromEpochTimeMillisToTimestamp + (now-10), KeyValue.Type.Put, "dont-care"), + KeyValueTestUtil.create("R2", "cf", "c", timestampType.fromEpochTimeMillisToTimestamp + (now-200), KeyValue.Type.Put, "dont-care"), + KeyValueTestUtil.create("R2", "cf", "c", timestampType.fromEpochTimeMillisToTimestamp + (now-1000), KeyValue.Type.Put, "dont-care") }; List<KeyValueScanner> scanners = scanFixture(kvs); Scan scan = new Scan(); @@ -833,7 +853,9 @@ public class TestStoreScanner { ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR); ScanType scanType = ScanType.USER_SCAN; - try (StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners)) { + Store store = mock(HStore.class); + when(store.getClock()).thenReturn(Clock.getDummyClockOfGivenClockType(clockType)); + try (StoreScanner scanner = new StoreScanner(store, scan, scanInfo, scanType, null, scanners)) { List<Cell> results = new ArrayList<>(); Assert.assertEquals(true, scanner.next(results)); Assert.assertEquals(2, results.size()); @@ -887,17 +909,32 @@ public class TestStoreScanner { } } + @Test + public void testExpiredDeleteFamilyWithHybridLogicalClock() throws Exception { + testExpiredDeleteFamily(new Clock.HLC()); + } + + @Test + public void testExpiredDeleteFamilyWithSystemMonotonicClock() throws Exception { + testExpiredDeleteFamily(new Clock.SystemMonotonic()); + } + + @Test + public void testExpiredDeleteFamilyWithSystemClock() throws Exception { + testExpiredDeleteFamily(new Clock.System()); + } + /** * Ensure that expired delete family markers don't override valid puts */ - @Test - public void testExpiredDeleteFamily() throws Exception { + public void testExpiredDeleteFamily(Clock clock) throws Exception { long now = System.currentTimeMillis(); + TimestampType timestampType = clock.getTimestampType(); KeyValue [] kvs = new KeyValue[] { - new KeyValue(Bytes.toBytes("R1"), Bytes.toBytes("cf"), null, now-1000, - KeyValue.Type.DeleteFamily), - KeyValueTestUtil.create("R1", "cf", "a", now-10, KeyValue.Type.Put, - "dont-care"), + new KeyValue(Bytes.toBytes("R1"), Bytes.toBytes("cf"), null, timestampType + .fromEpochTimeMillisToTimestamp(now-1000), KeyValue.Type.DeleteFamily), + KeyValueTestUtil.create("R1", "cf", "a", timestampType.fromEpochTimeMillisToTimestamp + (now-10), KeyValue.Type.Put, "dont-care"), }; List<KeyValueScanner> scanners = scanFixture(kvs); Scan scan = new Scan(); @@ -906,9 +943,10 @@ public class TestStoreScanner { ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR); ScanType scanType = ScanType.USER_SCAN; + Store store = mock(HStore.class); + when(store.getClock()).thenReturn(clock); try (StoreScanner scanner = - new StoreScanner(scan, scanInfo, scanType, null, scanners)) { - + new StoreScanner(store, scan, scanInfo, scanType, null, scanners)) { List<Cell> results = new ArrayList<>(); Assert.assertEquals(true, scanner.next(results)); Assert.assertEquals(1, results.size()); @@ -920,9 +958,24 @@ public class TestStoreScanner { } @Test - public void testDeleteMarkerLongevity() throws Exception { + public void testDeleteMarkerLongevityWithHybridLogicalClock() throws Exception { + testDeleteMarkerLongevity(new Clock.HLC()); + } + + @Test + public void testDeleteMarkerLongevityWithSystemMonotonicClock() throws Exception { + testDeleteMarkerLongevity(new Clock.SystemMonotonic()); + } + + @Test + public void testDeleteMarkerLongevityWithSystemClock() throws Exception { + testDeleteMarkerLongevity(new Clock.System()); + } + + public void testDeleteMarkerLongevity(Clock clock) throws Exception { try { final long now = System.currentTimeMillis(); + TimestampType timestampType = clock.getTimestampType(); EnvironmentEdgeManagerTestHelper.injectEdge(new EnvironmentEdge() { public long currentTime() { return now; @@ -930,37 +983,53 @@ public class TestStoreScanner { }); KeyValue[] kvs = new KeyValue[]{ /*0*/ new KeyValue(Bytes.toBytes("R1"), Bytes.toBytes("cf"), null, - now - 100, KeyValue.Type.DeleteFamily), // live + timestampType.fromEpochTimeMillisToTimestamp(now - 100), + KeyValue.Type.DeleteFamily), // live /*1*/ new KeyValue(Bytes.toBytes("R1"), Bytes.toBytes("cf"), null, - now - 1000, KeyValue.Type.DeleteFamily), // expired - /*2*/ KeyValueTestUtil.create("R1", "cf", "a", now - 50, - KeyValue.Type.Put, "v3"), // live - /*3*/ KeyValueTestUtil.create("R1", "cf", "a", now - 55, - KeyValue.Type.Delete, "dontcare"), // live - /*4*/ KeyValueTestUtil.create("R1", "cf", "a", now - 55, - KeyValue.Type.Put, "deleted-version v2"), // deleted - /*5*/ KeyValueTestUtil.create("R1", "cf", "a", now - 60, - KeyValue.Type.Put, "v1"), // live - /*6*/ KeyValueTestUtil.create("R1", "cf", "a", now - 65, - KeyValue.Type.Put, "v0"), // max-version reached + timestampType.fromEpochTimeMillisToTimestamp(now - 1000), + KeyValue.Type.DeleteFamily), // expired + /*2*/ KeyValueTestUtil.create("R1", "cf", "a", + timestampType.fromEpochTimeMillisToTimestamp(now - 50), + KeyValue.Type.Put, "v3"), // live + /*3*/ KeyValueTestUtil.create("R1", "cf", "a", + timestampType.fromEpochTimeMillisToTimestamp(now - 55), + KeyValue.Type.Delete, "dontcare"), // live + /*4*/ KeyValueTestUtil.create("R1", "cf", "a", + timestampType.fromEpochTimeMillisToTimestamp(now - 55), + KeyValue.Type.Put, "deleted-version v2"), // deleted + /*5*/ KeyValueTestUtil.create("R1", "cf", "a", + timestampType.fromEpochTimeMillisToTimestamp(now - 60), + KeyValue.Type.Put, "v1"), // live + /*6*/ KeyValueTestUtil.create("R1", "cf", "a", + timestampType.fromEpochTimeMillisToTimestamp(now - 65), + KeyValue.Type.Put, "v0"), // max-version reached /*7*/ KeyValueTestUtil.create("R1", "cf", "a", - now - 100, KeyValue.Type.DeleteColumn, "dont-care"), // max-version - /*8*/ KeyValueTestUtil.create("R1", "cf", "b", now - 600, - KeyValue.Type.DeleteColumn, "dont-care"), //expired - /*9*/ KeyValueTestUtil.create("R1", "cf", "b", now - 70, - KeyValue.Type.Put, "v2"), //live - /*10*/ KeyValueTestUtil.create("R1", "cf", "b", now - 750, - KeyValue.Type.Put, "v1"), //expired - /*11*/ KeyValueTestUtil.create("R1", "cf", "c", now - 500, - KeyValue.Type.Delete, "dontcare"), //expired - /*12*/ KeyValueTestUtil.create("R1", "cf", "c", now - 600, - KeyValue.Type.Put, "v1"), //expired - /*13*/ KeyValueTestUtil.create("R1", "cf", "c", now - 1000, - KeyValue.Type.Delete, "dontcare"), //expired - /*14*/ KeyValueTestUtil.create("R1", "cf", "d", now - 60, - KeyValue.Type.Put, "expired put"), //live - /*15*/ KeyValueTestUtil.create("R1", "cf", "d", now - 100, - KeyValue.Type.Delete, "not-expired delete"), //live + timestampType.fromEpochTimeMillisToTimestamp(now - 100), + KeyValue.Type.DeleteColumn, "dont-care"), // max-version + /*8*/ KeyValueTestUtil.create("R1", "cf", "b", + timestampType.fromEpochTimeMillisToTimestamp(now - 600), + KeyValue.Type.DeleteColumn, "dont-care"), //expired + /*9*/ KeyValueTestUtil.create("R1", "cf", "b", + timestampType.fromEpochTimeMillisToTimestamp(now - 70), + KeyValue.Type.Put, "v2"), //live + /*10*/ KeyValueTestUtil.create("R1", "cf", "b", + timestampType.fromEpochTimeMillisToTimestamp(now - 750), + KeyValue.Type.Put, "v1"), //expired + /*11*/ KeyValueTestUtil.create("R1", "cf", "c", + timestampType.fromEpochTimeMillisToTimestamp(now - 500), + KeyValue.Type.Delete, "dontcare"), //expired + /*12*/ KeyValueTestUtil.create("R1", "cf", "c", + timestampType.fromEpochTimeMillisToTimestamp(now - 600), + KeyValue.Type.Put, "v1"), //expired + /*13*/ KeyValueTestUtil.create("R1", "cf", "c", + timestampType.fromEpochTimeMillisToTimestamp(now - 1000), + KeyValue.Type.Delete, "dontcare"), //expired + /*14*/ KeyValueTestUtil.create("R1", "cf", "d", + timestampType.fromEpochTimeMillisToTimestamp(now - 60), + KeyValue.Type.Put, "expired put"), //live + /*15*/ KeyValueTestUtil.create("R1", "cf", "d", + timestampType.fromEpochTimeMillisToTimestamp(now - 100), + KeyValue.Type.Delete, "not-expired delete"), //live }; List<KeyValueScanner> scanners = scanFixture(kvs); Scan scan = new Scan(); @@ -972,10 +1041,10 @@ public class TestStoreScanner { HConstants.DEFAULT_BLOCKSIZE /* block size */, 200, /* timeToPurgeDeletes */ CellComparator.COMPARATOR); - try (StoreScanner scanner = - new StoreScanner(scan, scanInfo, - ScanType.COMPACT_DROP_DELETES, null, scanners, - HConstants.OLDEST_TIMESTAMP)) { + Store store = mock(HStore.class); + when(store.getClock()).thenReturn(clock); + try (StoreScanner scanner = new StoreScanner(store, scan, scanInfo, ScanType + .COMPACT_DROP_DELETES, null, scanners, HConstants.OLDEST_TIMESTAMP)) { List<Cell> results = new ArrayList<>(); results = new ArrayList<>(); Assert.assertEquals(true, scanner.next(results)); http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java index 51260a6..80f8a4d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java @@ -32,6 +32,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.Clock; +import org.apache.hadoop.hbase.ClockType; import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -203,6 +205,7 @@ public class TestWALLockup { Mockito.when(server.isStopped()).thenReturn(false); Mockito.when(server.isAborted()).thenReturn(false); RegionServerServices services = Mockito.mock(RegionServerServices.class); + Mockito.when(services.getRegionServerClock(ClockType.SYSTEM)).thenReturn(new Clock.System()); // OK. Now I have my mocked up Server & RegionServerServices and dodgy WAL, go ahead with test. FileSystem fs = FileSystem.get(CONF); http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java index 73c92e4..35749fe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.TimestampType; import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode; @@ -86,7 +87,7 @@ public class TestCompactionScanQueryMatcher extends AbstractTestScanQueryMatcher qm.setToNewRow(KeyValueUtil.createFirstOnRow(row)); prevRow = row; } - actual.add(qm.match(new KeyValue(row, fam2, null, now, Type.Delete))); + actual.add(qm.match(new KeyValue(row, fam2, null, now, Type.Delete), TimestampType.PHYSICAL)); } assertEquals(expected.length, actual.size()); http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java index f3cf604..e3e1a63 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java @@ -31,8 +31,10 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TimestampType; import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Timestamp; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -60,7 +62,7 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher { Cell kv = new KeyValue(row1, fam2, col2, 1, data); Cell cell = CellUtil.createLastOnRowCol(kv); qm.setToNewRow(kv); - MatchCode code = qm.match(cell); + MatchCode code = qm.match(cell, TimestampType.PHYSICAL); assertFalse(code.compareTo(MatchCode.SEEK_NEXT_COL) != 0); } @@ -99,7 +101,7 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher { qm.setToNewRow(k); for (KeyValue kv : memstore) { - actual.add(qm.match(kv)); + actual.add(qm.match(kv, TimestampType.PHYSICAL)); } assertEquals(expected.size(), actual.size()); @@ -142,7 +144,7 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher { qm.setToNewRow(k); for (KeyValue kv : memstore) { - actual.add(qm.match(kv)); + actual.add(qm.match(kv, TimestampType.PHYSICAL)); } assertEquals(expected.size(), actual.size()); @@ -186,7 +188,7 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher { List<MatchCode> actual = new ArrayList<>(kvs.length); for (KeyValue kv : kvs) { - actual.add(qm.match(kv)); + actual.add(qm.match(kv, TimestampType.PHYSICAL)); } assertEquals(expected.length, actual.size()); @@ -227,7 +229,7 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher { List<ScanQueryMatcher.MatchCode> actual = new ArrayList<>(kvs.length); for (KeyValue kv : kvs) { - actual.add(qm.match(kv)); + actual.add(qm.match(kv, TimestampType.PHYSICAL)); } assertEquals(expected.length, actual.size());
