Remove globalTime and updateTime
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2f858129 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2f858129 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2f858129 Branch: refs/heads/ignite-4587 Commit: 2f8581297e4a35f0f016917bfe02a550108afcb4 Parents: ea61e62 Author: Max Kozlov <[email protected]> Authored: Thu Mar 16 14:08:26 2017 +0300 Committer: Max Kozlov <[email protected]> Committed: Thu Mar 16 14:08:26 2017 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridKernalContext.java | 8 - .../ignite/internal/GridKernalContextImpl.java | 12 - .../apache/ignite/internal/IgniteKernal.java | 2 - .../cache/GridCacheAtomicVersionComparator.java | 21 +- .../processors/cache/GridCacheMapEntry.java | 1 - .../processors/cache/GridCacheMvcc.java | 23 +- .../processors/cache/GridCacheUtils.java | 7 +- .../dht/GridPartitionedGetFuture.java | 2 +- .../cache/transactions/IgniteTxEntry.java | 8 +- .../cache/transactions/IgniteTxManager.java | 2 +- .../version/GridCachePlainVersionedEntry.java | 5 - .../version/GridCacheRawVersionedEntry.java | 5 - .../cache/version/GridCacheVersion.java | 47 +- .../cache/version/GridCacheVersionEx.java | 19 +- .../cache/version/GridCacheVersionManager.java | 9 +- .../cache/version/GridCacheVersionedEntry.java | 7 - .../processors/clock/GridClockServer.java | 6 - .../clock/GridClockSyncProcessor.java | 481 ------------------- .../ignite/internal/util/IgniteUtils.java | 26 +- .../cache/GridCacheMvccFlagsTest.java | 4 +- .../cache/GridCacheMvccPartitionedSelfTest.java | 24 +- .../processors/cache/GridCacheMvccSelfTest.java | 8 +- .../processors/cache/GridCacheTestEntryEx.java | 2 +- .../cache/GridCacheVersionSelfTest.java | 4 +- .../transactions/DepthFirstSearchTest.java | 18 +- .../clock/GridTimeSyncProcessorSelfTest.java | 224 --------- 26 files changed, 77 insertions(+), 898 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 00696c7..1c39f9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -37,7 +37,6 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.clock.GridClockSource; -import org.apache.ignite.internal.processors.clock.GridClockSyncProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.cluster.ClusterProcessor; import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; @@ -178,13 +177,6 @@ public interface GridKernalContext extends Iterable<GridComponent> { public GridTimeoutProcessor timeout(); /** - * Gets time processor. - * - * @return Time processor. - */ - public GridClockSyncProcessor clockSync(); - - /** * Gets resource processor. * * @return Resource processor. http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index e80ec6b..24bb97a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -52,7 +52,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.clock.GridClockSource; -import org.apache.ignite.internal.processors.clock.GridClockSyncProcessor; import org.apache.ignite.internal.processors.clock.GridJvmClockSource; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.cluster.ClusterProcessor; @@ -186,10 +185,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringInclude - private GridClockSyncProcessor clockSyncProc; - - /** */ - @GridToStringInclude private GridResourceProcessor rsrcProc; /** */ @@ -514,8 +509,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable jobProc = (GridJobProcessor)comp; else if (comp instanceof GridTimeoutProcessor) timeProc = (GridTimeoutProcessor)comp; - else if (comp instanceof GridClockSyncProcessor) - clockSyncProc = (GridClockSyncProcessor)comp; else if (comp instanceof GridResourceProcessor) rsrcProc = (GridResourceProcessor)comp; else if (comp instanceof GridJobMetricsProcessor) @@ -637,11 +630,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ - @Override public GridClockSyncProcessor clockSync() { - return clockSyncProc; - } - - /** {@inheritDoc} */ @Override public GridResourceProcessor resource() { return rsrcProc; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index cdbe2e3..77ba212 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -108,7 +108,6 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; -import org.apache.ignite.internal.processors.clock.GridClockSyncProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.cluster.ClusterProcessor; import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; @@ -891,7 +890,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { // Start processors before discovery manager, so they will // be able to start receiving messages once discovery completes. startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx)); - startProcessor(new GridClockSyncProcessor(ctx)); startProcessor(new GridAffinityProcessor(ctx)); startProcessor(createComponent(GridSegmentationProcessor.class, ctx)); startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx)); http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicVersionComparator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicVersionComparator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicVersionComparator.java index 08a5b28..ac0abec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicVersionComparator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicVersionComparator.java @@ -36,24 +36,17 @@ public class GridCacheAtomicVersionComparator { int otherTopVer = other.topologyVersion(); if (topVer == otherTopVer) { - long globalTime = one.globalTime(); - long otherGlobalTime = other.globalTime(); + long locOrder = one.order(); + long otherLocOrder = other.order(); - if (globalTime == otherGlobalTime || ignoreTime) { - long locOrder = one.order(); - long otherLocOrder = other.order(); + if (locOrder == otherLocOrder) { + int nodeOrder = one.nodeOrder(); + int otherNodeOrder = other.nodeOrder(); - if (locOrder == otherLocOrder) { - int nodeOrder = one.nodeOrder(); - int otherNodeOrder = other.nodeOrder(); - - return nodeOrder == otherNodeOrder ? 0 : nodeOrder < otherNodeOrder ? -1 : 1; - } - else - return locOrder > otherLocOrder ? 1 : -1; + return nodeOrder == otherNodeOrder ? 0 : nodeOrder < otherNodeOrder ? -1 : 1; } else - return globalTime > otherGlobalTime ? 1 : -1; + return locOrder > otherLocOrder ? 1 : -1; } else return topVer > otherTopVer ? 1 : -1; http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index c33b60d..68d171c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -2384,7 +2384,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Incorporate conflict version into new version if needed. if (conflictVer != null && conflictVer != newVer) newVer = new GridCacheVersionEx(newVer.topologyVersion(), - newVer.globalTime(), newVer.order(), newVer.nodeOrder(), newVer.dataCenterId(), http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java index 498584c..28cc8fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java @@ -57,26 +57,19 @@ public final class GridCacheMvcc { /** */ private static final Comparator<GridCacheVersion> SER_VER_COMPARATOR = new Comparator<GridCacheVersion>() { @Override public int compare(GridCacheVersion ver1, GridCacheVersion ver2) { - long time1 = ver1.globalTime(); - long time2 = ver2.globalTime(); + int nodeOrder1 = ver1.nodeOrder(); + int nodeOrder2 = ver2.nodeOrder(); - if (time1 == time2) { - int nodeOrder1 = ver1.nodeOrder(); - int nodeOrder2 = ver2.nodeOrder(); + if (nodeOrder1 == nodeOrder2) { + long order1 = ver1.order(); + long order2 = ver2.order(); - if (nodeOrder1 == nodeOrder2) { - long order1 = ver1.order(); - long order2 = ver2.order(); + assert order1 != order2; - assert order1 != order2; - - return order1 > order2 ? 1 : -1; - } - else - return nodeOrder1 > nodeOrder2 ? 1 : -1; + return order1 > order2 ? 1 : -1; } else - return time1 > time2 ? 1 : -1; + return nodeOrder1 > nodeOrder2 ? 1 : -1; } }; http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 1c59390..45d2c4b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -981,12 +981,11 @@ public class GridCacheUtils { public static byte[] versionToBytes(GridCacheVersion ver) { assert ver != null; - byte[] bytes = new byte[28]; + byte[] bytes = new byte[20]; U.intToBytes(ver.topologyVersion(), bytes, 0); - U.longToBytes(ver.globalTime(), bytes, 4); - U.longToBytes(ver.order(), bytes, 12); - U.intToBytes(ver.nodeOrderAndDrIdRaw(), bytes, 20); + U.longToBytes(ver.order(), bytes, 4); + U.intToBytes(ver.nodeOrderAndDrIdRaw(), bytes, 12); return bytes; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 1fe2d69..e49e266 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -73,7 +73,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); /** Dummy version sent to older nodes for backward compatibility, */ - private static final GridCacheVersion DUMMY_VER = new GridCacheVersion(0, 0, 0, 0); + private static final GridCacheVersion DUMMY_VER = new GridCacheVersion(0, 0, 0); /** Logger. */ private static IgniteLogger log; http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 1691fd7..da371a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -72,16 +72,16 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { private static final long serialVersionUID = 0L; /** Dummy version for non-existing entry read in SERIALIZABLE transaction. */ - public static final GridCacheVersion SER_READ_EMPTY_ENTRY_VER = new GridCacheVersion(0, 0, 0, 0); + public static final GridCacheVersion SER_READ_EMPTY_ENTRY_VER = new GridCacheVersion(0, 0, 0); /** Dummy version for any existing entry read in SERIALIZABLE transaction. */ - public static final GridCacheVersion SER_READ_NOT_EMPTY_VER = new GridCacheVersion(0, 0, 0, 1); + public static final GridCacheVersion SER_READ_NOT_EMPTY_VER = new GridCacheVersion(0, 0, 1); /** */ - public static final GridCacheVersion GET_ENTRY_INVALID_VER_UPDATED = new GridCacheVersion(0, 0, 0, 2); + public static final GridCacheVersion GET_ENTRY_INVALID_VER_UPDATED = new GridCacheVersion(0, 0, 2); /** */ - public static final GridCacheVersion GET_ENTRY_INVALID_VER_AFTER_GET = new GridCacheVersion(0, 0, 0, 3); + public static final GridCacheVersion GET_ENTRY_INVALID_VER_AFTER_GET = new GridCacheVersion(0, 0, 3); /** Prepared flag updater. */ private static final AtomicIntegerFieldUpdater<IgniteTxEntry> PREPARED_UPD = http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index a1a18fe..2f12a1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -2426,7 +2426,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @param nearVer Near transaction version. */ private CommittedVersion(GridCacheVersion ver, GridCacheVersion nearVer) { - super(ver.topologyVersion(), ver.globalTime(), ver.order(), ver.nodeOrder(), ver.dataCenterId()); + super(ver.topologyVersion(), ver.order(), ver.nodeOrder(), ver.dataCenterId()); assert nearVer != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java index c175e5a..f4d7e08 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java @@ -110,11 +110,6 @@ public class GridCachePlainVersionedEntry<K, V> implements GridCacheVersionedEnt } /** {@inheritDoc} */ - @Override public long globalTime() { - return isStartVer ? 0L : ver.globalTime(); - } - - /** {@inheritDoc} */ @Override public GridCacheVersion version() { return ver; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java index b7c96b4..7513935 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java @@ -170,11 +170,6 @@ public class GridCacheRawVersionedEntry<K, V> extends DataStreamerEntry implemen } /** {@inheritDoc} */ - @Override public long globalTime() { - return ver.globalTime(); - } - - /** {@inheritDoc} */ @Override public GridCacheVersion version() { return ver; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java index ccc17e5..ea06a52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java @@ -50,9 +50,6 @@ public class GridCacheVersion implements Message, Comparable<GridCacheVersion>, /** Node order (used as global order) and DR ID. */ private int nodeOrderDrId; - /** Globally adjusted time. */ - private long globalTime; - /** Order. */ private long order; @@ -65,12 +62,11 @@ public class GridCacheVersion implements Message, Comparable<GridCacheVersion>, /** * @param topVer Topology version plus number of seconds from the start time of the first grid node. - * @param globalTime Globally adjusted time. * @param order Version order. * @param nodeOrder Node order. * @param dataCenterId Replication data center ID. */ - public GridCacheVersion(int topVer, long globalTime, long order, int nodeOrder, int dataCenterId) { + public GridCacheVersion(int topVer, long order, int nodeOrder, int dataCenterId) { assert topVer >= 0 : topVer; assert order >= 0 : order; assert nodeOrder >= 0 : nodeOrder; @@ -80,7 +76,6 @@ public class GridCacheVersion implements Message, Comparable<GridCacheVersion>, throw new IllegalArgumentException("Node order overflow: " + nodeOrder); this.topVer = topVer; - this.globalTime = globalTime; this.order = order; nodeOrderDrId = nodeOrder | (dataCenterId << DR_ID_SHIFT); @@ -90,13 +85,11 @@ public class GridCacheVersion implements Message, Comparable<GridCacheVersion>, /** * @param topVer Topology version plus number of seconds from the start time of the first grid node. * @param nodeOrderDrId Node order and DR ID. - * @param globalTime Globally adjusted time. * @param order Version order. */ - public GridCacheVersion(int topVer, int nodeOrderDrId, long globalTime, long order) { + public GridCacheVersion(int topVer, int nodeOrderDrId, long order) { this.topVer = topVer; this.nodeOrderDrId = nodeOrderDrId; - this.globalTime = globalTime; this.order = order; } @@ -117,13 +110,6 @@ public class GridCacheVersion implements Message, Comparable<GridCacheVersion>, } /** - * @return Adjusted time. - */ - public long globalTime() { - return globalTime; - } - - /** * @return Version order. */ public long order() { @@ -187,7 +173,7 @@ public class GridCacheVersion implements Message, Comparable<GridCacheVersion>, * @return Version represented as {@code GridUuid} */ public IgniteUuid asGridUuid() { - return new IgniteUuid(new UUID(((long)topVer << 32) | nodeOrderDrId, globalTime), order); + return new IgniteUuid(new UUID(topVer, nodeOrderDrId), order); } /** {@inheritDoc} */ @@ -198,7 +184,6 @@ public class GridCacheVersion implements Message, Comparable<GridCacheVersion>, /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeInt(topVer); - out.writeLong(globalTime); out.writeLong(order); out.writeInt(nodeOrderDrId); } @@ -206,7 +191,6 @@ public class GridCacheVersion implements Message, Comparable<GridCacheVersion>, /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException { topVer = in.readInt(); - globalTime = in.readLong(); order = in.readLong(); nodeOrderDrId = in.readInt(); } @@ -264,24 +248,18 @@ public class GridCacheVersion implements Message, Comparable<GridCacheVersion>, switch (writer.state()) { case 0: - if (!writer.writeLong("globalTime", globalTime)) - return false; - - writer.incrementState(); - - case 1: if (!writer.writeInt("nodeOrderDrId", nodeOrderDrId)) return false; writer.incrementState(); - case 2: + case 1: if (!writer.writeLong("order", order)) return false; writer.incrementState(); - case 3: + case 2: if (!writer.writeInt("topVer", topVer)) return false; @@ -301,14 +279,6 @@ public class GridCacheVersion implements Message, Comparable<GridCacheVersion>, switch (reader.state()) { case 0: - globalTime = reader.readLong("globalTime"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: nodeOrderDrId = reader.readInt("nodeOrderDrId"); if (!reader.isLastRead()) @@ -316,7 +286,7 @@ public class GridCacheVersion implements Message, Comparable<GridCacheVersion>, reader.incrementState(); - case 2: + case 1: order = reader.readLong("order"); if (!reader.isLastRead()) @@ -324,7 +294,7 @@ public class GridCacheVersion implements Message, Comparable<GridCacheVersion>, reader.incrementState(); - case 3: + case 2: topVer = reader.readInt("topVer"); if (!reader.isLastRead()) @@ -344,13 +314,12 @@ public class GridCacheVersion implements Message, Comparable<GridCacheVersion>, /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 4; + return 3; } /** {@inheritDoc} */ @Override public String toString() { return "GridCacheVersion [topVer=" + topologyVersion() + - ", time=" + globalTime() + ", order=" + order() + ", nodeOrder=" + nodeOrder() + ']'; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java index c89b941..27b4edc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java @@ -46,15 +46,14 @@ public class GridCacheVersionEx extends GridCacheVersion { * Constructor. * * @param topVer Topology version. - * @param globalTime Global time. * @param order Order. * @param nodeOrder Node order. * @param dataCenterId Data center ID. * @param drVer DR version. */ - public GridCacheVersionEx(int topVer, long globalTime, long order, int nodeOrder, byte dataCenterId, + public GridCacheVersionEx(int topVer, long order, int nodeOrder, byte dataCenterId, GridCacheVersion drVer) { - super(topVer, globalTime, order, nodeOrder, dataCenterId); + super(topVer, order, nodeOrder, dataCenterId); assert drVer != null && !(drVer instanceof GridCacheVersionEx); // DR version can only be plain here. @@ -66,12 +65,11 @@ public class GridCacheVersionEx extends GridCacheVersion { * * @param topVer Topology version. * @param nodeOrderDrId Node order and DR ID. - * @param globalTime Globally adjusted time. * @param order Version order. * @param drVer DR version. */ - public GridCacheVersionEx(int topVer, int nodeOrderDrId, long globalTime, long order, GridCacheVersion drVer) { - super(topVer, nodeOrderDrId, globalTime, order); + public GridCacheVersionEx(int topVer, int nodeOrderDrId, long order, GridCacheVersion drVer) { + super(topVer, nodeOrderDrId, order); assert drVer != null && !(drVer instanceof GridCacheVersionEx); // DR version can only be plain here. @@ -90,7 +88,7 @@ public class GridCacheVersionEx extends GridCacheVersion { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 5; + return 4; } /** {@inheritDoc} */ @@ -108,7 +106,7 @@ public class GridCacheVersionEx extends GridCacheVersion { } switch (writer.state()) { - case 4: + case 3: if (!writer.writeMessage("drVer", drVer)) return false; @@ -130,7 +128,7 @@ public class GridCacheVersionEx extends GridCacheVersion { return false; switch (reader.state()) { - case 4: + case 3: drVer = reader.readMessage("drVer"); if (!reader.isLastRead()) @@ -162,9 +160,8 @@ public class GridCacheVersionEx extends GridCacheVersion { /** {@inheritDoc} */ @Override public String toString() { return "GridCacheVersionEx [topVer=" + topologyVersion() + - ", time=" + globalTime() + ", order=" + order() + ", nodeOrder=" + nodeOrder() + ", drVer=" + drVer + ']'; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java index 5a8904f..15a7d2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java @@ -79,7 +79,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { /** {@inheritDoc} */ @Override public void start0() throws IgniteCheckedException { - last = new GridCacheVersion(0, 0, order.get(), 0, dataCenterId); + last = new GridCacheVersion(0, order.get(), 0, dataCenterId); cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_METRICS_UPDATED); } @@ -103,7 +103,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { public void dataCenterId(byte dataCenterId) { this.dataCenterId = dataCenterId; - last = new GridCacheVersion(0, 0, order.get(), 0, dataCenterId); + last = new GridCacheVersion(0, order.get(), 0, dataCenterId); } /** @@ -185,7 +185,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { topVer += (gridStartTime - TOP_VER_BASE_TIME) / 1000; - ISOLATED_STREAMER_VER = new GridCacheVersion((int)topVer, 0, 0, 1, dataCenterId); + ISOLATED_STREAMER_VER = new GridCacheVersion((int)topVer, 0, 1, dataCenterId); } return ISOLATED_STREAMER_VER; @@ -271,8 +271,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { if (topVer == -1) topVer = cctx.kernalContext().discovery().topologyVersion(); - long globalTime = cctx.kernalContext().clockSync().adjustedTime(topVer); - if (addTime) { if (gridStartTime == 0) gridStartTime = cctx.kernalContext().discovery().gridStartTime(); @@ -286,7 +284,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { GridCacheVersion next = new GridCacheVersion( (int)topVer, - globalTime, ord, locNodeOrder, dataCenterId); http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionedEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionedEntry.java index 06e7999..e685d79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionedEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionedEntry.java @@ -71,11 +71,4 @@ public interface GridCacheVersionedEntry<K, V> { * @return Entry's order in initiator data center */ public long order(); - - /** - * Gets entry's global time in initiator data center. - * - * @return Entry's global time in initiator data center - */ - public long globalTime(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java index 8daef31..a736a37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java @@ -47,9 +47,6 @@ public class GridClockServer { /** Read worker. */ private GridWorker readWorker; - /** Instance of time processor. */ - private GridClockSyncProcessor clockSync; - /** * Starts server. * @@ -59,7 +56,6 @@ public class GridClockServer { public void start(GridKernalContext ctx) throws IgniteCheckedException { this.ctx = ctx; - clockSync = ctx.clockSync(); log = ctx.log(GridClockServer.class); try { @@ -205,8 +201,6 @@ public class GridClockServer { GridClockMessage msg = GridClockMessage.fromBytes(packet.getData(), packet.getOffset(), packet.getLength()); - - clockSync.onMessageReceived(msg, packet.getAddress(), packet.getPort()); } catch (IgniteCheckedException e) { U.warn(log, "Failed to assemble clock server message (will ignore the packet) [host=" + http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java deleted file mode 100644 index 257d0d9..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java +++ /dev/null @@ -1,481 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.clock; - -import java.net.InetAddress; -import java.util.Collection; -import java.util.Map; -import java.util.NavigableMap; -import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; -import org.apache.ignite.events.EventType; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.managers.communication.GridMessageListener; -import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; -import org.apache.ignite.internal.processors.GridProcessorAdapter; -import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap; -import org.apache.ignite.internal.util.GridSpinReadWriteLock; -import org.apache.ignite.internal.util.typedef.T2; -import org.apache.ignite.internal.util.typedef.internal.LT; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.thread.IgniteThread; - -import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; -import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; -import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; -import static org.apache.ignite.internal.GridTopic.TOPIC_TIME_SYNC; -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TIME_SERVER_HOST; -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TIME_SERVER_PORT; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; - -/** - * Time synchronization processor. - */ -public class GridClockSyncProcessor extends GridProcessorAdapter { - /** Maximum size for time sync history. */ - private static final int MAX_TIME_SYNC_HISTORY = 100; - - /** Time server instance. */ - private GridClockServer srv; - - /** Shutdown lock. */ - private GridSpinReadWriteLock rw = new GridSpinReadWriteLock(); - - /** Stopping flag. */ - private volatile boolean stopping; - - /** Time coordinator thread. */ - private volatile TimeCoordinator timeCoord; - - /** Time delta history. Constructed on coordinator. */ - private NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> timeSyncHist = - new GridBoundedConcurrentOrderedMap<>(MAX_TIME_SYNC_HISTORY); - - /** Last recorded. */ - private volatile T2<GridClockDeltaVersion, GridClockDeltaSnapshot> lastSnapshot; - - /** Time source. */ - private GridClockSource clockSrc; - - /** - * @param ctx Kernal context. - */ - public GridClockSyncProcessor(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - super.start(); - - clockSrc = ctx.timeSource(); - - srv = new GridClockServer(); - - srv.start(ctx); - - ctx.io().addMessageListener(TOPIC_TIME_SYNC, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { - assert msg instanceof GridClockDeltaSnapshotMessage; - - GridClockDeltaSnapshotMessage msg0 = (GridClockDeltaSnapshotMessage)msg; - - GridClockDeltaVersion ver = msg0.snapshotVersion(); - - GridClockDeltaSnapshot snap = new GridClockDeltaSnapshot(ver, msg0.deltas()); - - lastSnapshot = new T2<>(ver, snap); - - timeSyncHist.put(ver, snap); - } - }); - - // We care only about node leave and fail events. - ctx.event().addLocalEventListener(new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_JOINED; - - DiscoveryEvent discoEvt = (DiscoveryEvent)evt; - - if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) - checkLaunchCoordinator(discoEvt); - - TimeCoordinator timeCoord0 = timeCoord; - - if (timeCoord0 != null) - timeCoord0.onDiscoveryEvent(discoEvt); - } - }, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_NODE_JOINED); - - ctx.addNodeAttribute(ATTR_TIME_SERVER_HOST, srv.host()); - ctx.addNodeAttribute(ATTR_TIME_SERVER_PORT, srv.port()); - } - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); - - srv.afterStart(); - - // Check at startup if this node is a fragmentizer coordinator. - DiscoveryEvent locJoinEvt = ctx.discovery().localJoinEvent(); - - checkLaunchCoordinator(locJoinEvt); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - super.onKernalStop(cancel); - - rw.writeLock(); - - try { - stopping = false; - - if (timeCoord != null) { - timeCoord.cancel(); - - U.join(timeCoord, log); - - timeCoord = null; - } - - if (srv != null) - srv.beforeStop(); - } - finally { - rw.writeUnlock(); - } - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws IgniteCheckedException { - super.stop(cancel); - - if (srv != null) - srv.stop(); - } - - /** - * Gets current time on local node. - * - * @return Current time in milliseconds. - */ - private long currentTime() { - return clockSrc.currentTimeMillis(); - } - - /** - * @return Time sync history. - */ - public NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> timeSyncHistory() { - return timeSyncHist; - } - - /** - * Callback from server for message receiving. - * - * @param msg Received message. - * @param addr Remote node address. - * @param port Remote node port. - */ - public void onMessageReceived(GridClockMessage msg, InetAddress addr, int port) { - long rcvTs = currentTime(); - - if (!msg.originatingNodeId().equals(ctx.localNodeId())) { - // We received time request from remote node, set current time and reply back. - msg.replyTimestamp(rcvTs); - - try { - srv.sendPacket(msg, addr, port); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send time server reply to remote node: " + msg, e); - } - } - else - timeCoord.onMessage(msg, rcvTs); - } - - /** - * Checks if local node is the oldest node in topology and starts time coordinator if so. - * - * @param discoEvt Discovery event. - */ - private void checkLaunchCoordinator(DiscoveryEvent discoEvt) { - rw.readLock(); - - try { - if (stopping) - return; - - if (timeCoord == null) { - long minNodeOrder = Long.MAX_VALUE; - - Collection<ClusterNode> nodes = discoEvt.topologyNodes(); - - for (ClusterNode node : nodes) { - if (node.order() < minNodeOrder) - minNodeOrder = node.order(); - } - - ClusterNode locNode = ctx.discovery().localNode(); - - if (locNode.order() == minNodeOrder) { - if (log.isDebugEnabled()) - log.debug("Detected local node to be the eldest node in topology, starting time " + - "coordinator thread [discoEvt=" + discoEvt + ", locNode=" + locNode + ']'); - - synchronized (this) { - if (timeCoord == null && !stopping) { - timeCoord = new TimeCoordinator(discoEvt); - - IgniteThread th = new IgniteThread(timeCoord); - - th.setPriority(Thread.MAX_PRIORITY); - - th.start(); - } - } - } - } - } - finally { - rw.readUnlock(); - } - } - - /** - * Gets time adjusted with time coordinator on given topology version. - * - * @param topVer Topology version. - * @return Adjusted time. - */ - public long adjustedTime(long topVer) { - T2<GridClockDeltaVersion, GridClockDeltaSnapshot> fastSnap = lastSnapshot; - - GridClockDeltaSnapshot snap; - - if (fastSnap != null && fastSnap.get1().topologyVersion() == topVer) - snap = fastSnap.get2(); - else { - // Get last synchronized time on given topology version. - Map.Entry<GridClockDeltaVersion, GridClockDeltaSnapshot> entry = timeSyncHistory().lowerEntry( - new GridClockDeltaVersion(0, topVer + 1)); - - snap = entry == null ? null : entry.getValue(); - } - - long now = clockSrc.currentTimeMillis(); - - if (snap == null) - return now; - - Long delta = snap.deltas().get(ctx.localNodeId()); - - if (delta == null) - delta = 0L; - - return now + delta; - } - - /** - * Publishes snapshot to topology. - * - * @param snapshot Snapshot to publish. - * @param top Topology to send given snapshot to. - */ - private void publish(GridClockDeltaSnapshot snapshot, GridDiscoveryTopologySnapshot top) { - if (!rw.tryReadLock()) - return; - - try { - lastSnapshot = new T2<>(snapshot.version(), snapshot); - - timeSyncHist.put(snapshot.version(), snapshot); - - for (ClusterNode n : top.topologyNodes()) { - GridClockDeltaSnapshotMessage msg = new GridClockDeltaSnapshotMessage( - snapshot.version(), snapshot.deltas()); - - try { - ctx.io().send(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL); - } - catch (IgniteCheckedException e) { - if (ctx.discovery().pingNodeNoError(n.id())) - U.error(log, "Failed to send time sync snapshot to remote node (did not leave grid?) " + - "[nodeId=" + n.id() + ", msg=" + msg + ", err=" + e.getMessage() + ']'); - else if (log.isDebugEnabled()) - log.debug("Failed to send time sync snapshot to remote node (did not leave grid?) " + - "[nodeId=" + n.id() + ", msg=" + msg + ", err=" + e.getMessage() + ']'); - } - } - } - finally { - rw.readUnlock(); - } - } - - /** - * Time coordinator thread. - */ - private class TimeCoordinator extends GridWorker { - /** Last discovery topology snapshot. */ - private volatile GridDiscoveryTopologySnapshot lastSnapshot; - - /** Snapshot being constructed. May be not null only on coordinator node. */ - private volatile GridClockDeltaSnapshot pendingSnapshot; - - /** Version counter. */ - private long verCnt = 1; - - /** - * Time coordinator thread constructor. - * - * @param evt Discovery event on which this node became a coordinator. - */ - protected TimeCoordinator(DiscoveryEvent evt) { - super(ctx.gridName(), "grid-time-coordinator", GridClockSyncProcessor.this.log); - - lastSnapshot = new GridDiscoveryTopologySnapshot(evt.topologyVersion(), evt.topologyNodes()); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - while (!isCancelled()) { - GridDiscoveryTopologySnapshot top = lastSnapshot; - - if (log.isDebugEnabled()) - log.debug("Creating time sync snapshot for topology: " + top); - - GridClockDeltaSnapshot snapshot = new GridClockDeltaSnapshot( - new GridClockDeltaVersion(verCnt++, top.topologyVersion()), - ctx.localNodeId(), - top, - ctx.config().getClockSyncSamples()); - - pendingSnapshot = snapshot; - - while (!snapshot.ready()) { - if (log.isDebugEnabled()) - log.debug("Requesting time from remote nodes: " + snapshot.pendingNodeIds()); - - for (UUID nodeId : snapshot.pendingNodeIds()) - requestTime(nodeId); - - if (log.isDebugEnabled()) - log.debug("Waiting for snapshot to be ready: " + snapshot); - - // Wait for all replies - snapshot.awaitReady(1000); - } - - // No more messages should be processed. - pendingSnapshot = null; - - if (log.isDebugEnabled()) - log.debug("Collected time sync results: " + snapshot.deltas()); - - publish(snapshot, top); - - synchronized (this) { - if (top.topologyVersion() == lastSnapshot.topologyVersion()) - wait(ctx.config().getClockSyncFrequency()); - } - } - } - - /** - * @param evt Discovery event. - */ - public void onDiscoveryEvent(DiscoveryEvent evt) { - if (log.isDebugEnabled()) - log.debug("Processing discovery event: " + evt); - - if (evt.type() == EventType.EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) - onNodeLeft(evt.eventNode().id()); - - synchronized (this) { - lastSnapshot = new GridDiscoveryTopologySnapshot(evt.topologyVersion(), evt.topologyNodes()); - - notifyAll(); - } - } - - /** - * @param msg Message received from remote node. - * @param rcvTs Receive timestamp. - */ - private void onMessage(GridClockMessage msg, long rcvTs) { - GridClockDeltaSnapshot curr = pendingSnapshot; - - if (curr != null) { - long delta = (msg.originatingTimestamp() + rcvTs) / 2 - msg.replyTimestamp(); - - boolean needMore = curr.onDeltaReceived(msg.targetNodeId(), delta); - - if (needMore) - requestTime(msg.targetNodeId()); - } - } - - /** - * Requests time from remote node. - * - * @param rmtNodeId Remote node ID. - */ - private void requestTime(UUID rmtNodeId) { - ClusterNode node = ctx.discovery().node(rmtNodeId); - - if (node != null) { - InetAddress addr = node.attribute(ATTR_TIME_SERVER_HOST); - int port = node.attribute(ATTR_TIME_SERVER_PORT); - - try { - GridClockMessage req = new GridClockMessage(ctx.localNodeId(), rmtNodeId, currentTime(), 0); - - srv.sendPacket(req, addr, port); - } - catch (IgniteCheckedException e) { - LT.error(log, e, "Failed to send time request to remote node [rmtNodeId=" + rmtNodeId + - ", addr=" + addr + ", port=" + port + ']'); - } - } - else - onNodeLeft(rmtNodeId); - } - - /** - * Node left callback. - * - * @param nodeId Left node ID. - */ - private void onNodeLeft(UUID nodeId) { - GridClockDeltaSnapshot curr = pendingSnapshot; - - if (curr != null) - curr.onNodeLeft(nodeId); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index f6c8163..374b02c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -9219,10 +9219,6 @@ public abstract class IgniteUtils { off += 4; - GridUnsafe.putLong(arr, off, drVer.globalTime()); - - off += 8; - GridUnsafe.putLong(arr, off, drVer.order()); off += 8; @@ -9236,10 +9232,6 @@ public abstract class IgniteUtils { off += 4; - GridUnsafe.putLong(arr, off, ver.globalTime()); - - off += 8; - GridUnsafe.putLong(arr, off, ver.order()); off += 8; @@ -9255,16 +9247,14 @@ public abstract class IgniteUtils { public static GridCacheVersion readVersion(long ptr, boolean verEx) { GridCacheVersion ver = new GridCacheVersion(GridUnsafe.getInt(ptr), GridUnsafe.getInt(ptr + 4), - GridUnsafe.getLong(ptr + 8), - GridUnsafe.getLong(ptr + 16)); + GridUnsafe.getLong(ptr + 8)); if (verEx) { - ptr += 24; + ptr += 16; ver = new GridCacheVersionEx(GridUnsafe.getInt(ptr), GridUnsafe.getInt(ptr + 4), GridUnsafe.getLong(ptr + 8), - GridUnsafe.getLong(ptr + 16), ver); } @@ -9286,15 +9276,11 @@ public abstract class IgniteUtils { off += 4; - long globalTime = GridUnsafe.getLong(arr, off); - - off += 8; - long order = GridUnsafe.getLong(arr, off); off += 8; - GridCacheVersion ver = new GridCacheVersion(topVer, nodeOrderDrId, globalTime, order); + GridCacheVersion ver = new GridCacheVersion(topVer, nodeOrderDrId, order); if (verEx) { topVer = GridUnsafe.getInt(arr, off); @@ -9305,13 +9291,9 @@ public abstract class IgniteUtils { off += 4; - globalTime = GridUnsafe.getLong(arr, off); - - off += 8; - order = GridUnsafe.getLong(arr, off); - ver = new GridCacheVersionEx(topVer, nodeOrderDrId, globalTime, order, ver); + ver = new GridCacheVersionEx(topVer, nodeOrderDrId, order, ver); } return ver; http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFlagsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFlagsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFlagsTest.java index ff2d62d..827b0a5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFlagsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFlagsTest.java @@ -73,7 +73,7 @@ public class GridCacheMvccFlagsTest extends GridCommonAbstractTest { UUID id = UUID.randomUUID(); - GridCacheVersion ver = new GridCacheVersion(1, 0, 0, 0, 0); + GridCacheVersion ver = new GridCacheVersion(1, 0, 0, 0); GridCacheMvccCandidate c = new GridCacheMvccCandidate( entry, @@ -114,7 +114,7 @@ public class GridCacheMvccFlagsTest extends GridCommonAbstractTest { UUID id = UUID.randomUUID(); - GridCacheVersion ver = new GridCacheVersion(1, 0, 0, 0, 0); + GridCacheVersion ver = new GridCacheVersion(1, 0, 0, 0); GridCacheMvccCandidate c = new GridCacheMvccCandidate( entry, http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccPartitionedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccPartitionedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccPartitionedSelfTest.java index 9da6ea6..2fb5d25 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccPartitionedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccPartitionedSelfTest.java @@ -604,9 +604,9 @@ public class GridCacheMvccPartitionedSelfTest extends GridCommonAbstractTest { public void testSerializableReadLocksAdd() throws Exception { GridCacheAdapter<String, String> cache = grid.internalCache(); - GridCacheVersion serOrder1 = new GridCacheVersion(0, 0, 10, 1); - GridCacheVersion serOrder2 = new GridCacheVersion(0, 0, 20, 1); - GridCacheVersion serOrder3 = new GridCacheVersion(0, 0, 15, 1); + GridCacheVersion serOrder1 = new GridCacheVersion(0, 0, 1); + GridCacheVersion serOrder2 = new GridCacheVersion(0, 0, 1); + GridCacheVersion serOrder3 = new GridCacheVersion(0, 0, 1); { GridCacheMvcc mvcc = new GridCacheMvcc(cache.context()); @@ -681,9 +681,9 @@ public class GridCacheMvccPartitionedSelfTest extends GridCommonAbstractTest { public void testSerializableReadLocksAssign() throws Exception { GridCacheAdapter<String, String> cache = grid.internalCache(); - GridCacheVersion serOrder1 = new GridCacheVersion(0, 0, 10, 1); - GridCacheVersion serOrder2 = new GridCacheVersion(0, 0, 20, 1); - GridCacheVersion serOrder3 = new GridCacheVersion(0, 0, 15, 1); + GridCacheVersion serOrder1 = new GridCacheVersion(0, 0, 1); + GridCacheVersion serOrder2 = new GridCacheVersion(0, 0, 1); + GridCacheVersion serOrder3 = new GridCacheVersion(0, 0, 1); { GridCacheMvcc mvcc = new GridCacheMvcc(cache.context()); @@ -859,7 +859,7 @@ public class GridCacheMvccPartitionedSelfTest extends GridCommonAbstractTest { 1, version(2), 0, - new GridCacheVersion(0, 0, 30, 1), + new GridCacheVersion(0, 0, 1), false, true, false, @@ -883,10 +883,10 @@ public class GridCacheMvccPartitionedSelfTest extends GridCommonAbstractTest { GridCacheTestEntryEx e = new GridCacheTestEntryEx(cache.context(), "1"); - GridCacheVersion serOrder1 = new GridCacheVersion(0, 0, 10, 1); - GridCacheVersion serOrder2 = new GridCacheVersion(0, 0, 20, 1); - GridCacheVersion serOrder3 = new GridCacheVersion(0, 0, 15, 1); - GridCacheVersion serOrder4 = new GridCacheVersion(0, 0, 30, 1); + GridCacheVersion serOrder1 = new GridCacheVersion(0, 0, 1); + GridCacheVersion serOrder2 = new GridCacheVersion(0, 0, 1); + GridCacheVersion serOrder3 = new GridCacheVersion(0, 0, 1); + GridCacheVersion serOrder4 = new GridCacheVersion(0, 0, 1); GridCacheVersion ver1 = incVer ? version(1) : version(4); GridCacheVersion ver2 = incVer ? version(2) : version(3); @@ -991,7 +991,7 @@ public class GridCacheMvccPartitionedSelfTest extends GridCommonAbstractTest { * @return Version. */ private GridCacheVersion version(int order) { - return new GridCacheVersion(1, 0, order, order, 0); + return new GridCacheVersion(1, order, order, 0); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccSelfTest.java index d352e26..e15a20f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccSelfTest.java @@ -174,8 +174,8 @@ public class GridCacheMvccSelfTest extends GridCommonAbstractTest { checkOrder(cands, ver1, ver5, ver3, ver2); entry.orderCompleted( - new GridCacheVersion(1, 0, 2, 0, 0), - Arrays.asList(new GridCacheVersion(1, 0, 3, 4, 0), ver2, new GridCacheVersion(1, 0, 5, 6, 0)), + new GridCacheVersion(1, 2, 0, 0), + Arrays.asList(new GridCacheVersion(1, 3, 4, 0), ver2, new GridCacheVersion(1, 5, 6, 0)), Collections.<GridCacheVersion>emptyList() ); @@ -1053,7 +1053,7 @@ public class GridCacheMvccSelfTest extends GridCommonAbstractTest { entry.orderCompleted( ver2 /*local version.*/, - Arrays.asList(new GridCacheVersion(1, 0, 1, 2, 0), ver3, new GridCacheVersion(1, 0, 5, 6, 0)), + Arrays.asList(new GridCacheVersion(1, 1, 2, 0), ver3, new GridCacheVersion(1, 5, 6, 0)), Collections.<GridCacheVersion>emptyList() ); @@ -1619,7 +1619,7 @@ public class GridCacheMvccSelfTest extends GridCommonAbstractTest { * @return Version. */ private GridCacheVersion version(int order) { - return new GridCacheVersion(1, 0, order, order, 0); + return new GridCacheVersion(1, order, order, 0); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index e76ab40..821807f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -52,7 +52,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr private long ttl; /** Version. */ - private GridCacheVersion ver = new GridCacheVersion(0, 0, 0, 1, 0); + private GridCacheVersion ver = new GridCacheVersion(0, 0, 1, 0); /** Obsolete version. */ private GridCacheVersion obsoleteVer = ver; http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionSelfTest.java index 8179dc5..475ccb5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionSelfTest.java @@ -79,7 +79,7 @@ public class GridCacheVersionSelfTest extends GridCommonAbstractTest { */ public void testMarshalling() throws Exception { GridCacheVersion ver = version(1, 1); - GridCacheVersionEx verEx = new GridCacheVersionEx(2, 2, 0, 0, ver); + GridCacheVersionEx verEx = new GridCacheVersionEx(2, 2, 0, ver); OptimizedMarshaller marsh = new OptimizedMarshaller(false); @@ -101,6 +101,6 @@ public class GridCacheVersionSelfTest extends GridCommonAbstractTest { * @return Cache version. */ private GridCacheVersion version(int nodeOrder, int drId) { - return new GridCacheVersion(0, 0, 0, nodeOrder, drId); + return new GridCacheVersion(0, 0, nodeOrder, drId); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java index b0a407c..3d1064c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java @@ -38,22 +38,22 @@ import static org.apache.ignite.internal.processors.cache.transactions.TxDeadloc */ public class DepthFirstSearchTest extends TestCase { /** Tx 1. */ - private static final GridCacheVersion T1 = new GridCacheVersion(1, 0, 0, 0); + private static final GridCacheVersion T1 = new GridCacheVersion(1, 0, 0); /** Tx 2. */ - private static final GridCacheVersion T2 = new GridCacheVersion(2, 0, 0, 0); + private static final GridCacheVersion T2 = new GridCacheVersion(2, 0, 0); /** Tx 3. */ - private static final GridCacheVersion T3 = new GridCacheVersion(3, 0, 0, 0); + private static final GridCacheVersion T3 = new GridCacheVersion(3, 0, 0); /** Tx 4. */ - private static final GridCacheVersion T4 = new GridCacheVersion(4, 0, 0, 0); + private static final GridCacheVersion T4 = new GridCacheVersion(4, 0, 0); /** Tx 5. */ - private static final GridCacheVersion T5 = new GridCacheVersion(5, 0, 0, 0); + private static final GridCacheVersion T5 = new GridCacheVersion(5, 0, 0); /** Tx 6. */ - private static final GridCacheVersion T6 = new GridCacheVersion(6, 0, 0, 0); + private static final GridCacheVersion T6 = new GridCacheVersion(6, 0, 0); /** All transactions. */ private static final List<GridCacheVersion> ALL = Arrays.asList(T1, T2, T3, T4, T5, T6); @@ -291,19 +291,19 @@ public class DepthFirstSearchTest extends TestCase { int n = rnd.nextInt(nodesCnt); if (n != j) { - waitForNodes.add(new GridCacheVersion(n, 0, 0, 0)); + waitForNodes.add(new GridCacheVersion(n, 0, 0)); k++; } } } - wfg.put(new GridCacheVersion(j, 0, 0, 0), waitForNodes); + wfg.put(new GridCacheVersion(j, 0, 0), waitForNodes); } } for (int j = 0; j < nodesCnt; j++) { try { - List<GridCacheVersion> cycle = findCycle(wfg, new GridCacheVersion(j, 0, 0, 0)); + List<GridCacheVersion> cycle = findCycle(wfg, new GridCacheVersion(j, 0, 0)); if (cycle == null) cyclesNotFound++; http://git-wip-us.apache.org/repos/asf/ignite/blob/2f858129/modules/core/src/test/java/org/apache/ignite/internal/processors/clock/GridTimeSyncProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/clock/GridTimeSyncProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/clock/GridTimeSyncProcessorSelfTest.java deleted file mode 100644 index f5ba07d..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/clock/GridTimeSyncProcessorSelfTest.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.clock; - -import java.util.NavigableMap; -import org.apache.ignite.Ignite; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.GridKernalContextImpl; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.util.typedef.PA; -import org.apache.ignite.lifecycle.LifecycleBean; -import org.apache.ignite.lifecycle.LifecycleEventType; -import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -/** - * Time sync processor self test. - */ -public class GridTimeSyncProcessorSelfTest extends GridCommonAbstractTest { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Number of grids in test. */ - public static final int GRID_CNT = 4; - - /** Starting grid index. */ - private int idx; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - - cfg.setLifecycleBeans(new TimeShiftLifecycleBean(idx * 2000)); - - idx++; - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testTimeSync() throws Exception { - startGrids(GRID_CNT); - - try { - // Check coordinator time deltas. - final IgniteKernal kernal = (IgniteKernal)grid(0); - - // Wait for latest time sync history. - GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> hist = kernal.context().clockSync() - .timeSyncHistory(); - - info("Checking time sync history: " + hist); - - for (GridClockDeltaVersion ver : hist.keySet()) { - if (ver.topologyVersion() == 4) - return true; - } - - return false; - } - }, 10000); - - NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> history = kernal.context().clockSync() - .timeSyncHistory(); - - GridClockDeltaSnapshot snap = history.lastEntry().getValue(); - - assertEquals(3, snap.deltas().size()); - - for (int i = 1; i < GRID_CNT; i++) { - Long delta = snap.deltas().get(grid(i).localNode().id()); - - // Give 300ms range for test? - int idealDelta = - i * 2000; - - int threshold = 100; - - assertTrue("Invalid time delta for node [expected=" + idealDelta + ", " + - "actual=" + delta + ", threshold=" + threshold, - delta <= idealDelta + threshold && delta >= idealDelta - threshold); - } - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testTimeSyncChangeCoordinator() throws Exception { - startGrids(GRID_CNT); - - try { - for (int i = 0; i < GRID_CNT; i++) { - // Not coordinator now. - stopGrid(i); - - startGrid(i); - } - - // Check coordinator time deltas. - final IgniteKernal kernal = (IgniteKernal)grid(0); - - assertEquals(6, kernal.localNode().order()); - - // Wait for latest time sync history. - GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> hist = kernal.context().clockSync() - .timeSyncHistory(); - - info("Checking time sync history: " + hist); - - for (GridClockDeltaVersion ver : hist.keySet()) { - if (ver.topologyVersion() == 12) - return true; - } - - return false; - } - }, 10000); - - NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> history = kernal.context().clockSync() - .timeSyncHistory(); - - GridClockDeltaSnapshot snap = history.lastEntry().getValue(); - - assertEquals(3, snap.deltas().size()); - - for (int i = 1; i < GRID_CNT; i++) { - Long delta = snap.deltas().get(grid(i).localNode().id()); - - // Give 300ms range for test? - int idealDelta = - i * 2000; - - int threshold = 100; - - assertTrue("Invalid time delta for node [expected=" + idealDelta + ", " + - "actual=" + delta + ", threshold=" + threshold, - delta <= idealDelta + threshold && delta >= idealDelta - threshold); - } - } - finally { - stopAllGrids(); - } - } - - /** - * Time bean that sets shifted time source to context. - */ - private static class TimeShiftLifecycleBean implements LifecycleBean { - /** Injected grid. */ - @IgniteInstanceResource - private Ignite g; - - /** Time delta. */ - private int delta; - - /** - * Constructs lifecycle bean. - * - * @param delta Time delta. - */ - private TimeShiftLifecycleBean(int delta) { - this.delta = delta; - } - - /** {@inheritDoc} */ - @Override public void onLifecycleEvent(LifecycleEventType evt) { - if (evt == LifecycleEventType.BEFORE_NODE_START) - ((GridKernalContextImpl)((IgniteKernal)g).context()).timeSource(new TimeShiftClockSource(delta)); - } - } - - /** - * Time shift time source. - */ - private static class TimeShiftClockSource implements GridClockSource { - /** Time shift delta. */ - private int delta; - - /** - * @param delta Time shift delta. - */ - private TimeShiftClockSource(int delta) { - this.delta = delta; - } - - /** {@inheritDoc} */ - @Override public long currentTimeMillis() { - return System.currentTimeMillis() + delta; - } - } -} \ No newline at end of file
