Remove GridClockSyncProcessor, GridTimeSyncProcessorSelfTest and related.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/decc8021
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/decc8021
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/decc8021

Branch: refs/heads/ignite-4587
Commit: decc8021f7ae5c0073f8cf15c7ca64a57305aa3b
Parents: 6265f33
Author: Max Kozlov <[email protected]>
Authored: Thu Mar 9 14:34:40 2017 +0300
Committer: Max Kozlov <[email protected]>
Committed: Thu Mar 9 14:34:40 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridKernalContext.java      |   8 -
 .../ignite/internal/GridKernalContextImpl.java  |  12 -
 .../apache/ignite/internal/IgniteKernal.java    |   2 -
 .../cache/version/GridCacheVersionManager.java  |   3 +-
 .../processors/clock/GridClockServer.java       |   6 -
 .../clock/GridClockSyncProcessor.java           | 481 -------------------
 .../clock/GridTimeSyncProcessorSelfTest.java    | 224 ---------
 7 files changed, 2 insertions(+), 734 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/decc8021/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 00696c7..1c39f9a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -37,7 +37,6 @@ import 
org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.clock.GridClockSource;
-import org.apache.ignite.internal.processors.clock.GridClockSyncProcessor;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
 import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
 import 
org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
@@ -178,13 +177,6 @@ public interface GridKernalContext extends 
Iterable<GridComponent> {
     public GridTimeoutProcessor timeout();
 
     /**
-     * Gets time processor.
-     *
-     * @return Time processor.
-     */
-    public GridClockSyncProcessor clockSync();
-
-    /**
      * Gets resource processor.
      *
      * @return Resource processor.

http://git-wip-us.apache.org/repos/asf/ignite/blob/decc8021/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index e80ec6b..24bb97a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -52,7 +52,6 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.clock.GridClockSource;
-import org.apache.ignite.internal.processors.clock.GridClockSyncProcessor;
 import org.apache.ignite.internal.processors.clock.GridJvmClockSource;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
 import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
@@ -186,10 +185,6 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
 
     /** */
     @GridToStringInclude
-    private GridClockSyncProcessor clockSyncProc;
-
-    /** */
-    @GridToStringInclude
     private GridResourceProcessor rsrcProc;
 
     /** */
@@ -514,8 +509,6 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
             jobProc = (GridJobProcessor)comp;
         else if (comp instanceof GridTimeoutProcessor)
             timeProc = (GridTimeoutProcessor)comp;
-        else if (comp instanceof GridClockSyncProcessor)
-            clockSyncProc = (GridClockSyncProcessor)comp;
         else if (comp instanceof GridResourceProcessor)
             rsrcProc = (GridResourceProcessor)comp;
         else if (comp instanceof GridJobMetricsProcessor)
@@ -637,11 +630,6 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
-    @Override public GridClockSyncProcessor clockSync() {
-        return clockSyncProc;
-    }
-
-    /** {@inheritDoc} */
     @Override public GridResourceProcessor resource() {
         return rsrcProc;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/decc8021/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index cdbe2e3..77ba212 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -108,7 +108,6 @@ import 
org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
-import org.apache.ignite.internal.processors.clock.GridClockSyncProcessor;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
 import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
 import 
org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
@@ -891,7 +890,6 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
             // Start processors before discovery manager, so they will
             // be able to start receiving messages once discovery completes.
             
startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx));
-            startProcessor(new GridClockSyncProcessor(ctx));
             startProcessor(new GridAffinityProcessor(ctx));
             startProcessor(createComponent(GridSegmentationProcessor.class, 
ctx));
             startProcessor(createComponent(IgniteCacheObjectProcessor.class, 
ctx));

