HBASE-14070 - Core HLC (Sai Teja Ranuva) Rebased by Amit Patel

Core HLC. Adds HLC framework (i.e., Clock, Timestamp API) and tests
Not used by core (yet).

Signed-off-by: Michael Stack <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d53fbc74
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d53fbc74
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d53fbc74

Branch: refs/heads/HBASE-14070.HLC
Commit: d53fbc74ba6e3b4489b82bb48ced6dfe470c8b10
Parents: 2d5a0fb
Author: Amit Patel <[email protected]>
Authored: Thu Jul 6 13:01:23 2017 -0700
Committer: Apekshit Sharma <[email protected]>
Committed: Mon Jul 17 10:56:08 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/HTableDescriptor.java   |  10 +
 .../hadoop/hbase/client/TableDescriptor.java    |   9 +
 .../hbase/client/TableDescriptorBuilder.java    |  40 ++
 .../java/org/apache/hadoop/hbase/Clock.java     | 408 +++++++++++++++++++
 .../java/org/apache/hadoop/hbase/ClockType.java |  38 ++
 .../apache/hadoop/hbase/SettableTimestamp.java  |   2 +-
 .../org/apache/hadoop/hbase/TimestampType.java  | 309 ++++++++++++++
 .../java/org/apache/hadoop/hbase/TestClock.java | 293 +++++++++++++
 .../apache/hadoop/hbase/TestTimestampType.java  | 207 ++++++++++
 .../master/procedure/ModifyTableProcedure.java  |   4 +
 .../hadoop/hbase/regionserver/HRegion.java      |  78 +++-
 .../hbase/regionserver/HRegionServer.java       |  24 ++
 .../hadoop/hbase/regionserver/HStore.java       |  11 +-
 .../hadoop/hbase/regionserver/Region.java       |   6 +
 .../regionserver/RegionServerServices.java      |   4 +
 .../apache/hadoop/hbase/regionserver/Store.java |   7 +
 .../hadoop/hbase/regionserver/StoreScanner.java | 102 ++++-
 .../DropDeletesCompactionScanQueryMatcher.java  |  19 +-
 .../querymatcher/LegacyScanQueryMatcher.java    |   3 +-
 .../MajorCompactionScanQueryMatcher.java        |   5 +-
 .../MinorCompactionScanQueryMatcher.java        |   3 +-
 .../NormalUserScanQueryMatcher.java             |   3 +-
 .../querymatcher/RawScanQueryMatcher.java       |   3 +-
 .../querymatcher/ScanQueryMatcher.java          |  24 +-
 .../StripeCompactionScanQueryMatcher.java       |   5 +-
 .../hbase/security/access/AccessController.java |  10 +-
 .../hadoop/hbase/HBaseTestingUtility.java       |  28 ++
 .../hadoop/hbase/MockRegionServerServices.java  |   5 +
 .../hadoop/hbase/TestClockWithCluster.java      | 115 ++++++
 .../coprocessor/TestIncrementTimeRange.java     |  39 +-
 .../hadoop/hbase/mapreduce/TestCopyTable.java   |  31 +-
 .../hadoop/hbase/master/MockRegionServer.java   |   7 +
 .../regionserver/TestCompactingMemStore.java    |  46 ++-
 .../hbase/regionserver/TestDefaultMemStore.java |  59 ++-
 .../hadoop/hbase/regionserver/TestHRegion.java  |  74 +++-
 .../regionserver/TestHRegionReplayEvents.java   |   3 +
 .../regionserver/TestRegionSplitPolicy.java     |   4 +
 .../hbase/regionserver/TestStoreScanner.java    | 171 +++++---
 .../hbase/regionserver/TestWALLockup.java       |   3 +
 .../TestCompactionScanQueryMatcher.java         |   3 +-
 .../querymatcher/TestUserScanQueryMatcher.java  |  12 +-
 .../regionserver/wal/AbstractTestWALReplay.java |  47 ++-
 .../access/TestCellACLWithMultipleVersions.java | 180 +++++---
 .../hbase/util/TestCoprocessorScanPolicy.java   |  30 +-
 .../apache/hadoop/hbase/util/TestTableName.java |   5 +-
 45 files changed, 2260 insertions(+), 229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index c09d434..c865255 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -70,6 +70,7 @@ public class HTableDescriptor implements TableDescriptor, 
Comparable<HTableDescr
   public static final long DEFAULT_MEMSTORE_FLUSH_SIZE = 
TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE;
   public static final int DEFAULT_REGION_REPLICATION = 
TableDescriptorBuilder.DEFAULT_REGION_REPLICATION;
   public static final boolean DEFAULT_REGION_MEMSTORE_REPLICATION = 
TableDescriptorBuilder.DEFAULT_REGION_MEMSTORE_REPLICATION;
+  public static final ClockType DEFAULT_CLOCK_TYPE = 
TableDescriptorBuilder.DEFAULT_CLOCK_TYPE;
   protected final ModifyableTableDescriptor delegatee;
 
   /**
@@ -382,6 +383,15 @@ public class HTableDescriptor implements TableDescriptor, 
Comparable<HTableDescr
   }
 
   /**
+   * Returns the clock type for the table.
+   * @return clock type for the table.
+   */
+  @Override
+  public ClockType getClockType() {
+    return delegatee.getClockType();
+  }
+
+  /**
    * Returns the size of the memstore after which a flush to filesystem is 
triggered.
    *
    * @return memory cache flush size for each hregion, -1 if not set.

http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
index 65e64f8..abd6ee9 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
@@ -24,6 +24,8 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+
+import org.apache.hadoop.hbase.ClockType;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -106,6 +108,13 @@ public interface TableDescriptor {
   Durability getDurability();
 
   /**
+   * Returns the clock type setting for the table.
+   *
+   * @return clock type setting for the table.
+   */
+  ClockType getClockType();
+
+  /**
    * Returns an unmodifiable collection of all the {@link 
ColumnFamilyDescriptor} of
    * all the column families of the table.
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
index 44d5c99..2e8d3e9 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
@@ -36,6 +36,7 @@ import java.util.stream.Stream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ClockType;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
@@ -117,6 +118,14 @@ public class TableDescriptorBuilder {
           = new Bytes(Bytes.toBytes("DURABILITY"));
 
   /**
+   * {@link ClockType} setting for the table.
+   */
+  @InterfaceAudience.Private
+  static final String CLOCK_TYPE = "CLOCK_TYPE";
+  private static final Bytes CLOCK_TYPE_KEY
+      = new Bytes(Bytes.toBytes(CLOCK_TYPE));
+
+  /**
    * The number of region replicas for the table.
    */
   @InterfaceAudience.Private
