Remove GridClockSyncProcessor, GridTimeSyncProcessorSelfTest and related code.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/decc8021 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/decc8021 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/decc8021 Branch: refs/heads/ignite-4587 Commit: decc8021f7ae5c0073f8cf15c7ca64a57305aa3b Parents: 6265f33 Author: Max Kozlov <[email protected]> Authored: Thu Mar 9 14:34:40 2017 +0300 Committer: Max Kozlov <[email protected]> Committed: Thu Mar 9 14:34:40 2017 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridKernalContext.java | 8 - .../ignite/internal/GridKernalContextImpl.java | 12 - .../apache/ignite/internal/IgniteKernal.java | 2 - .../cache/version/GridCacheVersionManager.java | 3 +- .../processors/clock/GridClockServer.java | 6 - .../clock/GridClockSyncProcessor.java | 481 ------------------- .../clock/GridTimeSyncProcessorSelfTest.java | 224 --------- 7 files changed, 2 insertions(+), 734 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/decc8021/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 00696c7..1c39f9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -37,7 +37,6 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.clock.GridClockSource; -import 
org.apache.ignite.internal.processors.clock.GridClockSyncProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.cluster.ClusterProcessor; import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; @@ -178,13 +177,6 @@ public interface GridKernalContext extends Iterable<GridComponent> { public GridTimeoutProcessor timeout(); /** - * Gets time processor. - * - * @return Time processor. - */ - public GridClockSyncProcessor clockSync(); - - /** * Gets resource processor. * * @return Resource processor. http://git-wip-us.apache.org/repos/asf/ignite/blob/decc8021/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index e80ec6b..24bb97a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -52,7 +52,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.clock.GridClockSource; -import org.apache.ignite.internal.processors.clock.GridClockSyncProcessor; import org.apache.ignite.internal.processors.clock.GridJvmClockSource; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.cluster.ClusterProcessor; @@ -186,10 +185,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringInclude - private GridClockSyncProcessor clockSyncProc; - 
- /** */ - @GridToStringInclude private GridResourceProcessor rsrcProc; /** */ @@ -514,8 +509,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable jobProc = (GridJobProcessor)comp; else if (comp instanceof GridTimeoutProcessor) timeProc = (GridTimeoutProcessor)comp; - else if (comp instanceof GridClockSyncProcessor) - clockSyncProc = (GridClockSyncProcessor)comp; else if (comp instanceof GridResourceProcessor) rsrcProc = (GridResourceProcessor)comp; else if (comp instanceof GridJobMetricsProcessor) @@ -637,11 +630,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ - @Override public GridClockSyncProcessor clockSync() { - return clockSyncProc; - } - - /** {@inheritDoc} */ @Override public GridResourceProcessor resource() { return rsrcProc; } http://git-wip-us.apache.org/repos/asf/ignite/blob/decc8021/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index cdbe2e3..77ba212 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -108,7 +108,6 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; -import org.apache.ignite.internal.processors.clock.GridClockSyncProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.cluster.ClusterProcessor; import 
org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; @@ -891,7 +890,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { // Start processors before discovery manager, so they will // be able to start receiving messages once discovery completes. startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx)); - startProcessor(new GridClockSyncProcessor(ctx)); startProcessor(new GridAffinityProcessor(ctx)); startProcessor(createComponent(GridSegmentationProcessor.class, ctx)); startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx)); http://git-wip-us.apache.org/repos/asf/ignite/blob/decc8021/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java index 5a8904f..9e5e37e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java @@ -271,7 +271,8 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { if (topVer == -1) topVer = cctx.kernalContext().discovery().topologyVersion(); - long globalTime = cctx.kernalContext().clockSync().adjustedTime(topVer); +// long globalTime = cctx.kernalContext().clockSync().adjustedTime(topVer); + long globalTime = 0; if (addTime) { if (gridStartTime == 0) http://git-wip-us.apache.org/repos/asf/ignite/blob/decc8021/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java index 8daef31..a736a37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java @@ -47,9 +47,6 @@ public class GridClockServer { /** Read worker. */ private GridWorker readWorker; - /** Instance of time processor. */ - private GridClockSyncProcessor clockSync; - /** * Starts server. * @@ -59,7 +56,6 @@ public class GridClockServer { public void start(GridKernalContext ctx) throws IgniteCheckedException { this.ctx = ctx; - clockSync = ctx.clockSync(); log = ctx.log(GridClockServer.class); try { @@ -205,8 +201,6 @@ public class GridClockServer { GridClockMessage msg = GridClockMessage.fromBytes(packet.getData(), packet.getOffset(), packet.getLength()); - - clockSync.onMessageReceived(msg, packet.getAddress(), packet.getPort()); } catch (IgniteCheckedException e) { U.warn(log, "Failed to assemble clock server message (will ignore the packet) [host=" + http://git-wip-us.apache.org/repos/asf/ignite/blob/decc8021/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java deleted file mode 100644 index 257d0d9..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java +++ /dev/null @@ -1,481 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.clock; - -import java.net.InetAddress; -import java.util.Collection; -import java.util.Map; -import java.util.NavigableMap; -import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; -import org.apache.ignite.events.EventType; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.managers.communication.GridMessageListener; -import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; -import org.apache.ignite.internal.processors.GridProcessorAdapter; -import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap; -import org.apache.ignite.internal.util.GridSpinReadWriteLock; -import org.apache.ignite.internal.util.typedef.T2; -import org.apache.ignite.internal.util.typedef.internal.LT; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.thread.IgniteThread; - -import static 
org.apache.ignite.events.EventType.EVT_NODE_FAILED; -import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; -import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; -import static org.apache.ignite.internal.GridTopic.TOPIC_TIME_SYNC; -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TIME_SERVER_HOST; -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TIME_SERVER_PORT; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; - -/** - * Time synchronization processor. - */ -public class GridClockSyncProcessor extends GridProcessorAdapter { - /** Maximum size for time sync history. */ - private static final int MAX_TIME_SYNC_HISTORY = 100; - - /** Time server instance. */ - private GridClockServer srv; - - /** Shutdown lock. */ - private GridSpinReadWriteLock rw = new GridSpinReadWriteLock(); - - /** Stopping flag. */ - private volatile boolean stopping; - - /** Time coordinator thread. */ - private volatile TimeCoordinator timeCoord; - - /** Time delta history. Constructed on coordinator. */ - private NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> timeSyncHist = - new GridBoundedConcurrentOrderedMap<>(MAX_TIME_SYNC_HISTORY); - - /** Last recorded. */ - private volatile T2<GridClockDeltaVersion, GridClockDeltaSnapshot> lastSnapshot; - - /** Time source. */ - private GridClockSource clockSrc; - - /** - * @param ctx Kernal context. 
- */ - public GridClockSyncProcessor(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - super.start(); - - clockSrc = ctx.timeSource(); - - srv = new GridClockServer(); - - srv.start(ctx); - - ctx.io().addMessageListener(TOPIC_TIME_SYNC, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { - assert msg instanceof GridClockDeltaSnapshotMessage; - - GridClockDeltaSnapshotMessage msg0 = (GridClockDeltaSnapshotMessage)msg; - - GridClockDeltaVersion ver = msg0.snapshotVersion(); - - GridClockDeltaSnapshot snap = new GridClockDeltaSnapshot(ver, msg0.deltas()); - - lastSnapshot = new T2<>(ver, snap); - - timeSyncHist.put(ver, snap); - } - }); - - // We care only about node leave and fail events. - ctx.event().addLocalEventListener(new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_JOINED; - - DiscoveryEvent discoEvt = (DiscoveryEvent)evt; - - if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) - checkLaunchCoordinator(discoEvt); - - TimeCoordinator timeCoord0 = timeCoord; - - if (timeCoord0 != null) - timeCoord0.onDiscoveryEvent(discoEvt); - } - }, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_NODE_JOINED); - - ctx.addNodeAttribute(ATTR_TIME_SERVER_HOST, srv.host()); - ctx.addNodeAttribute(ATTR_TIME_SERVER_PORT, srv.port()); - } - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); - - srv.afterStart(); - - // Check at startup if this node is a fragmentizer coordinator. 
- DiscoveryEvent locJoinEvt = ctx.discovery().localJoinEvent(); - - checkLaunchCoordinator(locJoinEvt); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - super.onKernalStop(cancel); - - rw.writeLock(); - - try { - stopping = false; - - if (timeCoord != null) { - timeCoord.cancel(); - - U.join(timeCoord, log); - - timeCoord = null; - } - - if (srv != null) - srv.beforeStop(); - } - finally { - rw.writeUnlock(); - } - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws IgniteCheckedException { - super.stop(cancel); - - if (srv != null) - srv.stop(); - } - - /** - * Gets current time on local node. - * - * @return Current time in milliseconds. - */ - private long currentTime() { - return clockSrc.currentTimeMillis(); - } - - /** - * @return Time sync history. - */ - public NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> timeSyncHistory() { - return timeSyncHist; - } - - /** - * Callback from server for message receiving. - * - * @param msg Received message. - * @param addr Remote node address. - * @param port Remote node port. - */ - public void onMessageReceived(GridClockMessage msg, InetAddress addr, int port) { - long rcvTs = currentTime(); - - if (!msg.originatingNodeId().equals(ctx.localNodeId())) { - // We received time request from remote node, set current time and reply back. - msg.replyTimestamp(rcvTs); - - try { - srv.sendPacket(msg, addr, port); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send time server reply to remote node: " + msg, e); - } - } - else - timeCoord.onMessage(msg, rcvTs); - } - - /** - * Checks if local node is the oldest node in topology and starts time coordinator if so. - * - * @param discoEvt Discovery event. 
- */ - private void checkLaunchCoordinator(DiscoveryEvent discoEvt) { - rw.readLock(); - - try { - if (stopping) - return; - - if (timeCoord == null) { - long minNodeOrder = Long.MAX_VALUE; - - Collection<ClusterNode> nodes = discoEvt.topologyNodes(); - - for (ClusterNode node : nodes) { - if (node.order() < minNodeOrder) - minNodeOrder = node.order(); - } - - ClusterNode locNode = ctx.discovery().localNode(); - - if (locNode.order() == minNodeOrder) { - if (log.isDebugEnabled()) - log.debug("Detected local node to be the eldest node in topology, starting time " + - "coordinator thread [discoEvt=" + discoEvt + ", locNode=" + locNode + ']'); - - synchronized (this) { - if (timeCoord == null && !stopping) { - timeCoord = new TimeCoordinator(discoEvt); - - IgniteThread th = new IgniteThread(timeCoord); - - th.setPriority(Thread.MAX_PRIORITY); - - th.start(); - } - } - } - } - } - finally { - rw.readUnlock(); - } - } - - /** - * Gets time adjusted with time coordinator on given topology version. - * - * @param topVer Topology version. - * @return Adjusted time. - */ - public long adjustedTime(long topVer) { - T2<GridClockDeltaVersion, GridClockDeltaSnapshot> fastSnap = lastSnapshot; - - GridClockDeltaSnapshot snap; - - if (fastSnap != null && fastSnap.get1().topologyVersion() == topVer) - snap = fastSnap.get2(); - else { - // Get last synchronized time on given topology version. - Map.Entry<GridClockDeltaVersion, GridClockDeltaSnapshot> entry = timeSyncHistory().lowerEntry( - new GridClockDeltaVersion(0, topVer + 1)); - - snap = entry == null ? null : entry.getValue(); - } - - long now = clockSrc.currentTimeMillis(); - - if (snap == null) - return now; - - Long delta = snap.deltas().get(ctx.localNodeId()); - - if (delta == null) - delta = 0L; - - return now + delta; - } - - /** - * Publishes snapshot to topology. - * - * @param snapshot Snapshot to publish. - * @param top Topology to send given snapshot to. 
- */ - private void publish(GridClockDeltaSnapshot snapshot, GridDiscoveryTopologySnapshot top) { - if (!rw.tryReadLock()) - return; - - try { - lastSnapshot = new T2<>(snapshot.version(), snapshot); - - timeSyncHist.put(snapshot.version(), snapshot); - - for (ClusterNode n : top.topologyNodes()) { - GridClockDeltaSnapshotMessage msg = new GridClockDeltaSnapshotMessage( - snapshot.version(), snapshot.deltas()); - - try { - ctx.io().send(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL); - } - catch (IgniteCheckedException e) { - if (ctx.discovery().pingNodeNoError(n.id())) - U.error(log, "Failed to send time sync snapshot to remote node (did not leave grid?) " + - "[nodeId=" + n.id() + ", msg=" + msg + ", err=" + e.getMessage() + ']'); - else if (log.isDebugEnabled()) - log.debug("Failed to send time sync snapshot to remote node (did not leave grid?) " + - "[nodeId=" + n.id() + ", msg=" + msg + ", err=" + e.getMessage() + ']'); - } - } - } - finally { - rw.readUnlock(); - } - } - - /** - * Time coordinator thread. - */ - private class TimeCoordinator extends GridWorker { - /** Last discovery topology snapshot. */ - private volatile GridDiscoveryTopologySnapshot lastSnapshot; - - /** Snapshot being constructed. May be not null only on coordinator node. */ - private volatile GridClockDeltaSnapshot pendingSnapshot; - - /** Version counter. */ - private long verCnt = 1; - - /** - * Time coordinator thread constructor. - * - * @param evt Discovery event on which this node became a coordinator. 
- */ - protected TimeCoordinator(DiscoveryEvent evt) { - super(ctx.gridName(), "grid-time-coordinator", GridClockSyncProcessor.this.log); - - lastSnapshot = new GridDiscoveryTopologySnapshot(evt.topologyVersion(), evt.topologyNodes()); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - while (!isCancelled()) { - GridDiscoveryTopologySnapshot top = lastSnapshot; - - if (log.isDebugEnabled()) - log.debug("Creating time sync snapshot for topology: " + top); - - GridClockDeltaSnapshot snapshot = new GridClockDeltaSnapshot( - new GridClockDeltaVersion(verCnt++, top.topologyVersion()), - ctx.localNodeId(), - top, - ctx.config().getClockSyncSamples()); - - pendingSnapshot = snapshot; - - while (!snapshot.ready()) { - if (log.isDebugEnabled()) - log.debug("Requesting time from remote nodes: " + snapshot.pendingNodeIds()); - - for (UUID nodeId : snapshot.pendingNodeIds()) - requestTime(nodeId); - - if (log.isDebugEnabled()) - log.debug("Waiting for snapshot to be ready: " + snapshot); - - // Wait for all replies - snapshot.awaitReady(1000); - } - - // No more messages should be processed. - pendingSnapshot = null; - - if (log.isDebugEnabled()) - log.debug("Collected time sync results: " + snapshot.deltas()); - - publish(snapshot, top); - - synchronized (this) { - if (top.topologyVersion() == lastSnapshot.topologyVersion()) - wait(ctx.config().getClockSyncFrequency()); - } - } - } - - /** - * @param evt Discovery event. - */ - public void onDiscoveryEvent(DiscoveryEvent evt) { - if (log.isDebugEnabled()) - log.debug("Processing discovery event: " + evt); - - if (evt.type() == EventType.EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) - onNodeLeft(evt.eventNode().id()); - - synchronized (this) { - lastSnapshot = new GridDiscoveryTopologySnapshot(evt.topologyVersion(), evt.topologyNodes()); - - notifyAll(); - } - } - - /** - * @param msg Message received from remote node. 
- * @param rcvTs Receive timestamp. - */ - private void onMessage(GridClockMessage msg, long rcvTs) { - GridClockDeltaSnapshot curr = pendingSnapshot; - - if (curr != null) { - long delta = (msg.originatingTimestamp() + rcvTs) / 2 - msg.replyTimestamp(); - - boolean needMore = curr.onDeltaReceived(msg.targetNodeId(), delta); - - if (needMore) - requestTime(msg.targetNodeId()); - } - } - - /** - * Requests time from remote node. - * - * @param rmtNodeId Remote node ID. - */ - private void requestTime(UUID rmtNodeId) { - ClusterNode node = ctx.discovery().node(rmtNodeId); - - if (node != null) { - InetAddress addr = node.attribute(ATTR_TIME_SERVER_HOST); - int port = node.attribute(ATTR_TIME_SERVER_PORT); - - try { - GridClockMessage req = new GridClockMessage(ctx.localNodeId(), rmtNodeId, currentTime(), 0); - - srv.sendPacket(req, addr, port); - } - catch (IgniteCheckedException e) { - LT.error(log, e, "Failed to send time request to remote node [rmtNodeId=" + rmtNodeId + - ", addr=" + addr + ", port=" + port + ']'); - } - } - else - onNodeLeft(rmtNodeId); - } - - /** - * Node left callback. - * - * @param nodeId Left node ID. 
- */ - private void onNodeLeft(UUID nodeId) { - GridClockDeltaSnapshot curr = pendingSnapshot; - - if (curr != null) - curr.onNodeLeft(nodeId); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/decc8021/modules/core/src/test/java/org/apache/ignite/internal/processors/clock/GridTimeSyncProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/clock/GridTimeSyncProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/clock/GridTimeSyncProcessorSelfTest.java deleted file mode 100644 index f5ba07d..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/clock/GridTimeSyncProcessorSelfTest.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.clock; - -import java.util.NavigableMap; -import org.apache.ignite.Ignite; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.GridKernalContextImpl; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.util.typedef.PA; -import org.apache.ignite.lifecycle.LifecycleBean; -import org.apache.ignite.lifecycle.LifecycleEventType; -import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -/** - * Time sync processor self test. - */ -public class GridTimeSyncProcessorSelfTest extends GridCommonAbstractTest { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Number of grids in test. */ - public static final int GRID_CNT = 4; - - /** Starting grid index. */ - private int idx; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - - cfg.setLifecycleBeans(new TimeShiftLifecycleBean(idx * 2000)); - - idx++; - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testTimeSync() throws Exception { - startGrids(GRID_CNT); - - try { - // Check coordinator time deltas. - final IgniteKernal kernal = (IgniteKernal)grid(0); - - // Wait for latest time sync history. 
- GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> hist = kernal.context().clockSync() - .timeSyncHistory(); - - info("Checking time sync history: " + hist); - - for (GridClockDeltaVersion ver : hist.keySet()) { - if (ver.topologyVersion() == 4) - return true; - } - - return false; - } - }, 10000); - - NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> history = kernal.context().clockSync() - .timeSyncHistory(); - - GridClockDeltaSnapshot snap = history.lastEntry().getValue(); - - assertEquals(3, snap.deltas().size()); - - for (int i = 1; i < GRID_CNT; i++) { - Long delta = snap.deltas().get(grid(i).localNode().id()); - - // Give 300ms range for test? - int idealDelta = - i * 2000; - - int threshold = 100; - - assertTrue("Invalid time delta for node [expected=" + idealDelta + ", " + - "actual=" + delta + ", threshold=" + threshold, - delta <= idealDelta + threshold && delta >= idealDelta - threshold); - } - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testTimeSyncChangeCoordinator() throws Exception { - startGrids(GRID_CNT); - - try { - for (int i = 0; i < GRID_CNT; i++) { - // Not coordinator now. - stopGrid(i); - - startGrid(i); - } - - // Check coordinator time deltas. - final IgniteKernal kernal = (IgniteKernal)grid(0); - - assertEquals(6, kernal.localNode().order()); - - // Wait for latest time sync history. 
- GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> hist = kernal.context().clockSync() - .timeSyncHistory(); - - info("Checking time sync history: " + hist); - - for (GridClockDeltaVersion ver : hist.keySet()) { - if (ver.topologyVersion() == 12) - return true; - } - - return false; - } - }, 10000); - - NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> history = kernal.context().clockSync() - .timeSyncHistory(); - - GridClockDeltaSnapshot snap = history.lastEntry().getValue(); - - assertEquals(3, snap.deltas().size()); - - for (int i = 1; i < GRID_CNT; i++) { - Long delta = snap.deltas().get(grid(i).localNode().id()); - - // Give 300ms range for test? - int idealDelta = - i * 2000; - - int threshold = 100; - - assertTrue("Invalid time delta for node [expected=" + idealDelta + ", " + - "actual=" + delta + ", threshold=" + threshold, - delta <= idealDelta + threshold && delta >= idealDelta - threshold); - } - } - finally { - stopAllGrids(); - } - } - - /** - * Time bean that sets shifted time source to context. - */ - private static class TimeShiftLifecycleBean implements LifecycleBean { - /** Injected grid. */ - @IgniteInstanceResource - private Ignite g; - - /** Time delta. */ - private int delta; - - /** - * Constructs lifecycle bean. - * - * @param delta Time delta. - */ - private TimeShiftLifecycleBean(int delta) { - this.delta = delta; - } - - /** {@inheritDoc} */ - @Override public void onLifecycleEvent(LifecycleEventType evt) { - if (evt == LifecycleEventType.BEFORE_NODE_START) - ((GridKernalContextImpl)((IgniteKernal)g).context()).timeSource(new TimeShiftClockSource(delta)); - } - } - - /** - * Time shift time source. - */ - private static class TimeShiftClockSource implements GridClockSource { - /** Time shift delta. */ - private int delta; - - /** - * @param delta Time shift delta. 
- */ - private TimeShiftClockSource(int delta) { - this.delta = delta; - } - - /** {@inheritDoc} */ - @Override public long currentTimeMillis() { - return System.currentTimeMillis() + delta; - } - } -} \ No newline at end of file
