HBASE-14070 - Core HLC (Sai Teja Ranuva). Rebased by Amit Patel. Core HLC: adds the HLC framework (i.e., Clock, Timestamp API) and tests. Not used by core (yet).
Signed-off-by: Michael Stack <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d53fbc74 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d53fbc74 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d53fbc74 Branch: refs/heads/HBASE-14070.HLC Commit: d53fbc74ba6e3b4489b82bb48ced6dfe470c8b10 Parents: 2d5a0fb Author: Amit Patel <[email protected]> Authored: Thu Jul 6 13:01:23 2017 -0700 Committer: Apekshit Sharma <[email protected]> Committed: Mon Jul 17 10:56:08 2017 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hbase/HTableDescriptor.java | 10 + .../hadoop/hbase/client/TableDescriptor.java | 9 + .../hbase/client/TableDescriptorBuilder.java | 40 ++ .../java/org/apache/hadoop/hbase/Clock.java | 408 +++++++++++++++++++ .../java/org/apache/hadoop/hbase/ClockType.java | 38 ++ .../apache/hadoop/hbase/SettableTimestamp.java | 2 +- .../org/apache/hadoop/hbase/TimestampType.java | 309 ++++++++++++++ .../java/org/apache/hadoop/hbase/TestClock.java | 293 +++++++++++++ .../apache/hadoop/hbase/TestTimestampType.java | 207 ++++++++++ .../master/procedure/ModifyTableProcedure.java | 4 + .../hadoop/hbase/regionserver/HRegion.java | 78 +++- .../hbase/regionserver/HRegionServer.java | 24 ++ .../hadoop/hbase/regionserver/HStore.java | 11 +- .../hadoop/hbase/regionserver/Region.java | 6 + .../regionserver/RegionServerServices.java | 4 + .../apache/hadoop/hbase/regionserver/Store.java | 7 + .../hadoop/hbase/regionserver/StoreScanner.java | 102 ++++- .../DropDeletesCompactionScanQueryMatcher.java | 19 +- .../querymatcher/LegacyScanQueryMatcher.java | 3 +- .../MajorCompactionScanQueryMatcher.java | 5 +- .../MinorCompactionScanQueryMatcher.java | 3 +- .../NormalUserScanQueryMatcher.java | 3 +- .../querymatcher/RawScanQueryMatcher.java | 3 +- .../querymatcher/ScanQueryMatcher.java | 24 +- .../StripeCompactionScanQueryMatcher.java | 5 +- 
.../hbase/security/access/AccessController.java | 10 +- .../hadoop/hbase/HBaseTestingUtility.java | 28 ++ .../hadoop/hbase/MockRegionServerServices.java | 5 + .../hadoop/hbase/TestClockWithCluster.java | 115 ++++++ .../coprocessor/TestIncrementTimeRange.java | 39 +- .../hadoop/hbase/mapreduce/TestCopyTable.java | 31 +- .../hadoop/hbase/master/MockRegionServer.java | 7 + .../regionserver/TestCompactingMemStore.java | 46 ++- .../hbase/regionserver/TestDefaultMemStore.java | 59 ++- .../hadoop/hbase/regionserver/TestHRegion.java | 74 +++- .../regionserver/TestHRegionReplayEvents.java | 3 + .../regionserver/TestRegionSplitPolicy.java | 4 + .../hbase/regionserver/TestStoreScanner.java | 171 +++++--- .../hbase/regionserver/TestWALLockup.java | 3 + .../TestCompactionScanQueryMatcher.java | 3 +- .../querymatcher/TestUserScanQueryMatcher.java | 12 +- .../regionserver/wal/AbstractTestWALReplay.java | 47 ++- .../access/TestCellACLWithMultipleVersions.java | 180 +++++--- .../hbase/util/TestCoprocessorScanPolicy.java | 30 +- .../apache/hadoop/hbase/util/TestTableName.java | 5 +- 45 files changed, 2260 insertions(+), 229 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index c09d434..c865255 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -70,6 +70,7 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescr public static final long DEFAULT_MEMSTORE_FLUSH_SIZE = TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE; public static final int 
DEFAULT_REGION_REPLICATION = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION; public static final boolean DEFAULT_REGION_MEMSTORE_REPLICATION = TableDescriptorBuilder.DEFAULT_REGION_MEMSTORE_REPLICATION; + public static final ClockType DEFAULT_CLOCK_TYPE = TableDescriptorBuilder.DEFAULT_CLOCK_TYPE; protected final ModifyableTableDescriptor delegatee; /** @@ -382,6 +383,15 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescr } /** + * Returns the clock type for the table. + * @return clock type for the table. + */ + @Override + public ClockType getClockType() { + return delegatee.getClockType(); + } + + /** * Returns the size of the memstore after which a flush to filesystem is triggered. * * @return memory cache flush size for each hregion, -1 if not set. http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java index 65e64f8..abd6ee9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java @@ -24,6 +24,8 @@ import java.util.Comparator; import java.util.Iterator; import java.util.Map; import java.util.Set; + +import org.apache.hadoop.hbase.ClockType; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; @@ -106,6 +108,13 @@ public interface TableDescriptor { Durability getDurability(); /** + * Returns the clock type setting for the table. + * + * @return clock type setting for the table. 
+ */ + ClockType getClockType(); + + /** * Returns an unmodifiable collection of all the {@link ColumnFamilyDescriptor} of * all the column families of the table. * http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java index 44d5c99..2e8d3e9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java @@ -36,6 +36,7 @@ import java.util.stream.Stream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ClockType; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; @@ -117,6 +118,14 @@ public class TableDescriptorBuilder { = new Bytes(Bytes.toBytes("DURABILITY")); /** + * {@link ClockType} setting for the table. + */ + @InterfaceAudience.Private + static final String CLOCK_TYPE = "CLOCK_TYPE"; + private static final Bytes CLOCK_TYPE_KEY + = new Bytes(Bytes.toBytes(CLOCK_TYPE)); + + /** * The number of region replicas for the table. 
*/ @InterfaceAudience.Private @@ -149,6 +158,12 @@ public class TableDescriptorBuilder { */ private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT; + /** + * Default clock type for HTD is SYSTEM + */ + @InterfaceAudience.Private + public static final ClockType DEFAULT_CLOCK_TYPE = ClockType.SYSTEM; + @InterfaceAudience.Private public static final String PRIORITY = "PRIORITY"; private static final Bytes PRIORITY_KEY @@ -338,6 +353,11 @@ public class TableDescriptorBuilder { return this; } + public TableDescriptorBuilder setClockType(ClockType clockType) { + desc.setClockType(clockType); + return this; + } + public TableDescriptorBuilder setFlushPolicyClassName(String clazz) { desc.setFlushPolicyClassName(clazz); return this; @@ -694,6 +714,24 @@ public class TableDescriptorBuilder { } /** + * Sets the {@link ClockType} for the table. This defaults to DEFAULT_CLOCK_TYPE. + * @param clockType + * @return the modifyable TD + */ + public ModifyableTableDescriptor setClockType(ClockType clockType) { + return setValue(CLOCK_TYPE_KEY, clockType.name()); + } + + /** + * Returns the clock type for the table. + * @return the clock type for the table. 
+ */ + @Override + public ClockType getClockType() { + return getOrDefault(CLOCK_TYPE_KEY, ClockType::valueOf, DEFAULT_CLOCK_TYPE); + } + + /** * Get the name of the table * * @return TableName @@ -1469,6 +1507,8 @@ public class TableDescriptorBuilder { public int getColumnFamilyCount() { return families.size(); } + + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java new file mode 100644 index 0000000..abc6252 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.hadoop.hbase.util.AtomicUtils.updateMax; + +/** + * A clock is an implementation of an algorithm to get timestamps corresponding to one of the + * {@link TimestampType}s for the current time. Different clock implementations can have + * different semantics associated with them. Every such clock should be able to map its + * representation of time to one of the {@link TimestampType}s. + * HBase has traditionally been using the {@link java.lang.System#currentTimeMillis()} to + * timestamp events in HBase. {@link java.lang.System#currentTimeMillis()} does not give any + * guarantees about monotonicity of time. We will keep this implementation of clock in place for + * backward compatibility and call it SYSTEM clock. + * It is easy to provide monotonically non-decreasing time semantics by keeping track of the last + * timestamp given by the clock and updating it on receipt of external message. This + * implementation of clock is called SYSTEM_MONOTONIC. + * The SYSTEM and SYSTEM_MONOTONIC clocks described above are both physical clocks, so they + * cannot track causality. A Hybrid Logical Clock (HLC), as described in + * <a href="http://www.cse.buffalo.edu/tech-reports/2014-04.pdf">HLC Paper</a>, helps track + * causality using a + * <a href="http://research.microsoft.com/en-us/um/people/lamport/pubs/time-clocks.pdf">Logical + * Clock</a> but always keeps the logical time close to the wall time or physical time. It kind + * of has the advantages of both the worlds.
One such advantage is getting consistent + * snapshots in physical time as described in the paper. Hybrid Logical Clock has an additional + * advantage that it is always monotonically increasing. + * Note: It is assumed that any physical clock implementation has millisecond resolution else the + * {@link TimestampType} implementation has to be changed to accommodate it. It was decided after + * careful discussion to go with millisecond resolution; see the HLC design document attached to the + * issue <a href="https://issues.apache.org/jira/browse/HBASE-14070">HBASE-14070 </a>. + */ + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface Clock { + long DEFAULT_MAX_CLOCK_SKEW_IN_MS = 30000; + + /** + * This is a method to get the current time. + * + * @return Timestamp of current time in 64 bit representation corresponding to the particular + * clock + */ + long now() throws RuntimeException; + + /** + * This is a method to update the current time with the passed timestamp. + * @param timestamp + * @return Timestamp of current time in 64 bit representation corresponding to the particular + * clock + */ + long update(long timestamp) throws RuntimeException; + + /** + * @return true if the clock implementation gives monotonically non decreasing timestamps else + * false. + */ + boolean isMonotonic(); + + /** + * @return true if the clock implementation gives monotonically increasing timestamps else false. + */ + boolean isMonotonicallyIncreasing(); + + /** + * @return {@link org.apache.hadoop.hbase.TimestampType} + */ + TimestampType getTimestampType(); + + /** + * @return {@link org.apache.hadoop.hbase.ClockType} + */ + ClockType getClockType(); + + + /** + * Indicates that the Physical Time or Logical Time component has overflowed, or that a received + * timestamp exceeds the allowed clock skew. This extends + * RuntimeException.
+ */ + @SuppressWarnings("serial") + class ClockException extends RuntimeException { + public ClockException(String msg) { + super(msg); + } + } + + ////////////////////////////////////////////////////////////////// + // Physical Clock + ////////////////////////////////////////////////////////////////// + + interface PhysicalClock { + /** + * This is a method to get the current time. + * + * @return Timestamp of current time in 64 bit representation corresponding to the particular + * clock + */ + long now() throws RuntimeException; + + /** + * This is a method to get the unit of the physical time used by the clock + * + * @return A {@link TimeUnit} + */ + TimeUnit getTimeUnit(); + } + + class JavaMillisPhysicalClock implements PhysicalClock { + @Override + public long now() { + return EnvironmentEdgeManager.currentTime(); + } + + @Override + public TimeUnit getTimeUnit() { + return TimeUnit.MILLISECONDS; + } + } + + JavaMillisPhysicalClock DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK = + new JavaMillisPhysicalClock(); + + ////////////////////////////////////////////////////////////////// + // Implementation of clocks + ////////////////////////////////////////////////////////////////// + + /** + * System clock is an implementation of clock which doesn't give any monotonic guarantees. 
+ */ + class System implements Clock, PhysicalClock { + private final PhysicalClock physicalClock = DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK; + private final ClockType clockType = ClockType.SYSTEM; + private final TimestampType timestampType = TimestampType.PHYSICAL; + + @Override + public long now() { + return physicalClock.now(); + } + + @Override + public long update(long timestamp) { + return physicalClock.now(); + } + + @Override + public boolean isMonotonic() { + return false; + } + + @Override + public boolean isMonotonicallyIncreasing() { + return false; + } + + public TimeUnit getTimeUnit() { + return physicalClock.getTimeUnit(); + } + + @Override + public TimestampType getTimestampType() { return timestampType; } + + @Override + public ClockType getClockType() { return clockType; } + } + + /** + * System clock is an implementation of clock which guarantees monotonically non-decreasing + * timestamps. + */ + class SystemMonotonic implements Clock, PhysicalClock { + private final long maxClockSkewInMs; + private final PhysicalClock physicalClock; + private final AtomicLong physicalTime = new AtomicLong(); + private final ClockType clockType = ClockType.SYSTEM_MONOTONIC; + private final TimestampType timestampType = TimestampType.PHYSICAL; + + public SystemMonotonic(PhysicalClock physicalClock, long maxClockSkewInMs) { + this.physicalClock = physicalClock; + this.maxClockSkewInMs = maxClockSkewInMs > 0 ? 
+ maxClockSkewInMs : DEFAULT_MAX_CLOCK_SKEW_IN_MS; + } + + public SystemMonotonic() { + this.physicalClock = DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK; + this.maxClockSkewInMs = DEFAULT_MAX_CLOCK_SKEW_IN_MS; + } + + @Override + public long now() { + long systemTime = physicalClock.now(); + updateMax(physicalTime, systemTime); + return physicalTime.get(); + } + + public long update(long targetTimestamp) throws ClockException { + final long systemTime = physicalClock.now(); + if (maxClockSkewInMs > 0 && (targetTimestamp - systemTime) > maxClockSkewInMs) { + throw new ClockException( + "Received event with timestamp:" + getTimestampType().toString(targetTimestamp) + + " which is greater than allowed clock skew "); + } + final long oldPhysicalTime = systemTime > targetTimestamp ? systemTime : targetTimestamp; + updateMax(physicalTime, oldPhysicalTime); + return physicalTime.get(); + } + + @Override + public boolean isMonotonic() { + return true; + } + + @Override + public boolean isMonotonicallyIncreasing() { + return false; + } + + public TimeUnit getTimeUnit() { + return physicalClock.getTimeUnit(); + } + + @VisibleForTesting + void setPhysicalTime(long time) { + physicalTime.set(time); + } + + @Override + public TimestampType getTimestampType() { return timestampType; } + + @Override + public ClockType getClockType() { return clockType; } + } + + class HLC implements Clock, PhysicalClock { + private final PhysicalClock physicalClock; + private final long maxClockSkew; + private final long maxPhysicalTime; + private final long maxLogicalTime; + private long physicalTime; + private long logicalTime; + private final ClockType clockType = ClockType.HLC; + private final TimestampType timestampType = TimestampType.HYBRID; + + public HLC(PhysicalClock physicalClock, long maxClockSkew) { + this.physicalClock = physicalClock; + this.maxClockSkew = maxClockSkew > 0 ? 
maxClockSkew : DEFAULT_MAX_CLOCK_SKEW_IN_MS; + this.maxPhysicalTime = timestampType.getMaxPhysicalTime(); + this.maxLogicalTime = timestampType.getMaxLogicalTime(); + this.physicalTime = 0; + this.logicalTime = 0; + } + + public HLC() { + this.physicalClock = DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK; + this.maxClockSkew = DEFAULT_MAX_CLOCK_SKEW_IN_MS; + this.maxPhysicalTime = timestampType.getMaxPhysicalTime(); + this.maxLogicalTime = timestampType.getMaxLogicalTime(); + this.physicalTime = 0; + this.logicalTime = 0; + } + + @Override + public synchronized long now() throws ClockException { + final long systemTime = physicalClock.now(); + + checkPhysicalTimeOverflow(systemTime, maxPhysicalTime); + checkLogicalTimeOverflow(logicalTime, maxLogicalTime); + + if (systemTime <= physicalTime) { + logicalTime++; + } else if (systemTime > physicalTime) { + logicalTime = 0; + physicalTime = systemTime; + } + + return toTimestamp(); + } + + /** + * Updates {@link HLC} with the given timestamp received from elsewhere (possibly + * some other node). Returned timestamp is strict greater than msgTimestamp and local + * timestamp. + * + * @param timestamp timestamp from the external message. 
+ * @return a hybrid timestamp of HLC that is strictly greater than local timestamp and + * msgTimestamp + * @throws ClockException + */ + @Override + public synchronized long update(long timestamp) + throws ClockException { + final long targetPhysicalTime = timestampType.getPhysicalTime(timestamp); + final long targetLogicalTime = timestampType.getLogicalTime(timestamp); + final long oldPhysicalTime = physicalTime; + final long systemTime = physicalClock.now(); + + physicalTime = Math.max(Math.max(oldPhysicalTime, targetPhysicalTime), systemTime); + checkPhysicalTimeOverflow(systemTime, maxPhysicalTime); + + if (targetPhysicalTime - systemTime > maxClockSkew) { + throw new ClockException("Received event with timestamp:" + + timestampType.toString(timestamp) + " which is greater than allowed clock skew"); + } + if (physicalTime == oldPhysicalTime && oldPhysicalTime == targetPhysicalTime) { + logicalTime = Math.max(logicalTime, targetLogicalTime) + 1; + } else if (physicalTime == targetPhysicalTime) { + logicalTime = targetLogicalTime + 1; + } else if (physicalTime == oldPhysicalTime) { + logicalTime++; + } else { + logicalTime = 0; + } + + checkLogicalTimeOverflow(logicalTime, maxLogicalTime); + return toTimestamp(); + } + + @Override + public boolean isMonotonic() { + return true; + } + + @Override + public boolean isMonotonicallyIncreasing() { + return true; + } + + public TimeUnit getTimeUnit() { + return physicalClock.getTimeUnit(); + } + + private long toTimestamp() { + return timestampType.toTimestamp(getTimeUnit(), physicalTime, logicalTime); + } + + @VisibleForTesting + synchronized void setLogicalTime(long logicalTime) { + this.logicalTime = logicalTime; + } + + @VisibleForTesting + synchronized void setPhysicalTime(long physicalTime) { + this.physicalTime = physicalTime; + } + + @Override + public TimestampType getTimestampType() { return timestampType; } + + @Override + public ClockType getClockType() { return clockType; } + } + + 
////////////////////////////////////////////////////////////////// + // Utility functions + ////////////////////////////////////////////////////////////////// + + // Only for testing. + @VisibleForTesting + static Clock getDummyClockOfGivenClockType(ClockType clockType) { + if (clockType == ClockType.HLC) { + return new Clock.HLC(); + } else if (clockType == ClockType.SYSTEM_MONOTONIC) { + return new Clock.SystemMonotonic(); + } else { + return new Clock.System(); + } + } + + static void checkLogicalTimeOverflow(long logicalTime, long maxLogicalTime) { + if (logicalTime >= maxLogicalTime) { + // highly unlikely to happen, when it happens, we throw exception for the above layer to + // handle it the way they wish to. + throw new Clock.ClockException( + "Logical Time Overflowed: " + logicalTime + "max " + "logical time: " + maxLogicalTime); + } + } + + static void checkPhysicalTimeOverflow(long physicalTime, long maxPhysicalTime) { + if (physicalTime >= maxPhysicalTime) { + // Extremely unlikely to happen, if this happens upper layers may have to kill the server. + throw new Clock.ClockException( + "Physical Time overflowed: " + physicalTime + " and max physical time:" + maxPhysicalTime); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-common/src/main/java/org/apache/hadoop/hbase/ClockType.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ClockType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ClockType.java new file mode 100644 index 0000000..aee8e43 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ClockType.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + [email protected] +public enum ClockType { + SYSTEM{ + public TimestampType timestampType() { + return TimestampType.PHYSICAL; + } + }, SYSTEM_MONOTONIC { + public TimestampType timestampType() { + return TimestampType.PHYSICAL; + } + }, HLC { + public TimestampType timestampType() { + return TimestampType.HYBRID; + } + }; + abstract public TimestampType timestampType(); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java index 8637db2..41d3a0a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java @@ -26,8 +26,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; * Note : Server side Cell implementations in write path must implement this. * @deprecated as of 2.0 and will be removed in 3.0. 
Use {@link ExtendedCell} instead */ +@Deprecated // Co Processors SHOULD NOT use this if the clock type of the tables is HLC @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) -@Deprecated public interface SettableTimestamp { /** http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-common/src/main/java/org/apache/hadoop/hbase/TimestampType.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TimestampType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TimestampType.java new file mode 100644 index 0000000..e2e2604 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TimestampType.java @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.commons.lang.time.FastDateFormat; + +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; + +/** + * {@link TimestampType} is an enum to represent different ways of encoding time in HBase using + * 64 bits. 
Time is usually encoded as a 64-bit long in {@link org.apache.hadoop.hbase.Cell} + * timestamps and is used for sorting {@link org.apache.hadoop.hbase.Cell}s, ordering writes etc. + * It has methods which help in constructing or interpreting the 64 bit timestamp and getter + * methods to read the hard coded constants of the particular {@link TimestampType}. + * + * <p> + * Enum {@link TimestampType} is dumb in a way. It doesn't have any logic other than interpreting + * the 64 bits. Any monotonically increasing or monotonically non-decreasing semantics of the + * timestamps are the responsibility of the clock implementation generating the particular + * timestamps. There can be several clock implementations, and each such implementation can map + * its representation of the timestamp to one of the available Timestamp types i.e. + * {@link #HYBRID} or {@link #PHYSICAL}. In essence, the {@link TimestampType} is only used + * internally by the Clock implementations and thus never exposed to the user. The user has to + * know only the different available clock types. So, for the user, timestamp types do not exist. + * </p> + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public enum TimestampType { + /** + * Hybrid is a Timestamp type used to encode both physical time and logical time components + * into a single 64-bit long integer. It has methods to decipher the 64 bits hybrid timestamp + * and also to construct the hybrid timestamp. + */ + HYBRID { + /** + * Hard coded 44-bits for physical time, with most significant bit carrying the sign i.e. 0 + * as we are dealing with positive integers and the remaining 43 bits are to be interpreted as + * system time in milliseconds. See + * <a href="https://issues.apache.org/jira/browse/HBASE-14070">HBASE-14070 </a> for + * understanding the choice of going with the millisecond resolution for physical time.
+ * Thus allowing us to represent all the dates between unix epoch (1970) and year 2248 with + * signed timestamp comparison with 44 bits for physical time assuming a millisecond + * resolution with signed long integers. Picking 42 bits to represent the physical time has + * the problem of representing time until 2039 only, with signed integers, which might cause a + * Y2k39 bug, hoping HBase is still around by then. The trade-off here is between the year until + * which we can represent the physical time and whether we are able to capture all the events in + * the worst case (read: leap seconds etc) without the logical component of the timestamp + * overflowing. With 20 bits for logical time, one can represent up to one million events at the + * same millisecond. In case of leap seconds, the number of events happening in the same second + * is very unlikely to exceed one million. + */ + @SuppressWarnings("unused") + private static final int BITS_FOR_PHYSICAL_TIME = 44; + + /** + * Remaining 20-bits for logical time, allowing values up to 1,048,575. Logical Time is the + * least significant part of the 64 bit timestamp, so unsigned comparison can be used for LT. + */ + + private static final int BITS_FOR_LOGICAL_TIME = 20; + + /** + * Max value for physical time in the {@link #HYBRID} timestamp representation, inclusive. + * This assumes signed comparison.
+ */ + private static final long PHYSICAL_TIME_MAX_VALUE = 0x7ffffffffffL; + + /** + * Max value for logical time in the {@link #HYBRID} timestamp representation + */ + static final long LOGICAL_TIME_MAX_VALUE = 0xfffffL; + + public long toEpochTimeMillisFromTimestamp(long timestamp) { + return getPhysicalTime(timestamp); + } + + public long fromEpochTimeMillisToTimestamp(long timestamp) { + return toTimestamp(TimeUnit.MILLISECONDS, timestamp, 0); + } + + public long toTimestamp(TimeUnit timeUnit, long physicalTime, long logicalTime) { + physicalTime = TimeUnit.MILLISECONDS.convert(physicalTime, timeUnit); + return (physicalTime << BITS_FOR_LOGICAL_TIME) + logicalTime; + } + + public long getPhysicalTime(long timestamp) { + return timestamp >>> BITS_FOR_LOGICAL_TIME; // assume unsigned timestamp + } + + long getLogicalTime(long timestamp) { + return timestamp & LOGICAL_TIME_MAX_VALUE; + } + + public long getMaxPhysicalTime() { + return PHYSICAL_TIME_MAX_VALUE; + } + + public long getMaxLogicalTime() { + return LOGICAL_TIME_MAX_VALUE; + } + + int getBitsForLogicalTime() { + return BITS_FOR_LOGICAL_TIME; + } + + /** + * Returns whether the given timestamp is "likely" of {@link #HYBRID} {@link TimestampType}. + * Timestamp implementations can use the full range of 64bits long to represent physical and + * logical components of time. However, this method returns whether the given timestamp is a + * likely representation depending on heuristics for the clock implementation. + * + * Hybrid timestamps are checked whether they belong to Hybrid range assuming + * that Hybrid timestamps will only have > 0 logical time component for timestamps + * corresponding to years after 2016. This method will return false if lt > 0 and year is + * before 2016. Due to left shifting for Hybrid time, all millisecond-since-epoch timestamps + * from years 1970-10K fall into + * year 1970 when interpreted as Hybrid timestamps. 
Thus, {@link #isLikelyOfType(long)} will + * return false for timestamps which are in the year 1970 and logical time = 0 when + * interpreted as of type Hybrid Time. + * + * <p> + * <b>Note that </b> this method uses heuristics which may not hold + * if system timestamps are intermixed from client side and server side or timestamp + * sources other than system clock are used. + * </p> + * @param timestamp {@link #HYBRID} Timestamp + * @return true if the timestamp is likely to be of the corresponding {@link TimestampType} + * else false + */ + public boolean isLikelyOfType(long timestamp) { + final long physicalTime = getPhysicalTime(timestamp); + final long logicalTime = getLogicalTime(timestamp); + + // heuristic 1: Up until year 2016 (1451635200000), lt component cannot be non-zero. + if (physicalTime < 1451635200000L && logicalTime != 0) { + return false; + } else if (physicalTime < 31536000000L) { + // heuristic 2: Even if logical time = 0, an epoch ms timestamp from wall time, when + // interpreted as Hybrid, has its physical time right-shifted by 20 bits and therefore + // lands before year 1971 (31536000000L), even for the epoch time of the year 10000. + // This assumes Hybrid time is not used to represent timestamps for year 1970 + // UTC. + return false; + } + return true; + } + + /** + * Returns a string representation for Physical Time and Logical Time components. The format is: + * <code>yyyy-MM-dd'T'HH:mm:ss:SSS(Physical Time), Logical Time</code> + * Physical Time is converted to UTC time and not to local time for uniformity. 
+ * Example: 2015-07-17T23:56:35:891(1437177395891), 0 + * @param timestamp A {@link #HYBRID} Timestamp + * @return A date time string formatted as mentioned in the method description + */ + public String toString(long timestamp) { + long physicalTime = getPhysicalTime(timestamp); + long logicalTime = getLogicalTime(timestamp); + return new StringBuilder().append(dateFormat.format(physicalTime)).append("(") + .append(physicalTime).append(")").append(", ").append(logicalTime).toString(); + } + }, + + /** + * Physical is a Timestamp type used to encode the physical time in 64 bits. + * It has helper methods to decipher the 64 bit encoding of physical time. + */ + PHYSICAL { + public long toEpochTimeMillisFromTimestamp(long timestamp) { + return timestamp; + } + + public long fromEpochTimeMillisToTimestamp(long timestamp) { + return timestamp; + } + + public long toTimestamp(TimeUnit timeUnit, long physicalTime, long logicalTime) { + return TimeUnit.MILLISECONDS.convert(physicalTime, timeUnit); + } + + public long getPhysicalTime(long timestamp) { + return timestamp; + } + + long getLogicalTime(long timestamp) { + return 0; + } + + public long getMaxPhysicalTime() { + return Long.MAX_VALUE; + } + + public long getMaxLogicalTime() { + return 0; + } + + int getBitsForLogicalTime() { + return 0; + } + + public boolean isLikelyOfType(long timestamp) { + // heuristic: the timestamp should be before year 3K (32503680000000L). + return timestamp < 32503680000000L; + } + + /** + * Returns a string representation for Physical Time and Logical Time components. The format is: + * <code>yyyy-MM-dd'T'HH:mm:ss:SSS(Physical Time), 0</code> + * Physical Time is converted to UTC time and not to local time for uniformity. 
+ * Example: 2015-07-17T23:56:35:891(1437177395891), 0 + * @param timestamp epoch time in milliseconds + * @return A date time string formatted as mentioned in the method description + */ + public String toString(long timestamp) { + return new StringBuilder().append(dateFormat.format(timestamp)).append("(") + .append(timestamp).append(")").append(", ").append("0").toString(); + } + }; + + /** + * Used internally by the toString methods of the Hybrid and Physical Timestamp types to + * format the physical time + * in the pattern set here. The UTC time zone is used instead of the local time zone for convenience + * and uniformity. + */ + private static final FastDateFormat dateFormat = + FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss:SSS", TimeZone.getTimeZone("UTC")); + + /** + * Converts the given timestamp to the unix epoch timestamp with millisecond resolution. + * Returned timestamp is compatible with System.currentTimeMillis(). + * @param timestamp {@link #HYBRID} or {@link #PHYSICAL} Timestamp + * @return number of milliseconds from epoch + */ + abstract public long toEpochTimeMillisFromTimestamp(long timestamp); + + /** + * Converts the given time in milliseconds to the corresponding {@link TimestampType} + * representation. + * @param timeInMillis epoch time in {@link TimeUnit#MILLISECONDS} + * @return a timestamp representation corresponding to {@link TimestampType}. 
+ */ + abstract public long fromEpochTimeMillisToTimestamp(long timeInMillis); + + /** + * Converts the given physical time, expressed in the given {@link TimeUnit}, to a 64-bit timestamp + * @param timeUnit the {@link TimeUnit} in which physicalTime is expressed + * @param physicalTime physical time + * @param logicalTime logical time + * @return a timestamp in 64 bits + */ + abstract public long toTimestamp(TimeUnit timeUnit, long physicalTime, long logicalTime); + + /** + * Extracts and returns the physical time from the timestamp + * @param timestamp {@link #HYBRID} or {@link #PHYSICAL} Timestamp + * @return physical time in {@link TimeUnit#MILLISECONDS} + */ + abstract public long getPhysicalTime(long timestamp); + + /** + * Extracts and returns the logical time from the timestamp + * @param timestamp {@link #HYBRID} or {@link #PHYSICAL} Timestamp + * @return logical time + */ + abstract long getLogicalTime(long timestamp); + + /** + * @return the maximum possible physical time in {@link TimeUnit#MILLISECONDS} + */ + abstract public long getMaxPhysicalTime(); + + /** + * @return the maximum possible logical time + */ + abstract public long getMaxLogicalTime(); + + /** + * @return number of least significant bits allocated for logical time + */ + abstract int getBitsForLogicalTime(); + + /** + * @param timestamp the timestamp to check + * @return true if the timestamp is likely to be of this {@link TimestampType}, else false + */ + abstract public boolean isLikelyOfType(long timestamp); + + public abstract String toString(long timestamp); + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java new file mode 100644 index 0000000..2476c51 --- /dev/null +++ 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.rules.ExpectedException; + +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +@Category(SmallTests.class) +public class TestClock { + + static final Clock.PhysicalClock PHYSICAL_CLOCK = mock(Clock.PhysicalClock.class); + static final long MAX_CLOCK_SKEW_IN_MS = 1000; + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + /** + * Wrapper class around any Clock. + * On calls to now() and update(), timestamps returned by the underlying clock are stored in an + * array. Call assertMonotonic() to assert that the timestamps returned were monotonic or not. 
+ */ + private class MonotonicityCheckerClock { + final boolean strictlyIncreasing; + final Clock clock; + final ArrayList<Long> timestamps = new ArrayList<>(); + + MonotonicityCheckerClock(Clock clock, boolean strictlyIncreasing) { + this.clock = clock; + this.strictlyIncreasing = strictlyIncreasing; + } + + long now() { + long ts = clock.now(); + timestamps.add(ts); + return ts; + } + + long update(long timestamp) { + long ts = clock.update(timestamp); + timestamps.add(ts); + return ts; + } + + void assertMonotonic() { + assertTrue(timestamps.size() > 0); + long prev = 0; + for (long timestamp : timestamps) { + if (strictlyIncreasing) { + // This simple comparison works correctly for all types of clocks we have currently. + assertTrue(timestamps.toString(), timestamp > prev); + } else { + assertTrue(timestamps.toString(), timestamp >= prev); + } + prev = timestamp; + } + } + } + + @Before + public void setUp() { + when(PHYSICAL_CLOCK.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS); + } + + // All Clocks Tests + + @Test + public void testSystemMonotonicNow() { + MonotonicityCheckerClock systemMonotonic = + new MonotonicityCheckerClock(new Clock.SystemMonotonic(PHYSICAL_CLOCK, 30000), false); + + // case 1: Set time and assert + when(PHYSICAL_CLOCK.now()).thenReturn(100L); + assertEquals(100, systemMonotonic.now()); + + // case 2: Go back in time and check monotonic property. + when(PHYSICAL_CLOCK.now()).thenReturn(99L); + assertEquals(100, systemMonotonic.now()); + + // case 3: system time goes ahead compared to previous timestamp. 
+ when(PHYSICAL_CLOCK.now()).thenReturn(101L); + assertEquals(101, systemMonotonic.now()); + + systemMonotonic.assertMonotonic(); + } + + @Test + public void testSystemMonotonicUpdate() { + MonotonicityCheckerClock systemMonotonic = + new MonotonicityCheckerClock(new Clock.SystemMonotonic(PHYSICAL_CLOCK, 30000), false); + + // Sets internal time + when(PHYSICAL_CLOCK.now()).thenReturn(99L); + assertEquals(99, systemMonotonic.now()); + + // case 1: Message timestamp is greater than current System Monotonic Time, + // physical time at 100 still. + when(PHYSICAL_CLOCK.now()).thenReturn(100L); + assertEquals(102, systemMonotonic.update(102)); + + // case 2: Message timestamp is greater than current System Monotonic Time, + // physical time at 100 still. + when(PHYSICAL_CLOCK.now()).thenReturn(100L); + assertEquals(103, systemMonotonic.update(103)); + + // case 3: Message timestamp is less than current System Monotonic Time, greater than current + // physical time which is 100. + assertEquals(103, systemMonotonic.update(101)); + + // case 4: Message timestamp is less than current System Monotonic Time, less than current + // physical time which is 100. + assertEquals(103, systemMonotonic.update(99)); + + // case 5: Message timestamp<System monotonic time and both less than current Physical Time + when(PHYSICAL_CLOCK.now()).thenReturn(106L); + assertEquals(106, systemMonotonic.update(102)); + + // case 6: Message timestamp>System monotonic time and both less than current Physical Time + when(PHYSICAL_CLOCK.now()).thenReturn(109L); + assertEquals(109, systemMonotonic.update(108)); + + systemMonotonic.assertMonotonic(); + } + + @Test + public void testSystemMonotonicUpdateMaxClockSkew() throws Clock.ClockException { + final long time = 100L; + Clock.SystemMonotonic systemMonotonic = + new Clock.SystemMonotonic(PHYSICAL_CLOCK, MAX_CLOCK_SKEW_IN_MS); + + // Set Current Time. 
+ when(PHYSICAL_CLOCK.now()).thenReturn(time); + systemMonotonic.now(); + + // Shouldn't throw ClockException + systemMonotonic.update(time + MAX_CLOCK_SKEW_IN_MS - 1); + + exception.expect(Clock.ClockException.class); + systemMonotonic.update(time + MAX_CLOCK_SKEW_IN_MS + 1); + } + + // All Hybrid Logical Clock Tests + + private void assertHLCTime(long hlcTime, long expectedPhysicalTime, long expectedLogicalTime) { + assertEquals(expectedPhysicalTime, TimestampType.HYBRID.getPhysicalTime(hlcTime)); + assertEquals(expectedLogicalTime, TimestampType.HYBRID.getLogicalTime(hlcTime)); + } + + @Test + public void testHLCNow() throws Clock.ClockException { + MonotonicityCheckerClock hybridLogicalClock = + new MonotonicityCheckerClock(new Clock.HLC(PHYSICAL_CLOCK, 30000), true); + + // case 1: Test if it returns correct time based on current physical time. + // Remember, initially logical time = 0 + when(PHYSICAL_CLOCK.now()).thenReturn(100L); + assertHLCTime(hybridLogicalClock.now(), 100, 0); + + // case 2: physical time doesn't change, logical time should increment. + when(PHYSICAL_CLOCK.now()).thenReturn(100L); + assertHLCTime(hybridLogicalClock.now(), 100, 1); + + // case 3: physical time doesn't change still, logical time should increment again + when(PHYSICAL_CLOCK.now()).thenReturn(100L); + assertHLCTime(hybridLogicalClock.now(), 100, 2); + + // case 4: physical time moves forward, logical time should reset to 0. + when(PHYSICAL_CLOCK.now()).thenReturn(101L); + assertHLCTime(hybridLogicalClock.now(), 101, 0); + + // case 5: Monotonic increasing check, physical time goes back. + when(PHYSICAL_CLOCK.now()).thenReturn(99L); + assertHLCTime(hybridLogicalClock.now(), 101, 1); + + hybridLogicalClock.assertMonotonic(); + } + + @Test + public void testHLCNowLogicalTimeOverFlow() { + Clock.HLC hybridLogicalClock = new Clock.HLC(PHYSICAL_CLOCK, 100); + + // Set Current Time. 
+ when(PHYSICAL_CLOCK.now()).thenReturn(100L); + hybridLogicalClock.setPhysicalTime(100); + hybridLogicalClock.setLogicalTime(TimestampType.HYBRID.getMaxLogicalTime()); + + exception.expect(Clock.ClockException.class); + hybridLogicalClock.now(); + } + + @Test + public void testHLCUpdate() throws Clock.ClockException { + long messageTimestamp; + MonotonicityCheckerClock hybridLogicalClock = + new MonotonicityCheckerClock(new Clock.HLC(PHYSICAL_CLOCK, 100), true); + + // Set Current Time. + when(PHYSICAL_CLOCK.now()).thenReturn(100L); + hybridLogicalClock.now(); + + // case 1: Message physical timestamp is lower than current physical time. + messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 99, 1); + when(PHYSICAL_CLOCK.now()).thenReturn(101L); + assertHLCTime(hybridLogicalClock.update(messageTimestamp), 101, 0); + + // case 2: Message physical timestamp is greater than HLC physical time. + messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 3); + when(PHYSICAL_CLOCK.now()).thenReturn(102L); + assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 4); + + // case 3: Message timestamp is less than HLC timestamp + messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 104 , 4); + when(PHYSICAL_CLOCK.now()).thenReturn(103L); + assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 5); + + //case 4: Message timestamp with same physical time as HLC, but lower logical time + messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 2); + when(PHYSICAL_CLOCK.now()).thenReturn(101L); + assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 6); + + //case 5: Message timestamp with same physical time as HLC, but higher logical time + messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 8); + when(PHYSICAL_CLOCK.now()).thenReturn(102L); + assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 9); + + //case 6: Actual 
Physical Time greater than message physical timestamp and HLC physical time. + messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 10); + when(PHYSICAL_CLOCK.now()).thenReturn(110L); + assertHLCTime(hybridLogicalClock.update(messageTimestamp), 110, 0); + + hybridLogicalClock.assertMonotonic(); + } + + @Test + public void testHLCUpdateLogicalTimeOverFlow() throws Clock.ClockException { + long messageTimestamp; + Clock.HLC hybridLogicalClock = new Clock.HLC(PHYSICAL_CLOCK, 100); + + // Set Current Time. + when(PHYSICAL_CLOCK.now()).thenReturn(100L); + hybridLogicalClock.now(); + + messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 100, + TimestampType.HYBRID.getMaxLogicalTime()); + exception.expect(Clock.ClockException.class); + hybridLogicalClock.update(messageTimestamp); + } + + @Test + public void testHLCUpdateMaxClockSkew() throws Clock.ClockException { + final long time = 100; + long messageTimestamp; + Clock.HLC hybridLogicalClock = new Clock.HLC(PHYSICAL_CLOCK, MAX_CLOCK_SKEW_IN_MS); + + // Set Current Time. 
+ when(PHYSICAL_CLOCK.now()).thenReturn(time); + hybridLogicalClock.now(); + + messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, + time + MAX_CLOCK_SKEW_IN_MS - 1, 0); + hybridLogicalClock.update(messageTimestamp); + + messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, + time + MAX_CLOCK_SKEW_IN_MS + 1, 0); + exception.expect(Clock.ClockException.class); + hybridLogicalClock.update(messageTimestamp); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTimestampType.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTimestampType.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTimestampType.java new file mode 100644 index 0000000..b1a661b --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTimestampType.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.text.ParseException; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +/** + * Tests for TimestampType enum. + */ +@Category(SmallTests.class) +public class TestTimestampType { + private static final Log LOG = LogFactory.getLog(TestTimestampType.class); + + private static final long PHYSICAL_TIME = 1234567890123L; + private static final long LOGICAL_TIME = 12; + + private static final long HLC_TIME = TimestampType.HYBRID.toTimestamp( + TimeUnit.MILLISECONDS, PHYSICAL_TIME, LOGICAL_TIME); + private static final long PHYSICAL_CLOCK_TIME = TimestampType.PHYSICAL.toTimestamp( + TimeUnit.MILLISECONDS, PHYSICAL_TIME, LOGICAL_TIME); + + @Test + public void testFromToEpoch() { + for (TimestampType timestamp : TimestampType.values()) { + long wallTime = System.currentTimeMillis(); + long converted = timestamp.toEpochTimeMillisFromTimestamp( + timestamp.fromEpochTimeMillisToTimestamp(wallTime)); + + assertEquals(wallTime, converted); + } + } + + /* Tests for HLC Clock */ + @Test + public void testHybridMaxValues() { + // assert 44-bit Physical Time with signed comparison (actual 43 bits) + assertEquals( + (1L << (63-TimestampType.HYBRID.getBitsForLogicalTime())) - 1, + TimestampType.HYBRID.getMaxPhysicalTime()); + + // assert 20-bit Logical Time + assertEquals( + (1L << TimestampType.HYBRID.getBitsForLogicalTime()) - 1, + TimestampType.HYBRID.getMaxLogicalTime()); + + // assert that maximum representable timestamp is Long.MAX_VALUE (assuming signed comparison). 
+ assertEquals( + Long.MAX_VALUE, + TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, + TimestampType.HYBRID.getMaxPhysicalTime(), + TimestampType.HYBRID.getMaxLogicalTime()) + ); + } + + @Test + public void testHybridTimeComponents() { + assertEquals(PHYSICAL_TIME, TimestampType.HYBRID.getPhysicalTime(HLC_TIME)); + assertEquals(LOGICAL_TIME, TimestampType.HYBRID.getLogicalTime(HLC_TIME)); + } + + @Test + public void testHybridToString() { + assertEquals("2009-02-13T23:31:30:123(1234567890123), 12", + TimestampType.HYBRID.toString(HLC_TIME)); + } + + @Test + public void testHybridToTimestamp() { + long expected = (PHYSICAL_TIME << TimestampType.HYBRID.getBitsForLogicalTime()) + LOGICAL_TIME; + // test millisecond + assertEquals(HLC_TIME, expected); + + // test nanosecond + long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.NANOSECONDS, + TimeUnit.MILLISECONDS.toNanos(PHYSICAL_TIME), LOGICAL_TIME); + assertEquals(ts, expected); + } + + @Test + public void testHybridIsLikelyOfType() throws ParseException { + ZonedDateTime date = LocalDateTime.of(1970, 1, 1, 1, 1).atZone(ZoneId.of("UTC")); + // test timestamps of Hybrid type from year 1971 to 2248 where lt = 0 + for (int year = 1971; year <= 2248; year += 1) { + date = date.withYear(year); + // Hybrid type ts with pt = date and lt = 0 + long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.SECONDS, date.toEpochSecond(), 0); + assertTrue("Year = " + year, TimestampType.HYBRID.isLikelyOfType(ts)); + } + + // test timestamps of Hybrid type from year 2017 to 2248 where lt > 0 (Jan 1 01:01 UTC of 2016 is before the heuristic-1 cutoff 1451635200000) + for (int year = 2017; year <= 2248; year += 1) { + date = date.withYear(year); + + // Hybrid type ts with pt = date and lt = 123 + long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.SECONDS, date.toEpochSecond(), 123); + assertTrue("Year = " + year, TimestampType.HYBRID.isLikelyOfType(ts)); + } + + // test that timestamps from different years are not Hybrid type + for (int year = 1970; year <= 10000 ;year += 10) { + date = 
date.withYear(year); + final long ts = date.toEpochSecond() * 1000; + assertFalse("Year = " + year, TimestampType.HYBRID.isLikelyOfType(ts)); + } + + // test that timestamps up to 2016 are not Hybrid even if lt = 0 + for (int year = 1970; year <= 2016; year += 1) { + date = date.withYear(year); + + // reset lt = 0 + long ts = (((date.toEpochSecond() * 1000) + >> TimestampType.HYBRID.getBitsForLogicalTime()) + << TimestampType.HYBRID.getBitsForLogicalTime()); + assertFalse("Year = " + year, TimestampType.HYBRID.isLikelyOfType(ts)); + } + + // test that timestamps from currentTime epoch are not Hybrid type + long systemTimeNow = System.currentTimeMillis(); + LOG.info(TimestampType.PHYSICAL.toString(systemTimeNow)); + LOG.info(TimestampType.PHYSICAL.toString((TimestampType.HYBRID.getPhysicalTime(systemTimeNow)))); + assertFalse(TimestampType.HYBRID.isLikelyOfType(systemTimeNow)); + } + + + @Test + public void testPhysicalMaxValues() { + assertEquals( (1L << 63)- 1, TimestampType.PHYSICAL.getMaxPhysicalTime()); + + assertEquals(0, TimestampType.PHYSICAL.getMaxLogicalTime()); + } + + @Test + public void testPhysicalClockPhysicalAndLogicalTime() { + assertEquals(PHYSICAL_TIME, TimestampType.PHYSICAL.getPhysicalTime(PHYSICAL_CLOCK_TIME)); + assertEquals(0, TimestampType.PHYSICAL.getLogicalTime(PHYSICAL_CLOCK_TIME)); + } + + @Test + public void testPhysicalToString() { + assertEquals("2009-02-13T23:31:30:123(1234567890123), 0", + TimestampType.PHYSICAL.toString(PHYSICAL_CLOCK_TIME)); + } + + @Test + public void testPhysicalToTimestamp() { + // test millisecond + assertEquals(PHYSICAL_CLOCK_TIME, PHYSICAL_TIME); + + // test nanosecond + long ts = TimestampType.PHYSICAL.toTimestamp(TimeUnit.NANOSECONDS, + TimeUnit.MILLISECONDS.toNanos(PHYSICAL_TIME), LOGICAL_TIME); + assertEquals(ts, PHYSICAL_TIME); + } + + @Test + public void testPhysicalIsLikelyOfType() throws ParseException { + ZonedDateTime date = LocalDateTime.of(1970, 1, 1, 1, 1).atZone(ZoneId.of("UTC")); + + // 
test that timestamps from 1970 to 3K epoch are of Physical type + for (int year = 1970; year < 3000 ;year += 10) { + // ZonedDateTime is immutable: keep the result of withYear (years 1970 to 2990) + date = date.withYear(year); + final long ts = date.toEpochSecond() * 1000; + assertTrue("Year = " + year, TimestampType.PHYSICAL.isLikelyOfType(ts)); + } + + // test that timestamps from currentTime epoch are of Physical type + long systemTimeNow = System.currentTimeMillis(); + assertTrue(TimestampType.PHYSICAL.isLikelyOfType(systemTimeNow)); + + // test timestamps of Hybrid type from year 1971 to 2248 are not of Physical type (a Hybrid value for Jan 1 1970 is still below the year-3K bound) + for (int year = 1971; year <= 2248; year += 1) { + date = date.withYear(year); + // Hybrid type ts with pt = date and lt = 0 + long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.SECONDS, date.toEpochSecond(), 0); + assertFalse(TimestampType.PHYSICAL.isLikelyOfType(ts)); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java index 20a6a03..6d42c06 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java @@ -256,6 +256,10 @@ public class ModifyTableProcedure throw new IOException("REGION_REPLICATION change is not supported for enabled tables"); } } + // do not allow changing of clock type. 
+ if (modifiedHTableDescriptor.getClockType() != unmodifiedHTableDescriptor.getClockType()) { + throw new IOException("Clock Type change is not supported for tables"); + } // Find out whether all column families in unmodifiedHTableDescriptor also exists in // the modifiedHTableDescriptor. This is to determine whether we are safe to rollback. http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index b02b042..a55be97 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -92,6 +92,8 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ClockType; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.NamespaceDescriptor; @@ -100,6 +102,8 @@ import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagUtil; +import org.apache.hadoop.hbase.Clock; +import org.apache.hadoop.hbase.TimestampType; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Append; @@ -380,6 +384,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return minimumReadPoint; } + @Override + public Clock getClock() { + if 
(this.clock == null) { + return this.getRegionServerServices().getRegionServerClock( + getTableDescriptor().getClockType()); + } + return this.clock; + } + + /** + * Only for the purpose of testing + * @param clock + */ + @VisibleForTesting + public void setClock(Clock clock) { + this.clock = clock; + } + /* * Data structure of write state flags used coordinating flushes, * compactions and closes. @@ -617,6 +639,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap = new ConcurrentHashMap<>(); final RegionServerServices rsServices; + private Clock clock; private RegionServerAccounting rsAccounting; private long flushCheckInterval; // flushPerChanges is to prevent too many changes in memstore @@ -775,6 +798,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi ? DEFAULT_DURABILITY : htd.getDurability(); if (rsServices != null) { + this.clock = rsServices.getRegionServerClock(htd.getClockType()); this.rsAccounting = this.rsServices.getRegionServerAccounting(); // don't initialize coprocessors if not running within a regionserver // TODO: revisit if coprocessors should load in other cases @@ -789,6 +813,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi recoveringRegions.put(encodedName, this); } } else { + this.clock = new Clock.System(); this.metricsRegionWrapper = null; this.metricsRegion = null; } @@ -2796,8 +2821,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return getScanner(scan, additionalScanners, HConstants.NO_NONCE, HConstants.NO_NONCE); } + /** + * Clients use physical timestamps when setting time ranges. 
Tables that use HLCs must map the + * physical timestamp to HLC time. Throws IOException if the Scan rejects the mapped range. + */ + private void mapTimeRangesWithRespectToClock(Scan scan) throws IOException { + TimeRange tr = scan.getTimeRange(); + if (tr.isAllTime()) { + return; + } + TimestampType timestampType = getClock().getTimestampType(); + // Clip time range max to prevent overflow when converting from epoch time to timestamp time + long trMaxClipped = Math.min(tr.getMax(), timestampType.getMaxPhysicalTime()); + try { + scan.setTimeRange(timestampType.fromEpochTimeMillisToTimestamp(tr.getMin()), + timestampType.fromEpochTimeMillisToTimestamp(trMaxClipped)); + } catch (IOException e) { + throw new IOException("Failed to map scan time range to clock timestamps", e); + } + } + private RegionScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException { + if (getClock().getClockType() == ClockType.HLC) { + mapTimeRangesWithRespectToClock(scan); + } startRegionOperation(Operation.SCAN); try { // Verify families are all valid @@ -3219,7 +3267,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one. int numReadyToWrite = 0; - long now = EnvironmentEdgeManager.currentTime(); + long now = clock.now(); while (lastIndexExclusive < batchOp.operations.length) { if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now, observedExceptions)) { lastIndexExclusive++; @@ -3257,7 +3305,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // STEP 2. Update any LATEST_TIMESTAMP timestamps // We should record the timestamp only after we have acquired the rowLock, // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp - now = EnvironmentEdgeManager.currentTime(); + now = clock.now(); byte[] byteNow = Bytes.toBytes(now); // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily? 
@@ -3756,8 +3804,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // non-decreasing (see HBASE-14070) we should make sure that the mutation has a // larger timestamp than what was observed via Get. doBatchMutate already does this, but // there is no way to pass the cellTs. See HBASE-14054. - long now = EnvironmentEdgeManager.currentTime(); - long ts = Math.max(now, cellTs); // ensure write is not eclipsed + long now = clock.now(); + long ts = clock.isMonotonic() ? now : Math.max(now, cellTs); // ensure write is not eclipsed byte[] byteTs = Bytes.toBytes(ts); if (mutation != null) { if (mutation instanceof Put) { @@ -4038,7 +4086,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (timestampSlop == HConstants.LATEST_TIMESTAMP) { return; } - long maxTs = now + timestampSlop; + long maxTs = clock.getTimestampType().getPhysicalTime(now) + timestampSlop; for (List<Cell> kvs : familyMap.values()) { assert kvs instanceof RandomAccess; int listSize = kvs.size(); @@ -7101,7 +7149,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Short circuit the read only case if (processor.readOnly()) { try { - long now = EnvironmentEdgeManager.currentTime(); + long now = clock.now(); doProcessRowWithTimeout(processor, now, this, null, null, timeout); processor.postProcess(this, walEdit, true); } finally { @@ -7131,7 +7179,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // STEP 3. Region lock lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size()); locked = true; - long now = EnvironmentEdgeManager.currentTime(); + long now = clock.now(); // STEP 4. 
Let the processor scan the rows, generate mutations and add waledits doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout); if (!mutations.isEmpty()) { @@ -7440,7 +7488,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final List<Cell> results) throws IOException { WALEdit walEdit = null; - long now = EnvironmentEdgeManager.currentTime(); + long now = clock.now(); final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL; // Process a Store/family at a time. for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) { @@ -7556,7 +7604,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long ts = now; if (currentValue != null) { tags = TagUtil.carryForwardTags(tags, currentValue); - ts = Math.max(now, currentValue.getTimestamp() + 1); + // Ensure that timestamp of increment operation is greater than existing cell timestamp + if (this.getClock().getClockType() == ClockType.SYSTEM || + this.getClock().getClockType() == ClockType.SYSTEM_MONOTONIC) { + ts = Math.max(now, currentValue.getTimestamp() + 1); + } newValue += getLongValue(currentValue); } // Now make up the new Cell. TODO: FIX. This is carnel knowledge of how KeyValues are made... @@ -7582,7 +7634,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi byte [] row = mutation.getRow(); if (currentValue != null) { tags = TagUtil.carryForwardTags(tags, currentValue); - ts = Math.max(now, currentValue.getTimestamp() + 1); + // Ensure that timestamp of increment operation is greater than existing cell timestamp + if (this.getClock().getClockType() == ClockType.SYSTEM || + this.getClock().getClockType() == ClockType.SYSTEM_MONOTONIC) { + ts = Math.max(now, currentValue.getTimestamp() + 1); + } tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL()); byte[] tagBytes = TagUtil.fromList(tags); // Allocate an empty cell and copy in all parts. 
@@ -7685,7 +7741,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 49 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + + 50 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + (15 * Bytes.SIZEOF_LONG) + 6 * Bytes.SIZEOF_BOOLEAN); http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 986d6d4..dc7c70e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -75,6 +75,8 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Clock; +import org.apache.hadoop.hbase.ClockType; import org.apache.hadoop.hbase.HealthCheckChore; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NotServingRegionException; @@ -331,6 +333,13 @@ public class HRegionServer extends HasThread implements // debugging and unit tests. private volatile boolean abortRequested; + // Region server contains instances of all three clock types. Regions have a set + // clock type so depending on the clock type needed by a region, the appropriate + // one can be accessed. + final protected Clock hybridLogicalClock; + final protected Clock systemMonotonicClock; + final protected Clock systemClock; + ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<>(); // A state before we go into stopped state.
At this stage we're closing user @@ -588,6 +597,10 @@ public class HRegionServer extends HasThread implements this.abortRequested = false; this.stopped = false; + this.hybridLogicalClock = new Clock.HLC(); + this.systemMonotonicClock = new Clock.SystemMonotonic(); + this.systemClock = new Clock.System(); + rpcServices = createRpcServices(); this.startcode = System.currentTimeMillis(); if (this instanceof HMaster) { @@ -2074,6 +2087,17 @@ public class HRegionServer extends HasThread implements } @Override + public Clock getRegionServerClock(ClockType clockType) { + if (clockType.equals(ClockType.HLC)){ + return this.hybridLogicalClock; + } else if (clockType.equals(ClockType.SYSTEM_MONOTONIC)) { + return this.systemMonotonicClock; + } else { + return this.systemClock; + } + } + + @Override public Connection getConnection() { return getClusterConnection(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index c914ab6..4b2b460 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -92,6 +92,8 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; +import org.apache.hadoop.hbase.TimestampType; +import org.apache.hadoop.hbase.Clock; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -341,10 +343,10 @@ public class HStore implements Store { /** * @param family - * @return TTL in seconds of the specified family + * @return TTL in 
milliseconds of the specified family */ public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) { - // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds. + // HColumnDescriptor.getTimeToLive returns ttl in seconds. Convert to milliseconds. long ttl = family.getTimeToLive(); if (ttl == HConstants.FOREVER) { // Default is unlimited ttl. @@ -403,6 +405,11 @@ public class HStore implements Store { } @Override + public Clock getClock() { + return region.getClock(); + } + + @Override @Deprecated public long getSnapshotSize() { MemstoreSize size = getSizeOfSnapshot(); http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 4c188fe..ea50943 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.Clock; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HDFSBlocksDistribution; @@ -82,6 +83,11 @@ public interface Region extends ConfigurationObserver { /** @return table descriptor for this region */ TableDescriptor getTableDescriptor(); + /** @return clock of the Region Server corresponding to the clock type used by the + * table contained in this region.
+ */ + Clock getClock(); + /** @return true if region is available (not closed and not closing) */ boolean isAvailable(); http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index 5afa652..5c37136 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.Clock; +import org.apache.hadoop.hbase.ClockType; import org.apache.zookeeper.KeeperException; import com.google.protobuf.Service; @@ -58,6 +60,8 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi * default (common) WAL */ WAL getWAL(HRegionInfo regionInfo) throws IOException; + Clock getRegionServerClock(ClockType clockType); + /** @return the List of WALs that are used by this server * Doesn't include the meta WAL */ http://git-wip-us.apache.org/repos/asf/hbase/blob/d53fbc74/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index fd9de9b..82edc7a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java 
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Clock; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; @@ -346,6 +347,12 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf MemstoreSize getSizeToFlush(); /** + * @return clock of the Region Server corresponding to the clock type used by the + * table referred to by this store. + */ + Clock getClock(); + + /** * Returns the memstore snapshot size * @return size of the memstore snapshot * @deprecated Since 2.0 and will be removed in 3.0. Use {@link #getSizeOfSnapshot()} instead.