@@ -149,6 +158,12 @@ public class TableDescriptorBuilder {
    */
   private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT;
 
+  /**
+   * Default clock type for HTD is SYSTEM
+   */
+  @InterfaceAudience.Private
+  public static final ClockType DEFAULT_CLOCK_TYPE = ClockType.SYSTEM;
+
   @InterfaceAudience.Private
   public static final String PRIORITY = "PRIORITY";
   private static final Bytes PRIORITY_KEY
@@ -338,6 +353,11 @@ public class TableDescriptorBuilder {
     return this;
   }
 
+  public TableDescriptorBuilder setClockType(ClockType clockType) {
+    desc.setClockType(clockType);
+    return this;
+  }
+
   public TableDescriptorBuilder setFlushPolicyClassName(String clazz) {
     desc.setFlushPolicyClassName(clazz);
     return this;
@@ -694,6 +714,24 @@ public class TableDescriptorBuilder {
     }
 
     /**
+     * Sets the {@link ClockType} for the table. This defaults to 
DEFAULT_CLOCK_TYPE.
+     * @param clockType
+     * @return the modifyable TD
+     */
+    public ModifyableTableDescriptor setClockType(ClockType clockType) {
+      return setValue(CLOCK_TYPE_KEY, clockType.name());
+    }
+
+    /**
+     * Returns the clock type for the table.
+     * @return the clock type for the table.
+     */
+    @Override
+    public ClockType getClockType() {
+      return getOrDefault(CLOCK_TYPE_KEY, ClockType::valueOf, 
DEFAULT_CLOCK_TYPE);
+    }
+
+    /**
      * Get the name of the table
      *
      * @return TableName
@@ -1469,6 +1507,8 @@ public class TableDescriptorBuilder {
     public int getColumnFamilyCount() {
       return families.size();
     }
+
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java
new file mode 100644
index 0000000..abc6252
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hbase.util.AtomicUtils.updateMax;
+
+/**
+ * A clock is an implementation of an algorithm to get timestamps 
corresponding to one of the
+ * {@link TimestampType}s for the current time. Different clock 
implementations can have
+ * different semantics associated with them. Every such clock should be able 
to map its
+ * representation of time to one of the {link TimestampType}s.
+ * HBase has traditionally been using the {@link 
java.lang.System#currentTimeMillis()} to
+ * timestamp events in HBase. {@link java.lang.System#currentTimeMillis()} 
does not give any
+ * guarantees about monotonicity of time. We will keep this implementation of 
clock in place for
+ * backward compatibility and call it SYSTEM clock.
+ * It is easy to provide monotonically non decreasing time semantics by 
keeping track of the last
+ * timestamp given by the clock and updating it on receipt of external 
message. This
+ * implementation of clock is called SYSTEM_MONOTONIC.
+ * SYSTEM Clock and SYSTEM_MONOTONIC clock as described above, both being 
physical clocks, they
+ * cannot track causality. Hybrid Logical Clocks(HLC), as described in
+ * <a href="http://www.cse.buffalo.edu/tech-reports/2014-04.pdf";>HLC 
Paper</a>, helps tracking
+ * causality using a
+ * <a 
href="http://research.microsoft.com/en-us/um/people/lamport/pubs/time-clocks.pdf";>Logical
+ * Clock</a> but always keeps the logical time close to the wall time or 
physical time. It kind
+ * of has the advantages of both the worlds. One such advantage being getting 
consistent
+ * snapshots in physical time as described in the paper. Hybrid Logical Clock 
has an additional
+ * advantage that it is always monotonically increasing.
+ * Note: It is assumed that any physical clock implementation has millisecond 
resolution else the
+ * {@link TimestampType} implementation has to changed to accommodate it. It 
is decided after
+ * careful discussion to go with millisecond resolution in the HLC design 
document attached in the
+ * issue <a 
href="https://issues.apache.org/jira/browse/HBASE-14070";>HBASE-14070 </a>.
+ */
+
[email protected]
[email protected]
+public interface Clock {
+  long DEFAULT_MAX_CLOCK_SKEW_IN_MS = 30000;
+
+  /**
+   * This is a method to get the current time.
+   *
+   * @return Timestamp of current time in 64 bit representation corresponding 
to the particular
+   * clock
+   */
+  long now() throws RuntimeException;
+
+  /**
+   * This is a method to update the current time with the passed timestamp.
+   * @param timestamp
+   * @return Timestamp of current time in 64 bit representation corresponding 
to the particular
+   * clock
+   */
+  long update(long timestamp) throws RuntimeException;
+
+  /**
+   * @return true if the clock implementation gives monotonically non 
decreasing timestamps else
+   * false.
+   */
+  boolean isMonotonic();
+
+  /**
+   * @return true if the clock implementation gives monotonically increasing 
timestamps else false.
+   */
+  boolean isMonotonicallyIncreasing();
+
+  /**
+   * @return {@link org.apache.hadoop.hbase.TimestampType}
+   */
+  TimestampType getTimestampType();
+
+  /**
+   * @return {@link org.apache.hadoop.hbase.ClockType}
+   */
+  ClockType getClockType();
+
+
+  /**
+   * Indicates that Physical Time or Logical Time component has overflowed. 
This extends
+   * RuntimeException.
+   */
+  @SuppressWarnings("serial")
+  class ClockException extends RuntimeException {
+    public ClockException(String msg) {
+      super(msg);
+    }
+  }
+
+  //////////////////////////////////////////////////////////////////
+  // Physical Clock
+  //////////////////////////////////////////////////////////////////
+
+  interface PhysicalClock {
+    /**
+     * This is a method to get the current time.
+     *
+     * @return Timestamp of current time in 64 bit representation 
corresponding to the particular
+     * clock
+     */
+    long now() throws RuntimeException;
+
+    /**
+     * This is a method to get the unit of the physical time used by the clock
+     *
+     * @return A {@link TimeUnit}
+     */
+    TimeUnit getTimeUnit();
+  }
+
+  class JavaMillisPhysicalClock implements PhysicalClock {
+    @Override
+    public long now() {
+      return EnvironmentEdgeManager.currentTime();
+    }
+
+    @Override
+    public TimeUnit getTimeUnit() {
+      return TimeUnit.MILLISECONDS;
+    }
+  }
+
+  JavaMillisPhysicalClock DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK =
+      new JavaMillisPhysicalClock();
+
+  //////////////////////////////////////////////////////////////////
+  // Implementation of clocks
+  //////////////////////////////////////////////////////////////////
+
+  /**
+   * System clock is an implementation of clock which doesn't give any 
monotonic guarantees.
+   */
+  class System implements Clock, PhysicalClock {
+    private final PhysicalClock physicalClock = 
DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK;
+    private final ClockType clockType = ClockType.SYSTEM;
+    private final TimestampType timestampType = TimestampType.PHYSICAL;
+
+    @Override
+    public long now() {
+      return physicalClock.now();
+    }
+
+    @Override
+    public long update(long timestamp) {
+      return physicalClock.now();
+    }
+
+    @Override
+    public boolean isMonotonic() {
+      return false;
+    }
+
+    @Override
+    public boolean isMonotonicallyIncreasing() {
+      return false;
+    }
+
+    public TimeUnit getTimeUnit() {
+      return physicalClock.getTimeUnit();
+    }
+
+    @Override
+    public TimestampType getTimestampType() { return timestampType; }
+
+    @Override
+    public ClockType getClockType() { return clockType; }
+  }
+
+  /**
+   * System clock is an implementation of clock which guarantees monotonically 
non-decreasing
+   * timestamps.
+   */
+  class SystemMonotonic implements Clock, PhysicalClock {
+    private final long maxClockSkewInMs;
+    private final PhysicalClock physicalClock;
+    private final AtomicLong physicalTime = new AtomicLong();
+    private final ClockType clockType = ClockType.SYSTEM_MONOTONIC;
+    private final TimestampType timestampType = TimestampType.PHYSICAL;
+
+    public SystemMonotonic(PhysicalClock physicalClock, long maxClockSkewInMs) 
{
+      this.physicalClock = physicalClock;
+      this.maxClockSkewInMs = maxClockSkewInMs > 0 ?
+          maxClockSkewInMs : DEFAULT_MAX_CLOCK_SKEW_IN_MS;
+    }
+
+    public SystemMonotonic() {
+      this.physicalClock = DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK;
+      this.maxClockSkewInMs = DEFAULT_MAX_CLOCK_SKEW_IN_MS;
+    }
+
+    @Override
+    public long now() {
+      long systemTime = physicalClock.now();
+      updateMax(physicalTime, systemTime);
+      return physicalTime.get();
+    }
+
+    public long update(long targetTimestamp) throws ClockException {
+      final long systemTime = physicalClock.now();
+      if (maxClockSkewInMs > 0 && (targetTimestamp - systemTime) > 
maxClockSkewInMs) {
+        throw new ClockException(
+            "Received event with timestamp:" + 
getTimestampType().toString(targetTimestamp)
+                + " which is greater than allowed clock skew ");
+      }
+      final long oldPhysicalTime = systemTime > targetTimestamp ? systemTime : 
targetTimestamp;
+      updateMax(physicalTime, oldPhysicalTime);
+      return physicalTime.get();
+    }
+
+    @Override
+    public boolean isMonotonic() {
+      return true;
+    }
+
+    @Override
+    public boolean isMonotonicallyIncreasing() {
+      return false;
+    }
+
+    public TimeUnit getTimeUnit() {
+      return physicalClock.getTimeUnit();
+    }
+
+    @VisibleForTesting
+    void setPhysicalTime(long time) {
+      physicalTime.set(time);
+    }
+
+    @Override
+    public TimestampType getTimestampType() { return timestampType; }
+
+    @Override
+    public ClockType getClockType() { return clockType; }
+  }
+
+  class HLC implements Clock, PhysicalClock {
+    private final PhysicalClock physicalClock;
+    private final long maxClockSkew;
+    private final long maxPhysicalTime;
+    private final long maxLogicalTime;
+    private long physicalTime;
+    private long logicalTime;
+    private final ClockType clockType = ClockType.HLC;
+    private final TimestampType timestampType = TimestampType.HYBRID;
+
+    public HLC(PhysicalClock physicalClock, long maxClockSkew) {
+      this.physicalClock = physicalClock;
+      this.maxClockSkew = maxClockSkew > 0 ? maxClockSkew : 
DEFAULT_MAX_CLOCK_SKEW_IN_MS;
+      this.maxPhysicalTime = timestampType.getMaxPhysicalTime();
+      this.maxLogicalTime = timestampType.getMaxLogicalTime();
+      this.physicalTime = 0;
+      this.logicalTime = 0;
+    }
+
+    public HLC() {
+      this.physicalClock = DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK;
+      this.maxClockSkew = DEFAULT_MAX_CLOCK_SKEW_IN_MS;
+      this.maxPhysicalTime = timestampType.getMaxPhysicalTime();
+      this.maxLogicalTime = timestampType.getMaxLogicalTime();
+      this.physicalTime = 0;
+      this.logicalTime = 0;
+    }
+
+    @Override
+    public synchronized long now() throws ClockException {
+      final long systemTime = physicalClock.now();
+
+      checkPhysicalTimeOverflow(systemTime, maxPhysicalTime);
+      checkLogicalTimeOverflow(logicalTime, maxLogicalTime);
+
+      if (systemTime <= physicalTime) {
+        logicalTime++;
+      } else if (systemTime > physicalTime) {
+        logicalTime = 0;
+        physicalTime = systemTime;
+      }
+
+      return toTimestamp();
+    }
+
+    /**
+     * Updates {@link HLC} with the given timestamp received from elsewhere 
(possibly
+     * some other node). Returned timestamp is strict greater than 
msgTimestamp and local
+     * timestamp.
+     *
+     * @param timestamp timestamp from the external message.
+     * @return a hybrid timestamp of HLC that is strictly greater than local 
timestamp and
+     * msgTimestamp
+     * @throws ClockException
+     */
+    @Override
+    public synchronized long update(long timestamp)
+        throws ClockException {
+      final long targetPhysicalTime = timestampType.getPhysicalTime(timestamp);
+      final long targetLogicalTime = timestampType.getLogicalTime(timestamp);
+      final long oldPhysicalTime = physicalTime;
+      final long systemTime = physicalClock.now();
+
+      physicalTime = Math.max(Math.max(oldPhysicalTime, targetPhysicalTime), 
systemTime);
+      checkPhysicalTimeOverflow(systemTime, maxPhysicalTime);
+
+      if (targetPhysicalTime - systemTime > maxClockSkew) {
+        throw new ClockException("Received event with timestamp:" +
+            timestampType.toString(timestamp) + " which is greater than 
allowed clock skew");
+      }
+      if (physicalTime == oldPhysicalTime && oldPhysicalTime == 
targetPhysicalTime) {
+        logicalTime = Math.max(logicalTime, targetLogicalTime) + 1;
+      } else if (physicalTime == targetPhysicalTime) {
+        logicalTime = targetLogicalTime + 1;
+      } else if (physicalTime == oldPhysicalTime) {
+        logicalTime++;
+      } else {
+        logicalTime = 0;
+      }
+
+      checkLogicalTimeOverflow(logicalTime, maxLogicalTime);
+      return toTimestamp();
+    }
+
+    @Override
+    public boolean isMonotonic() {
+      return true;
+    }
+
+    @Override
+    public boolean isMonotonicallyIncreasing() {
+      return true;
+    }
+
+    public TimeUnit getTimeUnit() {
+      return physicalClock.getTimeUnit();
+    }
+
+    private long toTimestamp() {
+      return timestampType.toTimestamp(getTimeUnit(), physicalTime, 
logicalTime);
+    }
+
+    @VisibleForTesting
+    synchronized void setLogicalTime(long logicalTime) {
+      this.logicalTime = logicalTime;
+    }
+
+    @VisibleForTesting
+    synchronized void setPhysicalTime(long physicalTime) {
+      this.physicalTime = physicalTime;
+    }
+
+    @Override
+    public TimestampType getTimestampType() { return timestampType; }
+
+    @Override
+    public ClockType getClockType() { return clockType; }
+  }
+
+  //////////////////////////////////////////////////////////////////
+  // Utility functions
+  //////////////////////////////////////////////////////////////////
+
+  // Only for testing.
+  @VisibleForTesting
+  static Clock getDummyClockOfGivenClockType(ClockType clockType) {
+    if (clockType == ClockType.HLC) {
+      return new Clock.HLC();
+    } else if (clockType == ClockType.SYSTEM_MONOTONIC) {
+      return new Clock.SystemMonotonic();
+    } else {
+      return new Clock.System();
+    }
+  }
+
+  static void checkLogicalTimeOverflow(long logicalTime, long maxLogicalTime) {
+    if (logicalTime >= maxLogicalTime) {
+      // highly unlikely to happen, when it happens, we throw exception for 
the above layer to
+      // handle it the way they wish to.
+      throw new Clock.ClockException(
+          "Logical Time Overflowed: " + logicalTime + "max " + "logical time: 
" + maxLogicalTime);
+    }
+  }
+
+  static void checkPhysicalTimeOverflow(long physicalTime, long 
maxPhysicalTime) {
+    if (physicalTime >= maxPhysicalTime) {
+      // Extremely unlikely to happen, if this happens upper layers may have 
to kill the server.
+      throw new Clock.ClockException(
+          "Physical Time overflowed: " + physicalTime + " and max physical 
time:" + maxPhysicalTime);
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-common/src/main/java/org/apache/hadoop/hbase/ClockType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ClockType.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/ClockType.java
new file mode 100644
index 0000000..aee8e43
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ClockType.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
[email protected]
+public enum ClockType {
+  SYSTEM{
+    public TimestampType timestampType() {
+      return TimestampType.PHYSICAL;
+    }
+  }, SYSTEM_MONOTONIC {
+    public TimestampType timestampType() {
+      return TimestampType.PHYSICAL;
+    }
+  }, HLC {
+    public TimestampType timestampType() {
+      return TimestampType.HYBRID;
+    }
+  };
+  abstract public TimestampType timestampType();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java
index 8637db2..41d3a0a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java
@@ -26,8 +26,8 @@ import 
org.apache.hadoop.hbase.classification.InterfaceAudience;
  * Note : Server side Cell implementations in write path must implement this.
  * @deprecated as of 2.0 and will be removed in 3.0. Use {@link ExtendedCell} 
instead
  */
+@Deprecated // Co Processors SHOULD NOT use this if the clock type of the 
tables is HLC
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
-@Deprecated
 public interface SettableTimestamp {
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-common/src/main/java/org/apache/hadoop/hbase/TimestampType.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/TimestampType.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/TimestampType.java
new file mode 100644
index 0000000..e2e2604
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TimestampType.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.commons.lang.time.FastDateFormat;
+
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link TimestampType} is an enum to represent different ways of encoding 
time in HBase using
+ * 64 bits. Time is usually encoded as a 64-bit long in {@link 
org.apache.hadoop.hbase.Cell}
+ * timestamps and is used for sorting {@link org.apache.hadoop.hbase.Cell}s, 
ordering writes etc.
+ * It has methods which help in constructing or interpreting the 64 bit 
timestamp and getter
+ * methods to read the hard coded constants of the particular {@link 
TimestampType}.
+ *
+ * <p>
+ * Enum {@link TimestampType} is dumb in a way. It doesn't have any logic 
other than interpreting
+ * the 64 bits. Any monotonically increasing or monotonically non-decreasing 
semantics of the
+ * timestamps are the responsibility of the clock implementation generating 
the particular
+ * timestamps. There can be several clock implementations, and each such 
implementation can map
+ * its representation of the timestamp to one of the available Timestamp types 
i.e.
+ * {@link #HYBRID} or {@link #PHYSICAL}. In essence, the {@link TimestampType} 
is only used
+ * internally by the Clock implementations and thus never exposed to the user. 
The user has to
+ * know only the different available clock types. So, for the user, timestamp 
types do not exist.
+ * </p>
+ */
[email protected]
[email protected]
+public enum TimestampType {
+  /**
+   * Hybrid is a Timestamp type used to encode both physical time and logical 
time components
+   * into a single. 64 bits long integer. It has methods to decipher the 64 
bits hybrid timestamp
+   * and also to construct the hybrid timestamp.
+   */
+  HYBRID {
+    /**
+     * Hard coded 44-bits for physical time, with most significant bit 
carrying the sign i.e 0
+     * as we are dealing with positive integers and the remaining 43 bits are 
to be interpreted as
+     * system time in milli seconds. See
+     * <a href="https://issues.apache.org/jira/browse/HBASE-14070";>HBASE-14070 
</a> for
+     * understanding the choice of going with the millisecond resolution for 
physical time.
+     * Thus allowing us to represent all the dates between unix epoch (1970) 
and year 2248 with
+     * signed timestamp comparison with 44 bits for physical time assuming a 
millisecond
+     * resolution with signed long integers. Picking 42 bits to represent the 
physical time has
+     * the problem of representing time until 2039 only, with signed integers, 
might cause Y2k39
+     * bug hoping HBase to be around till then. The trade-off here is with the 
year until we can
+     * represent the physical time vs if we are able capture all the events in 
the worst case
+     * (read: leap seconds etc) without the logical component of the timestamp 
overflowing. With
+     * 20 bits for logical time, one can represent upto one million events at 
the same
+     * millisecond. In case of leap seconds, the no of events happening in the 
same second is very
+     * unlikely to exceed one million.
+     */
+    @SuppressWarnings("unused")
+    private static final int BITS_FOR_PHYSICAL_TIME = 44;
+
+    /**
+     * Remaining 20-bits for logical time, allowing values up to 1,048,576. 
Logical Time is the
+     * least significant part of the 64 bit timestamp, so unsigned comparison 
can be used for LT.
+     */
+
+    private static final int BITS_FOR_LOGICAL_TIME = 20;
+
+    /**
+     * Max value for physical time in the {@link #HYBRID} timestamp 
representation, inclusive.
+     * This assumes signed comparison.
+     */
+    private static final long PHYSICAL_TIME_MAX_VALUE = 0x7ffffffffffL;
+
+    /**
+     * Max value for logical time in the {@link #HYBRID} timestamp 
representation
+     */
+    static final long LOGICAL_TIME_MAX_VALUE = 0xfffffL;
+
+    public long toEpochTimeMillisFromTimestamp(long timestamp) {
+      return getPhysicalTime(timestamp);
+    }
+
+    public long fromEpochTimeMillisToTimestamp(long timestamp) {
+      return toTimestamp(TimeUnit.MILLISECONDS, timestamp, 0);
+    }
+
+    public long toTimestamp(TimeUnit timeUnit, long physicalTime, long 
logicalTime) {
+      physicalTime = TimeUnit.MILLISECONDS.convert(physicalTime, timeUnit);
+      return (physicalTime << BITS_FOR_LOGICAL_TIME) + logicalTime;
+    }
+
+    public long getPhysicalTime(long timestamp) {
+      return timestamp >>> BITS_FOR_LOGICAL_TIME; // assume unsigned timestamp
+    }
+
+    long getLogicalTime(long timestamp) {
+      return timestamp & LOGICAL_TIME_MAX_VALUE;
+    }
+
+    public long getMaxPhysicalTime() {
+      return PHYSICAL_TIME_MAX_VALUE;
+    }
+
+    public long getMaxLogicalTime() {
+      return LOGICAL_TIME_MAX_VALUE;
+    }
+
+    int getBitsForLogicalTime() {
+      return BITS_FOR_LOGICAL_TIME;
+    }
+
+    /**
+     * Returns whether the given timestamp is "likely" of {@link #HYBRID} 
{@link TimestampType}.
+     * Timestamp implementations can use the full range of 64bits long to 
represent physical and
+     * logical components of time. However, this method returns whether the 
given timestamp is a
+     * likely representation depending on heuristics for the clock 
implementation.
+     *
+     * Hybrid timestamps are checked whether they belong to Hybrid range 
assuming
+     * that Hybrid timestamps will only have > 0 logical time component for 
timestamps
+     * corresponding to years after 2016. This method will return false if lt 
> 0 and year is
+     * before 2016. Due to left shifting for Hybrid time, all 
millisecond-since-epoch timestamps
+     * from years 1970-10K fall into
+     * year 1970 when interpreted as Hybrid timestamps. Thus, {@link 
#isLikelyOfType(long)} will
+     * return false for timestamps which are in the year 1970 and logical time 
= 0 when
+     * interpreted as of type Hybrid Time.
+     *
+     * <p>
+     *   <b>Note that </b> this method uses heuristics which may not hold
+     * if system timestamps are intermixed from client side and server side or 
timestamp
+     * sources other than system clock are used.
+     * </p>
+     * @param timestamp {@link #HYBRID} Timestamp
+     * @return true if the timestamp is likely to be of the corresponding 
{@link TimestampType}
+     * else false
+     */
+    public boolean isLikelyOfType(long timestamp) {
+      final long physicalTime = getPhysicalTime(timestamp);
+      final long logicalTime = getLogicalTime(timestamp);
+
+      // heuristic 1: Up until year 2016 (1451635200000), lt component cannot 
be non-zero.
+      if (physicalTime < 1451635200000L && logicalTime != 0) {
+        return false;
+      } else if (physicalTime < 31536000000L) {
+        // heuristic 2: Even if logical time = 0, physical time after left 
shifting by 20 bits,
+        // will be before year 1971(31536000000L), as after left shifting by 
20, all epoch ms
+        // timestamps from wall time end up in year less than 1971, even for 
epoch time for the
+        // year 10000. This assumes Hybrid time is not used to represent 
timestamps for year 1970
+        // UTC.
+        return false;
+      }
+      return true;
+    }
+
+    /**
+     * Returns a string representation for Physical Time and Logical Time 
components. The format is:
+     * <code>yyyy-MM-dd HH:mm:ss:SSS(Physical Time),Logical Time</code>
+     * Physical Time is converted to UTC time and not to local time for 
uniformity.
+     * Example: 2015-07-17 16:56:35:891(1437177395891), 0
+     * @param timestamp A {@link #HYBRID} Timestamp
+     * @return A date time string formatted as mentioned in the method 
description
+     */
+    public String toString(long timestamp) {
+      long physicalTime = getPhysicalTime(timestamp);
+      long logicalTime = getLogicalTime(timestamp);
+      return new 
StringBuilder().append(dateFormat.format(physicalTime)).append("(")
+          .append(physicalTime).append(")").append(", 
").append(logicalTime).toString();
+    }
+  },
+
+  /**
+   * Physical is a Timestamp type used to encode the physical time in 64 bits.
+   * It has helper methods to decipher the 64 bit encoding of physical time.
+   */
+  PHYSICAL {
+    public long toEpochTimeMillisFromTimestamp(long timestamp) {
+      return timestamp;
+    }
+
+    public long fromEpochTimeMillisToTimestamp(long timestamp) {
+      return timestamp;
+    }
+
+    public long toTimestamp(TimeUnit timeUnit, long physicalTime, long 
logicalTime) {
+      return TimeUnit.MILLISECONDS.convert(physicalTime, timeUnit);
+    }
+
+    public long getPhysicalTime(long timestamp) {
+      return timestamp;
+    }
+
+    long getLogicalTime(long timestamp) {
+      return 0;
+    }
+
+    public long getMaxPhysicalTime() {
+      return Long.MAX_VALUE;
+    }
+
+    public long getMaxLogicalTime() {
+      return 0;
+    }
+
+    int getBitsForLogicalTime() {
+      return 0;
+    }
+
+    public boolean isLikelyOfType(long timestamp) {
+      // heuristic: the timestamp should be up to year 3K (32503680000000L).
+      return timestamp < 32503680000000L;
+    }
+
+    /**
+     * Returns a string representation for Physical Time and Logical Time 
components. The format is:
+     * <code>yyyy-MM-dd HH:mm:ss:SSS(Physical Time)</code>
+     * Physical Time is converted to UTC time and not to local time for 
uniformity.
+     * Example: 2015-07-17 16:56:35:891(1437177395891), 0
+     * @param timestamp epoch time in milliseconds
+     * @return A date time string formatted as mentioned in the method 
description
+     */
+    public String toString(long timestamp) {
+      return new 
StringBuilder().append(dateFormat.format(timestamp)).append("(")
+          .append(timestamp).append(")").append(", ").append("0").toString();
+    }
+  };
+
+  /**
+   * This is used internally by the enum methods of Hybrid and Physical 
Timestamp types to
+   * convert the
+   * timestamp to the format set here. UTC timezone instead of local time zone 
for convenience
+   * and uniformity
+   */
+  private static final FastDateFormat dateFormat =
+      FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss:SSS", 
TimeZone.getTimeZone("UTC"));
+
+  /**
+   * Converts the given timestamp to the unix epoch timestamp with millisecond 
resolution.
+   * Returned timestamp is compatible with System.currentTimeMillis().
+   * @param timestamp {@link #HYBRID} or {@link #PHYSICAL} Timestamp
+   * @return number of milliseconds from epoch
+   */
+  abstract public long toEpochTimeMillisFromTimestamp(long timestamp);
+
+  /**
+   * Converts the given time in milliseconds to the corresponding {@link 
TimestampType}
+   * representation.
+   * @param timeInMillis epoch time in {@link TimeUnit#MILLISECONDS}
+   * @return a timestamp representation corresponding to {@link TimestampType}.
+   */
+  abstract public long fromEpochTimeMillisToTimestamp(long timeInMillis);
+
+  /**
+   * Converts the given physical clock in the given {@link TimeUnit} to a 
64-bit timestamp
+   * @param timeUnit a time unit as in the enum {@link TimeUnit}
+   * @param physicalTime physical time
+   * @param logicalTime logical time
+   * @return a timestamp in 64 bits
+   */
+  abstract public long toTimestamp(TimeUnit timeUnit, long physicalTime, long 
logicalTime);
+
+  /**
+   * Extracts and returns the physical time from the timestamp
+   * @param timestamp {@link #HYBRID} or {@link #PHYSICAL} Timestamp
+   * @return physical time in {@link TimeUnit#MILLISECONDS}
+   */
+  abstract public long getPhysicalTime(long timestamp);
+
+  /**
+   * Extracts and returns the logical time from the timestamp
+   * @param timestamp {@link #HYBRID} or {@link #PHYSICAL} Timestamp
+   * @return logical time
+   */
+  abstract long getLogicalTime(long timestamp);
+
+  /**
+   * @return the maximum possible physical time in {@link 
TimeUnit#MILLISECONDS}
+   */
+  abstract public long getMaxPhysicalTime();
+
+  /**
+   * @return the maximum possible logical time
+   */
+  abstract public long getMaxLogicalTime();
+
+  /**
+   * @return number of least significant bits allocated for logical time
+   */
+  abstract int getBitsForLogicalTime();
+
+  /**
+   * @param timestamp epoch time in milliseconds
+   * @return True if the timestamp generated by the clock is of type {@link 
#PHYSICAL} else False
+   */
+  abstract public boolean isLikelyOfType(long timestamp);
+
+  public abstract String toString(long timestamp);
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java
new file mode 100644
index 0000000..2476c51
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+@Category(SmallTests.class)
+public class TestClock {
+
+  static final Clock.PhysicalClock PHYSICAL_CLOCK = 
mock(Clock.PhysicalClock.class);
+  static final long MAX_CLOCK_SKEW_IN_MS = 1000;
+
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  /**
+   * Wrapper class around any Clock.
+   * On calls to now() and update(), timestamps returned by the underlying 
clock are stored in an
+   * array. Call assertMonotonic() to assert that the timestamps returned were 
monotonic or not.
+   */
+  private class MonotonicityCheckerClock {
+    final boolean strictlyIncreasing;
+    final Clock clock;
+    final ArrayList<Long> timestamps = new ArrayList<>();
+
+    MonotonicityCheckerClock(Clock clock, boolean strictlyIncreasing) {
+      this.clock = clock;
+      this.strictlyIncreasing = strictlyIncreasing;
+    }
+
+    long now() {
+      long ts = clock.now();
+      timestamps.add(ts);
+      return ts;
+    }
+
+    long update(long timestamp) {
+      long ts = clock.update(timestamp);
+      timestamps.add(ts);
+      return ts;
+    }
+
+    void assertMonotonic() {
+      assertTrue(timestamps.size() > 0);
+      long prev = 0;
+      for (long timestamp : timestamps) {
+        if (strictlyIncreasing) {
+          // This simple comparison works correctly for all types of clocks we 
have currently.
+          assertTrue(timestamps.toString(), timestamp > prev);
+        } else {
+          assertTrue(timestamps.toString(), timestamp >= prev);
+        }
+        prev = timestamp;
+      }
+    }
+  }
+
+  @Before
+  public void setUp() {
+    when(PHYSICAL_CLOCK.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
+  }
+
+  // All Clocks Tests
+
+  @Test
+  public void testSystemMonotonicNow() {
+    MonotonicityCheckerClock systemMonotonic =
+        new MonotonicityCheckerClock(new Clock.SystemMonotonic(PHYSICAL_CLOCK, 
30000), false);
+
+    // case 1: Set time and assert
+    when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+    assertEquals(100, systemMonotonic.now());
+
+    // case 2: Go back in time and check monotonic property.
+    when(PHYSICAL_CLOCK.now()).thenReturn(99L);
+    assertEquals(100, systemMonotonic.now());
+
+    // case 3: system time goes ahead compared to previous timestamp.
+    when(PHYSICAL_CLOCK.now()).thenReturn(101L);
+    assertEquals(101, systemMonotonic.now());
+
+    systemMonotonic.assertMonotonic();
+  }
+
+  @Test
+  public void testSystemMonotonicUpdate() {
+    MonotonicityCheckerClock systemMonotonic =
+        new MonotonicityCheckerClock(new Clock.SystemMonotonic(PHYSICAL_CLOCK, 
30000), false);
+
+    // Sets internal time
+    when(PHYSICAL_CLOCK.now()).thenReturn(99L);
+    assertEquals(99, systemMonotonic.now());
+
+    // case 1: Message timestamp is greater than current System Monotonic Time,
+    // physical time at 100 still.
+    when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+    assertEquals(102, systemMonotonic.update(102));
+
+    // case 2: Message timestamp is greater than current System Monotonic Time,
+    // physical time at 100 still.
+    when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+    assertEquals(103, systemMonotonic.update(103));
+
+    // case 3: Message timestamp is less than current System Monotonic Time, 
greater than current
+    // physical time which is 100.
+    assertEquals(103, systemMonotonic.update(101));
+
+    // case 4: Message timestamp is less than current System Monotonic Time, 
less than current
+    // physical time which is 100.
+    assertEquals(103, systemMonotonic.update(99));
+
+    // case 5: Message timestamp<System monotonic time and both less than 
current Physical Time
+    when(PHYSICAL_CLOCK.now()).thenReturn(106L);
+    assertEquals(106, systemMonotonic.update(102));
+
+    // case 6: Message timestamp>System monotonic time and both less than 
current Physical Time
+    when(PHYSICAL_CLOCK.now()).thenReturn(109L);
+    assertEquals(109, systemMonotonic.update(108));
+
+    systemMonotonic.assertMonotonic();
+  }
+
+  @Test
+  public void testSystemMonotonicUpdateMaxClockSkew() throws 
Clock.ClockException {
+    final long time = 100L;
+    Clock.SystemMonotonic systemMonotonic =
+        new Clock.SystemMonotonic(PHYSICAL_CLOCK, MAX_CLOCK_SKEW_IN_MS);
+
+    // Set Current Time.
+    when(PHYSICAL_CLOCK.now()).thenReturn(time);
+    systemMonotonic.now();
+
+    // Shouldn't throw ClockException
+    systemMonotonic.update(time + MAX_CLOCK_SKEW_IN_MS - 1);
+
+    exception.expect(Clock.ClockException.class);
+    systemMonotonic.update(time + MAX_CLOCK_SKEW_IN_MS + 1);
+  }
+
+  // All Hybrid Logical Clock Tests
+
+  private void assertHLCTime(long hlcTime, long expectedPhysicalTime, long 
expectedLogicalTime) {
+    assertEquals(expectedPhysicalTime, 
TimestampType.HYBRID.getPhysicalTime(hlcTime));
+    assertEquals(expectedLogicalTime, 
TimestampType.HYBRID.getLogicalTime(hlcTime));
+  }
+
+  @Test
+  public void testHLCNow() throws Clock.ClockException {
+    MonotonicityCheckerClock hybridLogicalClock =
+        new MonotonicityCheckerClock(new Clock.HLC(PHYSICAL_CLOCK, 30000), 
true);
+
+    // case 1: Test if it returns correct time based on current physical time.
+    //         Remember, initially logical time = 0
+    when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+    assertHLCTime(hybridLogicalClock.now(), 100, 0);
+
+    // case 2: physical time doesn't change, logical time should increment.
+    when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+    assertHLCTime(hybridLogicalClock.now(), 100, 1);
+
+    // case 3: physical time doesn't change still, logical time should 
increment again
+    when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+    assertHLCTime(hybridLogicalClock.now(), 100, 2);
+
+    // case 4: physical time moves forward, logical time should reset to 0.
+    when(PHYSICAL_CLOCK.now()).thenReturn(101L);
+    assertHLCTime(hybridLogicalClock.now(), 101, 0);
+
+    // case 5: Monotonic increasing check, physical time goes back.
+    when(PHYSICAL_CLOCK.now()).thenReturn(99L);
+    assertHLCTime(hybridLogicalClock.now(), 101, 1);
+
+    hybridLogicalClock.assertMonotonic();
+  }
+
+  @Test
+  public void testHLCNowLogicalTimeOverFlow() {
+    Clock.HLC hybridLogicalClock = new Clock.HLC(PHYSICAL_CLOCK, 100);
+
+    // Set Current Time.
+    when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+    hybridLogicalClock.setPhysicalTime(100);
+    
hybridLogicalClock.setLogicalTime(TimestampType.HYBRID.getMaxLogicalTime());
+
+    exception.expect(Clock.ClockException.class);
+    hybridLogicalClock.now();
+  }
+
+  @Test
+  public void testHLCUpdate() throws Clock.ClockException {
+    long messageTimestamp;
+    MonotonicityCheckerClock hybridLogicalClock =
+        new MonotonicityCheckerClock(new Clock.HLC(PHYSICAL_CLOCK, 100), true);
+
+    // Set Current Time.
+    when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+    hybridLogicalClock.now();
+
+    // case 1: Message physical timestamp is lower than current physical time.
+    messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 
99, 1);
+    when(PHYSICAL_CLOCK.now()).thenReturn(101L);
+    assertHLCTime(hybridLogicalClock.update(messageTimestamp), 101, 0);
+
+    // case 2: Message physical timestamp is greater than HLC physical time.
+    messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 
105 , 3);
+    when(PHYSICAL_CLOCK.now()).thenReturn(102L);
+    assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 4);
+
+    // case 3: Message timestamp is less than HLC timestamp
+    messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 
104 , 4);
+    when(PHYSICAL_CLOCK.now()).thenReturn(103L);
+    assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 5);
+
+    //case 4: Message timestamp with same physical time as HLC, but lower 
logical time
+    messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 
105 , 2);
+    when(PHYSICAL_CLOCK.now()).thenReturn(101L);
+    assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 6);
+
+    //case 5: Message timestamp with same physical time as HLC, but higher 
logical time
+    messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 
105 , 8);
+    when(PHYSICAL_CLOCK.now()).thenReturn(102L);
+    assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 9);
+
+    //case 6: Actual Physical Time greater than message physical timestamp and 
HLC physical time.
+    messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 
105 , 10);
+    when(PHYSICAL_CLOCK.now()).thenReturn(110L);
+    assertHLCTime(hybridLogicalClock.update(messageTimestamp), 110, 0);
+
+    hybridLogicalClock.assertMonotonic();
+  }
+
+  @Test
+  public void testHLCUpdateLogicalTimeOverFlow() throws Clock.ClockException {
+    long messageTimestamp;
+    Clock.HLC hybridLogicalClock = new Clock.HLC(PHYSICAL_CLOCK, 100);
+
+    // Set Current Time.
+    when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+    hybridLogicalClock.now();
+
+    messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 
100,
+        TimestampType.HYBRID.getMaxLogicalTime());
+    exception.expect(Clock.ClockException.class);
+    hybridLogicalClock.update(messageTimestamp);
+  }
+
+  @Test
+  public void testHLCUpdateMaxClockSkew() throws Clock.ClockException {
+    final long time = 100;
+    long messageTimestamp;
+    Clock.HLC hybridLogicalClock = new Clock.HLC(PHYSICAL_CLOCK, 
MAX_CLOCK_SKEW_IN_MS);
+
+    // Set Current Time.
+    when(PHYSICAL_CLOCK.now()).thenReturn(time);
+    hybridLogicalClock.now();
+
+    messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS,
+        time + MAX_CLOCK_SKEW_IN_MS - 1, 0);
+    hybridLogicalClock.update(messageTimestamp);
+
+    messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS,
+        time + MAX_CLOCK_SKEW_IN_MS + 1, 0);
+    exception.expect(Clock.ClockException.class);
+    hybridLogicalClock.update(messageTimestamp);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTimestampType.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTimestampType.java 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTimestampType.java
new file mode 100644
index 0000000..b1a661b
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTimestampType.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.text.ParseException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for TimestampType enum.
+ */
+@Category(SmallTests.class)
+public class TestTimestampType {
+  private static final Log LOG = LogFactory.getLog(TestTimestampType.class);
+
+  private static final long PHYSICAL_TIME = 1234567890123L;
+  private static final long LOGICAL_TIME = 12;
+
+  private static final long HLC_TIME = TimestampType.HYBRID.toTimestamp(
+      TimeUnit.MILLISECONDS, PHYSICAL_TIME, LOGICAL_TIME);
+  private static final long PHYSICAL_CLOCK_TIME = 
TimestampType.PHYSICAL.toTimestamp(
+      TimeUnit.MILLISECONDS, PHYSICAL_TIME, LOGICAL_TIME);
+
+  @Test
+  public void testFromToEpoch() {
+    for (TimestampType timestamp : TimestampType.values()) {
+      long wallTime = System.currentTimeMillis();
+      long converted = timestamp.toEpochTimeMillisFromTimestamp(
+          timestamp.fromEpochTimeMillisToTimestamp(wallTime));
+
+      assertEquals(wallTime, converted);
+    }
+  }
+
+  /* Tests for HLC Clock */
+  @Test
+  public void testHybridMaxValues() {
+    // assert 44-bit Physical Time with signed comparison (actual 43 bits)
+    assertEquals(
+        (1L << (63-TimestampType.HYBRID.getBitsForLogicalTime())) - 1,
+        TimestampType.HYBRID.getMaxPhysicalTime());
+
+    // assert 20-bit Logical Time
+    assertEquals(
+        (1L << TimestampType.HYBRID.getBitsForLogicalTime()) - 1,
+        TimestampType.HYBRID.getMaxLogicalTime());
+
+    // assert that maximum representable timestamp is Long.MAX_VALUE (assuming 
signed comparison).
+    assertEquals(
+        Long.MAX_VALUE,
+        TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS,
+            TimestampType.HYBRID.getMaxPhysicalTime(),
+            TimestampType.HYBRID.getMaxLogicalTime())
+    );
+  }
+
+  @Test
+  public void testHybridTimeComponents() {
+    assertEquals(PHYSICAL_TIME, 
TimestampType.HYBRID.getPhysicalTime(HLC_TIME));
+    assertEquals(LOGICAL_TIME, TimestampType.HYBRID.getLogicalTime(HLC_TIME));
+  }
+
+  @Test
+  public void testHybridToString() {
+    assertEquals("2009-02-13T23:31:30:123(1234567890123), 12",
+        TimestampType.HYBRID.toString(HLC_TIME));
+  }
+
+  @Test
+  public void testHybridToTimestamp() {
+    long expected = (PHYSICAL_TIME << 
TimestampType.HYBRID.getBitsForLogicalTime()) + LOGICAL_TIME;
+    // test millisecond
+    assertEquals(HLC_TIME, expected);
+
+    // test nanosecond
+    long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.NANOSECONDS,
+        TimeUnit.MILLISECONDS.toNanos(PHYSICAL_TIME), LOGICAL_TIME);
+    assertEquals(ts, expected);
+  }
+
+  @Test
+  public void testHybridIsLikelyOfType() throws ParseException {
+    ZonedDateTime date = LocalDateTime.of(1970, 1, 1, 1, 
1).atZone(ZoneId.of("UTC"));
+    // test timestamps of Hybrid type from year 1971 to 2248 where lt = 0
+    for (int year = 1971; year <= 2248; year += 1) {
+      date = date.withYear(year);
+      // Hybrid type ts with pt = date and lt = 0
+      long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.SECONDS, 
date.toEpochSecond(), 0);
+      assertTrue("Year = " + year, TimestampType.HYBRID.isLikelyOfType(ts));
+    }
+
+    // test timestamps of Hybrid type from year 2016 to 2348 where lt > 0
+    for (int year = 2016; year <= 2248; year += 1) {
+      date = date.withYear(year);
+
+      // Hybrid type ts with pt = date and lt = 123
+      long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.SECONDS, 
date.toEpochSecond(), 123);
+      assertTrue("Year = " + year, TimestampType.HYBRID.isLikelyOfType(ts));
+    }
+
+    // test that timestamps from different years are not Hybrid type
+    for (int year = 1970; year <= 10000 ;year += 10) {
+      date = date.withYear(year);
+      final long ts = date.toEpochSecond() * 1000;
+      assertFalse("Year = " + year, TimestampType.HYBRID.isLikelyOfType(ts));
+    }
+
+    // test that timestamps up to 2016 are not Hybrid even if lt = 0
+    for (int year = 1970; year <= 2016; year += 1) {
+      date = date.withYear(year);
+
+      // reset lt = 0
+      long ts = (((date.toEpochSecond() * 1000)
+          >> TimestampType.HYBRID.getBitsForLogicalTime())
+          << TimestampType.HYBRID.getBitsForLogicalTime());
+      assertFalse("Year = " + year, TimestampType.HYBRID.isLikelyOfType(ts));
+    }
+
+    // test that timestamps from currentTime epoch are not Hybrid type
+    long systemTimeNow = System.currentTimeMillis();
+    LOG.info(TimestampType.PHYSICAL.toString(systemTimeNow));
+    
LOG.info(TimestampType.PHYSICAL.toString((TimestampType.HYBRID.getPhysicalTime(systemTimeNow))));
+    assertFalse(TimestampType.HYBRID.isLikelyOfType(systemTimeNow));
+  }
+
+
+  @Test
+  public void testPhysicalMaxValues() {
+    assertEquals( (1L << 63)- 1, TimestampType.PHYSICAL.getMaxPhysicalTime());
+
+    assertEquals(0, TimestampType.PHYSICAL.getMaxLogicalTime());
+  }
+
+  @Test
+  public void testPhysicalClockPhysicalAndLogicalTime() {
+    assertEquals(PHYSICAL_TIME, 
TimestampType.PHYSICAL.getPhysicalTime(PHYSICAL_CLOCK_TIME));
+    assertEquals(0, 
TimestampType.PHYSICAL.getLogicalTime(PHYSICAL_CLOCK_TIME));
+  }
+
+  @Test
+  public void testPhysicalToString() {
+    assertEquals("2009-02-13T23:31:30:123(1234567890123), 0",
+        TimestampType.PHYSICAL.toString(PHYSICAL_CLOCK_TIME));
+  }
+
+  @Test
+  public void testPhysicalToTimestamp() {
+    // test millisecond
+    assertEquals(PHYSICAL_CLOCK_TIME, PHYSICAL_TIME);
+
+    // test nanosecond
+    long ts = TimestampType.PHYSICAL.toTimestamp(TimeUnit.NANOSECONDS,
+        TimeUnit.MILLISECONDS.toNanos(PHYSICAL_TIME), LOGICAL_TIME);
+    assertEquals(ts, PHYSICAL_TIME);
+  }
+
+  @Test
+  public void testPhysicalIsLikelyOfType() throws ParseException {
+    final ZonedDateTime date = LocalDateTime.of(1970, 1, 1, 1, 
1).atZone(ZoneId.of("UTC"));
+
+    // test that timestamps from 1970 to 3K epoch are of Physical type
+    for (int year = 1970; year < 3000 ;year += 10) {
+      // Start date 1970 to 10000
+      date.withYear(year);
+      final long ts = date.toEpochSecond() * 1000;
+      assertTrue("Year = " + year, TimestampType.PHYSICAL.isLikelyOfType(ts));
+    }
+
+    // test that timestamps from currentTime epoch are of Physical type
+    long systemTimeNow = System.currentTimeMillis();
+    assertTrue(TimestampType.PHYSICAL.isLikelyOfType(systemTimeNow));
+
+    // test timestamps of Hybrid type from year 1970 to 2248 are not of 
Physical type
+    for (int year = 1970; year <= 2248; year += 1) {
+      date.withYear(year);
+      // Hybrid type ts with pt = date and lt = 0
+      long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.SECONDS, 
date.toEpochSecond(), 0);
+      assertFalse(TimestampType.PHYSICAL.isLikelyOfType(ts));
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index 20a6a03..6d42c06 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -256,6 +256,10 @@ public class ModifyTableProcedure
         throw new IOException("REGION_REPLICATION change is not supported for 
enabled tables");
       }
     }