http://git-wip-us.apache.org/repos/asf/ignite/blob/decc8021/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index 5a8904f..9e5e37e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -271,7 +271,8 @@ public class GridCacheVersionManager extends 
GridCacheSharedManagerAdapter {
         if (topVer == -1)
             topVer = cctx.kernalContext().discovery().topologyVersion();
 
-        long globalTime = 
cctx.kernalContext().clockSync().adjustedTime(topVer);
+//        long globalTime = 
cctx.kernalContext().clockSync().adjustedTime(topVer);
+        long globalTime = 0;
 
         if (addTime) {
             if (gridStartTime == 0)

http://git-wip-us.apache.org/repos/asf/ignite/blob/decc8021/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java
index 8daef31..a736a37 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java
@@ -47,9 +47,6 @@ public class GridClockServer {
     /** Read worker. */
     private GridWorker readWorker;
 
-    /** Instance of time processor. */
-    private GridClockSyncProcessor clockSync;
-
     /**
      * Starts server.
      *
@@ -59,7 +56,6 @@ public class GridClockServer {
     public void start(GridKernalContext ctx) throws IgniteCheckedException {
         this.ctx = ctx;
 
-        clockSync = ctx.clockSync();
         log = ctx.log(GridClockServer.class);
 
         try {
@@ -205,8 +201,6 @@ public class GridClockServer {
 
                     GridClockMessage msg = 
GridClockMessage.fromBytes(packet.getData(), packet.getOffset(),
                         packet.getLength());
-
-                    clockSync.onMessageReceived(msg, packet.getAddress(), 
packet.getPort());
                 }
                 catch (IgniteCheckedException e) {
                     U.warn(log, "Failed to assemble clock server message (will 
ignore the packet) [host=" +

http://git-wip-us.apache.org/repos/asf/ignite/blob/decc8021/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
deleted file mode 100644
index 257d0d9..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
+++ /dev/null
@@ -1,481 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.clock;
-
-import java.net.InetAddress;
-import java.util.Collection;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
-import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.managers.communication.GridMessageListener;
-import 
org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
-import org.apache.ignite.internal.processors.GridProcessorAdapter;
-import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
-
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.GridTopic.TOPIC_TIME_SYNC;
-import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TIME_SERVER_HOST;
-import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TIME_SERVER_PORT;
-import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-
-/**
- * Time synchronization processor.
- */
-public class GridClockSyncProcessor extends GridProcessorAdapter {
-    /** Maximum size for time sync history. */
-    private static final int MAX_TIME_SYNC_HISTORY = 100;
-
-    /** Time server instance. */
-    private GridClockServer srv;
-
-    /** Shutdown lock. */
-    private GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
-
-    /** Stopping flag. */
-    private volatile boolean stopping;
-
-    /** Time coordinator thread. */
-    private volatile TimeCoordinator timeCoord;
-
-    /** Time delta history. Constructed on coordinator. */
-    private NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> 
timeSyncHist =
-        new GridBoundedConcurrentOrderedMap<>(MAX_TIME_SYNC_HISTORY);
-
-    /** Last recorded. */
-    private volatile T2<GridClockDeltaVersion, GridClockDeltaSnapshot> 
lastSnapshot;
-
-    /** Time source. */
-    private GridClockSource clockSrc;
-
-    /**
-     * @param ctx Kernal context.
-     */
-    public GridClockSyncProcessor(GridKernalContext ctx) {
-        super(ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        super.start();
-
-        clockSrc = ctx.timeSource();
-
-        srv = new GridClockServer();
-
-        srv.start(ctx);
-
-        ctx.io().addMessageListener(TOPIC_TIME_SYNC, new GridMessageListener() 
{
-            @Override public void onMessage(UUID nodeId, Object msg) {
-                assert msg instanceof GridClockDeltaSnapshotMessage;
-
-                GridClockDeltaSnapshotMessage msg0 = 
(GridClockDeltaSnapshotMessage)msg;
-
-                GridClockDeltaVersion ver = msg0.snapshotVersion();
-
-                GridClockDeltaSnapshot snap = new GridClockDeltaSnapshot(ver, 
msg0.deltas());
-
-                lastSnapshot = new T2<>(ver, snap);
-
-                timeSyncHist.put(ver, snap);
-            }
-        });
-
-        // We care only about node leave and fail events.
-        ctx.event().addLocalEventListener(new GridLocalEventListener() {
-            @Override public void onEvent(Event evt) {
-                assert evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED || evt.type() == EVT_NODE_JOINED;
-
-                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
-
-                if (evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED)
-                    checkLaunchCoordinator(discoEvt);
-
-                TimeCoordinator timeCoord0 = timeCoord;
-
-                if (timeCoord0 != null)
-                    timeCoord0.onDiscoveryEvent(discoEvt);
-            }
-        }, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_NODE_JOINED);
-
-        ctx.addNodeAttribute(ATTR_TIME_SERVER_HOST, srv.host());
-        ctx.addNodeAttribute(ATTR_TIME_SERVER_PORT, srv.port());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        super.onKernalStart();
-
-        srv.afterStart();
-
-        // Check at startup if this node is a fragmentizer coordinator.
-        DiscoveryEvent locJoinEvt = ctx.discovery().localJoinEvent();
-
-        checkLaunchCoordinator(locJoinEvt);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        super.onKernalStop(cancel);
-
-        rw.writeLock();
-
-        try {
-            stopping = false;
-
-            if (timeCoord != null) {
-                timeCoord.cancel();
-
-                U.join(timeCoord, log);
-
-                timeCoord = null;
-            }
-
-            if (srv != null)
-                srv.beforeStop();
-        }
-        finally {
-            rw.writeUnlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop(boolean cancel) throws IgniteCheckedException {
-        super.stop(cancel);
-
-        if (srv != null)
-            srv.stop();
-    }
-
-    /**
-     * Gets current time on local node.
-     *
-     * @return Current time in milliseconds.
-     */
-    private long currentTime() {
-        return clockSrc.currentTimeMillis();
-    }
-
-    /**
-     * @return Time sync history.
-     */
-    public NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> 
timeSyncHistory() {
-        return timeSyncHist;
-    }
-
-    /**
-     * Callback from server for message receiving.
-     *
-     * @param msg Received message.
-     * @param addr Remote node address.
-     * @param port Remote node port.
-     */
-    public void onMessageReceived(GridClockMessage msg, InetAddress addr, int 
port) {
-        long rcvTs = currentTime();
-
-        if (!msg.originatingNodeId().equals(ctx.localNodeId())) {
-            // We received time request from remote node, set current time and 
reply back.
-            msg.replyTimestamp(rcvTs);
-
-            try {
-                srv.sendPacket(msg, addr, port);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send time server reply to remote node: 
" + msg, e);
-            }
-        }
-        else
-            timeCoord.onMessage(msg, rcvTs);
-    }
-
-    /**
-     * Checks if local node is the oldest node in topology and starts time 
coordinator if so.
-     *
-     * @param discoEvt Discovery event.
-     */
-    private void checkLaunchCoordinator(DiscoveryEvent discoEvt) {
-        rw.readLock();
-
-        try {
-            if (stopping)
-                return;
-
-            if (timeCoord == null) {
-                long minNodeOrder = Long.MAX_VALUE;
-
-                Collection<ClusterNode> nodes = discoEvt.topologyNodes();
-
-                for (ClusterNode node : nodes) {
-                    if (node.order() < minNodeOrder)
-                        minNodeOrder = node.order();
-                }
-
-                ClusterNode locNode = ctx.discovery().localNode();
-
-                if (locNode.order() == minNodeOrder) {
-                    if (log.isDebugEnabled())
-                        log.debug("Detected local node to be the eldest node 
in topology, starting time " +
-                            "coordinator thread [discoEvt=" + discoEvt + ", 
locNode=" + locNode + ']');
-
-                    synchronized (this) {
-                        if (timeCoord == null && !stopping) {
-                            timeCoord = new TimeCoordinator(discoEvt);
-
-                            IgniteThread th = new IgniteThread(timeCoord);
-
-                            th.setPriority(Thread.MAX_PRIORITY);
-
-                            th.start();
-                        }
-                    }
-                }
-            }
-        }
-        finally {
-            rw.readUnlock();
-        }
-    }
-
-    /**
-     * Gets time adjusted with time coordinator on given topology version.
-     *
-     * @param topVer Topology version.
-     * @return Adjusted time.
-     */
-    public long adjustedTime(long topVer) {
-        T2<GridClockDeltaVersion, GridClockDeltaSnapshot> fastSnap = 
lastSnapshot;
-
-        GridClockDeltaSnapshot snap;
-
-        if (fastSnap != null && fastSnap.get1().topologyVersion() == topVer)
-            snap = fastSnap.get2();
-        else {
-            // Get last synchronized time on given topology version.
-            Map.Entry<GridClockDeltaVersion, GridClockDeltaSnapshot> entry = 
timeSyncHistory().lowerEntry(
-                new GridClockDeltaVersion(0, topVer + 1));
-
-            snap = entry == null ? null : entry.getValue();
-        }
-
-        long now = clockSrc.currentTimeMillis();
-
-        if (snap == null)
-            return now;
-
-        Long delta = snap.deltas().get(ctx.localNodeId());
-
-        if (delta == null)
-            delta = 0L;
-
-        return now + delta;
-    }
-
-    /**
-     * Publishes snapshot to topology.
-     *
-     * @param snapshot Snapshot to publish.
-     * @param top Topology to send given snapshot to.
-     */
-    private void publish(GridClockDeltaSnapshot snapshot, 
GridDiscoveryTopologySnapshot top) {
-        if (!rw.tryReadLock())
-            return;
-
-        try {
-            lastSnapshot = new T2<>(snapshot.version(), snapshot);
-
-            timeSyncHist.put(snapshot.version(), snapshot);
-
-            for (ClusterNode n : top.topologyNodes()) {
-                GridClockDeltaSnapshotMessage msg = new 
GridClockDeltaSnapshotMessage(
-                    snapshot.version(), snapshot.deltas());
-
-                try {
-                    ctx.io().send(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL);
-                }
-                catch (IgniteCheckedException e) {
-                    if (ctx.discovery().pingNodeNoError(n.id()))
-                        U.error(log, "Failed to send time sync snapshot to 
remote node (did not leave grid?) " +
-                            "[nodeId=" + n.id() + ", msg=" + msg + ", err=" + 
e.getMessage() + ']');
-                    else if (log.isDebugEnabled())
-                        log.debug("Failed to send time sync snapshot to remote 
node (did not leave grid?) " +
-                            "[nodeId=" + n.id() + ", msg=" + msg + ", err=" + 
e.getMessage() + ']');
-                }
-            }
-        }
-        finally {
-            rw.readUnlock();
-        }
-    }
-
-    /**
-     * Time coordinator thread.
-     */
-    private class TimeCoordinator extends GridWorker {
-        /** Last discovery topology snapshot. */
-        private volatile GridDiscoveryTopologySnapshot lastSnapshot;
-
-        /** Snapshot being constructed. May be not null only on coordinator 
node. */
-        private volatile GridClockDeltaSnapshot pendingSnapshot;
-
-        /** Version counter. */
-        private long verCnt = 1;
-
-        /**
-         * Time coordinator thread constructor.
-         *
-         * @param evt Discovery event on which this node became a coordinator.
-         */
-        protected TimeCoordinator(DiscoveryEvent evt) {
-            super(ctx.gridName(), "grid-time-coordinator", 
GridClockSyncProcessor.this.log);
-
-            lastSnapshot = new 
GridDiscoveryTopologySnapshot(evt.topologyVersion(), evt.topologyNodes());
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
-            while (!isCancelled()) {
-                GridDiscoveryTopologySnapshot top = lastSnapshot;
-
-                if (log.isDebugEnabled())
-                    log.debug("Creating time sync snapshot for topology: " + 
top);
-
-                GridClockDeltaSnapshot snapshot = new GridClockDeltaSnapshot(
-                    new GridClockDeltaVersion(verCnt++, top.topologyVersion()),
-                    ctx.localNodeId(),
-                    top,
-                    ctx.config().getClockSyncSamples());
-
-                pendingSnapshot = snapshot;
-
-                while (!snapshot.ready()) {
-                    if (log.isDebugEnabled())
-                        log.debug("Requesting time from remote nodes: " + 
snapshot.pendingNodeIds());
-
-                    for (UUID nodeId : snapshot.pendingNodeIds())
-                        requestTime(nodeId);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Waiting for snapshot to be ready: " + 
snapshot);
-
-                    // Wait for all replies
-                    snapshot.awaitReady(1000);
-                }
-
-                // No more messages should be processed.
-                pendingSnapshot = null;
-
-                if (log.isDebugEnabled())
-                    log.debug("Collected time sync results: " + 
snapshot.deltas());
-
-                publish(snapshot, top);
-
-                synchronized (this) {
-                    if (top.topologyVersion() == 
lastSnapshot.topologyVersion())
-                        wait(ctx.config().getClockSyncFrequency());
-                }
-            }
-        }
-
-        /**
-         * @param evt Discovery event.
-         */
-        public void onDiscoveryEvent(DiscoveryEvent evt) {
-            if (log.isDebugEnabled())
-                log.debug("Processing discovery event: " + evt);
-
-            if (evt.type() == EventType.EVT_NODE_FAILED || evt.type() == 
EVT_NODE_LEFT)
-                onNodeLeft(evt.eventNode().id());
-
-            synchronized (this) {
-                lastSnapshot = new 
GridDiscoveryTopologySnapshot(evt.topologyVersion(), evt.topologyNodes());
-
-                notifyAll();
-            }
-        }
-
-        /**
-         * @param msg Message received from remote node.
-         * @param rcvTs Receive timestamp.
-         */
-        private void onMessage(GridClockMessage msg, long rcvTs) {
-            GridClockDeltaSnapshot curr = pendingSnapshot;
-
-            if (curr != null) {
-                long delta = (msg.originatingTimestamp() + rcvTs) / 2 - 
msg.replyTimestamp();
-
-                boolean needMore = curr.onDeltaReceived(msg.targetNodeId(), 
delta);
-
-                if (needMore)
-                    requestTime(msg.targetNodeId());
-            }
-        }
-
-        /**
-         * Requests time from remote node.
-         *
-         * @param rmtNodeId Remote node ID.
-         */
-        private void requestTime(UUID rmtNodeId) {
-            ClusterNode node = ctx.discovery().node(rmtNodeId);
-
-            if (node != null) {
-                InetAddress addr = node.attribute(ATTR_TIME_SERVER_HOST);
-                int port = node.attribute(ATTR_TIME_SERVER_PORT);
-
-                try {
-                    GridClockMessage req = new 
GridClockMessage(ctx.localNodeId(), rmtNodeId, currentTime(), 0);
-
-                    srv.sendPacket(req, addr, port);
-                }
-                catch (IgniteCheckedException e) {
-                    LT.error(log, e, "Failed to send time request to remote 
node [rmtNodeId=" + rmtNodeId +
-                        ", addr=" + addr + ", port=" + port + ']');
-                }
-            }
-            else
-                onNodeLeft(rmtNodeId);
-        }
-
-        /**
-         * Node left callback.
-         *
-         * @param nodeId Left node ID.
-         */
-        private void onNodeLeft(UUID nodeId) {
-            GridClockDeltaSnapshot curr = pendingSnapshot;
-
-            if (curr != null)
-                curr.onNodeLeft(nodeId);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/decc8021/modules/core/src/test/java/org/apache/ignite/internal/processors/clock/GridTimeSyncProcessorSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/clock/GridTimeSyncProcessorSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/clock/GridTimeSyncProcessorSelfTest.java
deleted file mode 100644
index f5ba07d..0000000
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/clock/GridTimeSyncProcessorSelfTest.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.clock;
-
-import java.util.NavigableMap;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.GridKernalContextImpl;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.util.typedef.PA;
-import org.apache.ignite.lifecycle.LifecycleBean;
-import org.apache.ignite.lifecycle.LifecycleEventType;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- * Time sync processor self test.
- */
-public class GridTimeSyncProcessorSelfTest extends GridCommonAbstractTest {
-    /** IP finder. */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
-
-    /** Number of grids in test. */
-    public static final int GRID_CNT = 4;
-
-    /** Starting grid index. */
-    private int idx;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
-        discoSpi.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(discoSpi);
-
-        cfg.setLifecycleBeans(new TimeShiftLifecycleBean(idx * 2000));
-
-        idx++;
-
-        return cfg;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTimeSync() throws Exception {
-        startGrids(GRID_CNT);
-
-        try {
-            // Check coordinator time deltas.
-            final IgniteKernal kernal = (IgniteKernal)grid(0);
-
-            // Wait for latest time sync history.
-            GridTestUtils.waitForCondition(new PA() {
-                @Override public boolean apply() {
-                    NavigableMap<GridClockDeltaVersion, 
GridClockDeltaSnapshot> hist = kernal.context().clockSync()
-                        .timeSyncHistory();
-
-                    info("Checking time sync history: " + hist);
-
-                    for (GridClockDeltaVersion ver : hist.keySet()) {
-                        if (ver.topologyVersion() == 4)
-                            return true;
-                    }
-
-                    return false;
-                }
-            }, 10000);
-
-            NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> 
history = kernal.context().clockSync()
-                .timeSyncHistory();
-
-            GridClockDeltaSnapshot snap = history.lastEntry().getValue();
-
-            assertEquals(3, snap.deltas().size());
-
-            for (int i = 1; i < GRID_CNT; i++) {
-                Long delta = snap.deltas().get(grid(i).localNode().id());
-
-                // Give 300ms range for test?
-                int idealDelta = - i * 2000;
-
-                int threshold = 100;
-
-                assertTrue("Invalid time delta for node [expected=" + 
idealDelta + ", " +
-                    "actual=" + delta + ", threshold=" + threshold,
-                    delta <= idealDelta + threshold && delta >= idealDelta - 
threshold);
-            }
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTimeSyncChangeCoordinator() throws Exception {
-        startGrids(GRID_CNT);
-
-        try {
-            for (int i = 0; i < GRID_CNT; i++) {
-                // Not coordinator now.
-                stopGrid(i);
-
-                startGrid(i);
-            }
-
-            // Check coordinator time deltas.
-            final IgniteKernal kernal = (IgniteKernal)grid(0);
-
-            assertEquals(6, kernal.localNode().order());
-
-            // Wait for latest time sync history.
-            GridTestUtils.waitForCondition(new PA() {
-                @Override public boolean apply() {
-                    NavigableMap<GridClockDeltaVersion, 
GridClockDeltaSnapshot> hist = kernal.context().clockSync()
-                        .timeSyncHistory();
-
-                    info("Checking time sync history: " + hist);
-
-                    for (GridClockDeltaVersion ver : hist.keySet()) {
-                        if (ver.topologyVersion() == 12)
-                            return true;
-                    }
-
-                    return false;
-                }
-            }, 10000);
-
-            NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> 
history = kernal.context().clockSync()
-                .timeSyncHistory();
-
-            GridClockDeltaSnapshot snap = history.lastEntry().getValue();
-
-            assertEquals(3, snap.deltas().size());
-
-            for (int i = 1; i < GRID_CNT; i++) {
-                Long delta = snap.deltas().get(grid(i).localNode().id());
-
-                // Give 300ms range for test?
-                int idealDelta = - i * 2000;
-
-                int threshold = 100;
-
-                assertTrue("Invalid time delta for node [expected=" + 
idealDelta + ", " +
-                    "actual=" + delta + ", threshold=" + threshold,
-                    delta <= idealDelta + threshold && delta >= idealDelta - 
threshold);
-            }
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * Time bean that sets shifted time source to context.
-     */
-    private static class TimeShiftLifecycleBean implements LifecycleBean {
-        /** Injected grid. */
-        @IgniteInstanceResource
-        private Ignite g;
-
-        /** Time delta. */
-        private int delta;
-
-        /**
-         * Constructs lifecycle bean.
-         *
-         * @param delta Time delta.
-         */
-        private TimeShiftLifecycleBean(int delta) {
-            this.delta = delta;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onLifecycleEvent(LifecycleEventType evt) {
-            if (evt == LifecycleEventType.BEFORE_NODE_START)
-                
((GridKernalContextImpl)((IgniteKernal)g).context()).timeSource(new 
TimeShiftClockSource(delta));
-        }
-    }
-
-    /**
-     * Time shift time source.
-     */
-    private static class TimeShiftClockSource implements GridClockSource {
-        /** Time shift delta. */
-        private int delta;
-
-        /**
-         * @param delta Time shift delta.
-         */
-        private TimeShiftClockSource(int delta) {
-            this.delta = delta;
-        }
-
-        /** {@inheritDoc} */
-        @Override public long currentTimeMillis() {
-            return System.currentTimeMillis() + delta;
-        }
-    }
-}
\ No newline at end of file

Reply via email to