+    // do not allow changing of clock type.
+    if (modifiedHTableDescriptor.getClockType() != 
unmodifiedHTableDescriptor.getClockType()) {
+      throw new IOException("Clock Type change is not supported for tables");
+    }
 
     // Find out whether all column families in unmodifiedHTableDescriptor also 
exists in
     // the modifiedHTableDescriptor. This is to determine whether we are safe 
to rollback.

http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b02b042..a55be97 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -92,6 +92,8 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ClockType;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -100,6 +102,8 @@ import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagUtil;
+import org.apache.hadoop.hbase.Clock;
+import org.apache.hadoop.hbase.TimestampType;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Append;
@@ -380,6 +384,24 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     return minimumReadPoint;
   }
 
+  @Override
+  public Clock getClock() {
+    if (this.clock == null) {
+      return this.getRegionServerServices().getRegionServerClock(
+          getTableDescriptor().getClockType());
+    }
+    return this.clock;
+  }
+
+  /**
+   * Only for the purpose of testing
+   * @param clock
+   */
+  @VisibleForTesting
+  public void setClock(Clock clock) {
+    this.clock = clock;
+  }
+
   /*
    * Data structure of write state flags used coordinating flushes,
    * compactions and closes.
@@ -617,6 +639,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
   private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap = new 
ConcurrentHashMap<>();
 
   final RegionServerServices rsServices;
+  private Clock clock;
   private RegionServerAccounting rsAccounting;
   private long flushCheckInterval;
   // flushPerChanges is to prevent too many changes in memstore
@@ -775,6 +798,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
         ? DEFAULT_DURABILITY
         : htd.getDurability();
     if (rsServices != null) {
+      this.clock = rsServices.getRegionServerClock(htd.getClockType());
       this.rsAccounting = this.rsServices.getRegionServerAccounting();
       // don't initialize coprocessors if not running within a regionserver
       // TODO: revisit if coprocessors should load in other cases
@@ -789,6 +813,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
         recoveringRegions.put(encodedName, this);
       }
     } else {
+      this.clock = new Clock.System();
       this.metricsRegionWrapper = null;
       this.metricsRegion = null;
     }
@@ -2796,8 +2821,31 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     return getScanner(scan, additionalScanners, HConstants.NO_NONCE, 
HConstants.NO_NONCE);
   }
 
+  /**
+   * Clients use physical timestamps when setting time ranges. Tables that use 
HLCs must map the
+   * physical timestamp to HLC time
+   */
+  private void mapTimeRangesWithRespectToClock(Scan scan) {
+    TimeRange tr = scan.getTimeRange();
+    if (tr.isAllTime()) {
+      return;
+    }
+    TimestampType timestampType = getClock().getTimestampType();
+    // Clip time range max to prevent overflow when converting from epoch time 
to timestamp time
+    long trMaxClipped = Math.min(tr.getMax(), 
timestampType.getMaxPhysicalTime());
+    try {
+      
scan.setTimeRange(timestampType.fromEpochTimeMillisToTimestamp(tr.getMin()),
+          timestampType.fromEpochTimeMillisToTimestamp(trMaxClipped));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
   private RegionScanner getScanner(Scan scan, List<KeyValueScanner> 
additionalScanners,
       long nonceGroup, long nonce) throws IOException {
+    if (getClock().getClockType() == ClockType.HLC) {
+      mapTimeRangesWithRespectToClock(scan);
+    }
     startRegionOperation(Operation.SCAN);
     try {
       // Verify families are all valid
@@ -3219,7 +3267,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     try {
       // STEP 1. Try to acquire as many locks as we can, and ensure we acquire 
at least one.
       int numReadyToWrite = 0;
-      long now = EnvironmentEdgeManager.currentTime();
+      long now = clock.now();
       while (lastIndexExclusive < batchOp.operations.length) {
         if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now, 
observedExceptions)) {
           lastIndexExclusive++;
@@ -3257,7 +3305,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       // STEP 2. Update any LATEST_TIMESTAMP timestamps
       // We should record the timestamp only after we have acquired the 
rowLock,
       // otherwise, newer puts/deletes are not guaranteed to have a newer 
timestamp
-      now = EnvironmentEdgeManager.currentTime();
+      now = clock.now();
       byte[] byteNow = Bytes.toBytes(now);
 
       // Nothing to put/delete -- an exception in the above such as 
NoSuchColumnFamily?
@@ -3756,8 +3804,8 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           // non-decreasing (see HBASE-14070) we should make sure that the 
mutation has a
           // larger timestamp than what was observed via Get. doBatchMutate 
already does this, but
           // there is no way to pass the cellTs. See HBASE-14054.
-          long now = EnvironmentEdgeManager.currentTime();
-          long ts = Math.max(now, cellTs); // ensure write is not eclipsed
+          long now = clock.now();
+          long ts = clock.isMonotonic() ? now : Math.max(now, cellTs); // 
ensure write is not eclipsed
           byte[] byteTs = Bytes.toBytes(ts);
           if (mutation != null) {
             if (mutation instanceof Put) {
@@ -4038,7 +4086,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
       return;
     }
-    long maxTs = now + timestampSlop;
+    long maxTs = clock.getTimestampType().getPhysicalTime(now) + timestampSlop;
     for (List<Cell> kvs : familyMap.values()) {
       assert kvs instanceof RandomAccess;
       int listSize  = kvs.size();
@@ -7101,7 +7149,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     // Short circuit the read only case
     if (processor.readOnly()) {
       try {
-        long now = EnvironmentEdgeManager.currentTime();
+        long now = clock.now();
         doProcessRowWithTimeout(processor, now, this, null, null, timeout);
         processor.postProcess(this, walEdit, true);
       } finally {
@@ -7131,7 +7179,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
         // STEP 3. Region lock
         lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : 
acquiredRowLocks.size());
         locked = true;
-        long now = EnvironmentEdgeManager.currentTime();
+        long now = clock.now();
         // STEP 4. Let the processor scan the rows, generate mutations and add 
waledits
         doProcessRowWithTimeout(processor, now, this, mutations, walEdit, 
timeout);
         if (!mutations.isEmpty()) {
@@ -7440,7 +7488,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       final List<Cell> results)
   throws IOException {
     WALEdit walEdit = null;
-    long now = EnvironmentEdgeManager.currentTime();
+    long now = clock.now();
     final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;
     // Process a Store/family at a time.
     for (Map.Entry<byte [], List<Cell>> entry: 
mutation.getFamilyCellMap().entrySet()) {
@@ -7556,7 +7604,11 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     long ts = now;
     if (currentValue != null) {
       tags = TagUtil.carryForwardTags(tags, currentValue);
-      ts = Math.max(now, currentValue.getTimestamp() + 1);
+      // Ensure that timestamp of increment operation is greater than existing 
cell timestamp
+      if (this.getClock().getClockType() == ClockType.SYSTEM ||
+          this.getClock().getClockType() == ClockType.SYSTEM_MONOTONIC) {
+        ts = Math.max(now, currentValue.getTimestamp() + 1);
+      }
       newValue += getLongValue(currentValue);
     }
     // Now make up the new Cell. TODO: FIX. This is carnel knowledge of how 
KeyValues are made...
@@ -7582,7 +7634,11 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     byte [] row = mutation.getRow();
     if (currentValue != null) {
       tags = TagUtil.carryForwardTags(tags, currentValue);
-      ts = Math.max(now, currentValue.getTimestamp() + 1);
+      // Ensure that timestamp of increment operation is greater than existing 
cell timestamp
+      if (this.getClock().getClockType() == ClockType.SYSTEM ||
+          this.getClock().getClockType() == ClockType.SYSTEM_MONOTONIC) {
+        ts = Math.max(now, currentValue.getTimestamp() + 1);
+      }
       tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());
       byte[] tagBytes = TagUtil.fromList(tags);
       // Allocate an empty cell and copy in all parts.
@@ -7685,7 +7741,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      49 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+      50 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
       (15 * Bytes.SIZEOF_LONG) +
       6 * Bytes.SIZEOF_BOOLEAN);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 986d6d4..dc7c70e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -75,6 +75,8 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Clock;
+import org.apache.hadoop.hbase.ClockType;
 import org.apache.hadoop.hbase.HealthCheckChore;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NotServingRegionException;
@@ -331,6 +333,13 @@ public class HRegionServer extends HasThread implements
   // debugging and unit tests.
   private volatile boolean abortRequested;
 
+  // Region server contains instances of all three clock clocks. Regions have 
a set
+  // clock type so depending on the clock type needed by a region, the 
appropriate
+  // one can be accessed.
+  final protected Clock hybridLogicalClock;
+  final protected Clock systemMonotonicClock;
+  final protected Clock systemClock;
+
   ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<>();
 
   // A state before we go into stopped state.  At this stage we're closing user
@@ -588,6 +597,10 @@ public class HRegionServer extends HasThread implements
     this.abortRequested = false;
     this.stopped = false;
 
+    this.hybridLogicalClock = new Clock.HLC();
+    this.systemMonotonicClock = new Clock.SystemMonotonic();
+    this.systemClock = new Clock.System();
+
     rpcServices = createRpcServices();
     this.startcode = System.currentTimeMillis();
     if (this instanceof HMaster) {
@@ -2074,6 +2087,17 @@ public class HRegionServer extends HasThread implements
   }
 
   @Override
+  public Clock getRegionServerClock(ClockType clockType) {
+    if (clockType.equals(ClockType.HLC)){
+      return this.hybridLogicalClock;
+    } else if (clockType.equals(ClockType.SYSTEM_MONOTONIC)) {
+      return this.systemMonotonicClock;
+    } else {
+      return this.systemClock;
+    }
+  }
+
+  @Override
   public Connection getConnection() {
     return getClusterConnection();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index c914ab6..4b2b460 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -92,6 +92,8 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
+import org.apache.hadoop.hbase.TimestampType;
+import org.apache.hadoop.hbase.Clock;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -341,10 +343,10 @@ public class HStore implements Store {
 
   /**
    * @param family
-   * @return TTL in seconds of the specified family
+   * @return TTL in milli seconds of the specified family
    */
   public static long determineTTLFromFamily(final ColumnFamilyDescriptor 
family) {
-    // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
+    // HColumnDescriptor.getTimeToLive returns ttl in seconds.  Convert to 
milliseconds.
     long ttl = family.getTimeToLive();
     if (ttl == HConstants.FOREVER) {
       // Default is unlimited ttl.
@@ -403,6 +405,11 @@ public class HStore implements Store {
   }
 
   @Override
+  public Clock getClock() {
+    return region.getClock();
+  }
+
+  @Override
   @Deprecated
   public long getSnapshotSize() {
     MemstoreSize size = getSizeOfSnapshot();

http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 4c188fe..ea50943 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Clock;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
@@ -82,6 +83,11 @@ public interface Region extends ConfigurationObserver {
   /** @return table descriptor for this region */
   TableDescriptor getTableDescriptor();
 
+  /** @return clock of the Region Server corresponding the clock type used by 
the
+   *  table contained in this region.
+   */
+  Clock getClock();
+
   /** @return true if region is available (not closed and not closing) */
   boolean isAvailable();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 5afa652..5c37136 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -39,6 +39,8 @@ import 
org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.Clock;
+import org.apache.hadoop.hbase.ClockType;
 import org.apache.zookeeper.KeeperException;
 
 import com.google.protobuf.Service;
@@ -58,6 +60,8 @@ public interface RegionServerServices extends OnlineRegions, 
FavoredNodesForRegi
    * default (common) WAL */
   WAL getWAL(HRegionInfo regionInfo) throws IOException;
 
+  Clock getRegionServerClock(ClockType clockType);
+
   /** @return the List of WALs that are used by this server
    *  Doesn't include the meta WAL
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index fd9de9b..82edc7a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Clock;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -346,6 +347,12 @@ public interface Store extends HeapSize, 
StoreConfigInformation, PropagatingConf
   MemstoreSize getSizeToFlush();
 
   /**
+   * @return clock of the Region Server corresponding the clock type used by 
the
+   *  table referred to by this store.
+   */
+  Clock getClock();
+
+  /**
    * Returns the memstore snapshot size
    * @return size of the memstore snapshot
    * @deprecated Since 2.0 and will be removed in 3.0. Use {@link 
#getSizeOfSnapshot()} instead.

Reply via email to