# ignite-63

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c602549a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c602549a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c602549a

Branch: refs/heads/ignite-63
Commit: c602549aceca0fedc33abfd12cd40fc552f097d2
Parents: cfcf46d
Author: sboikov <[email protected]>
Authored: Fri Jan 23 00:05:03 2015 +0300
Committer: sboikov <[email protected]>
Committed: Fri Jan 23 00:05:07 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/internal/GridEx.java |   2 +-
 .../org/apache/ignite/internal/GridKernal.java  |   5 +-
 .../ignite/internal/GridKernalContext.java      |   5 +-
 .../ignite/internal/GridKernalContextImpl.java  |   5 +-
 .../ignite/internal/GridPortablesImpl.java      |   2 -
 .../clock/GridClockDeltaSnapshot.java           | 231 ++++++++++
 .../clock/GridClockDeltaSnapshotMessage.java    | 226 +++++++++
 .../processors/clock/GridClockDeltaVersion.java | 117 +++++
 .../processors/clock/GridClockMessage.java      | 171 +++++++
 .../processors/clock/GridClockServer.java       | 206 +++++++++
 .../processors/clock/GridClockSource.java       |  30 ++
 .../clock/GridClockSyncProcessor.java           | 458 +++++++++++++++++++
 .../processors/clock/GridJvmClockSource.java    |  28 ++
 .../dr/GridDrDataLoadCacheUpdater.java          |  77 ++++
 .../internal/processors/dr/GridDrType.java      |  38 ++
 .../processors/dr/GridRawVersionedEntry.java    | 210 +++++++++
 .../processors/dr/GridVersionedEntry.java       |  80 ++++
 .../ignite/internal/processors/dr/package.html  |  25 +
 .../processors/interop/GridInteropAware.java    |  49 ++
 .../interop/GridInteropProcessor.java           |  82 ++++
 .../interop/GridInteropProcessorAdapter.java    |  31 ++
 .../processors/interop/GridInteropTarget.java   | 108 +++++
 .../interop/os/GridOsInteropProcessor.java      |  80 ++++
 .../internal/processors/interop/os/package.html |  23 +
 .../internal/processors/interop/package.html    |  23 +
 .../portable/GridPortableInputStream.java       | 177 +++++++
 .../portable/GridPortableOutputStream.java      | 172 +++++++
 .../portable/GridPortableProcessor.java         | 150 ++++++
 .../processors/portable/GridPortableStream.java |  53 +++
 .../portable/os/GridOsPortableProcessor.java    | 125 +++++
 .../processors/portable/os/package.html         |  23 +
 .../internal/processors/portable/package.html   |  23 +
 .../GridTcpCommunicationMessageAdapter.java     |   2 +-
 .../GridTcpCommunicationMessageFactory.java     |   2 +-
 .../GridTcpCommunicationMessageState.java       |   2 +-
 .../util/portable/PortableRawWriterEx.java      |   1 -
 .../processors/cache/GridCacheAdapter.java      |   4 +-
 .../processors/cache/GridCacheContext.java      |   1 -
 .../processors/cache/GridCacheEntryEx.java      |   2 +-
 .../processors/cache/GridCacheMapEntry.java     |   4 +-
 .../processors/cache/GridCacheStoreManager.java |   2 +-
 .../cache/GridCacheWriteBehindStore.java        |   2 +-
 .../GridDistributedTxRemoteAdapter.java         |   2 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |   2 +-
 .../distributed/dht/GridDhtLockFuture.java      |   2 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   2 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   2 +-
 .../dht/preloader/GridDhtForceKeysFuture.java   |   2 +-
 .../preloader/GridDhtPartitionDemandPool.java   |   2 +-
 .../distributed/near/GridNearAtomicCache.java   |   2 +-
 .../processors/cache/dr/GridCacheDrManager.java |   2 +-
 .../cache/dr/os/GridOsCacheDrManager.java       |   2 +-
 .../transactions/IgniteTxLocalAdapter.java      |   4 +-
 .../clock/GridClockDeltaSnapshot.java           | 231 ----------
 .../clock/GridClockDeltaSnapshotMessage.java    | 226 ---------
 .../processors/clock/GridClockDeltaVersion.java | 117 -----
 .../processors/clock/GridClockMessage.java      | 171 -------
 .../processors/clock/GridClockServer.java       | 206 ---------
 .../processors/clock/GridClockSource.java       |  30 --
 .../clock/GridClockSyncProcessor.java           | 458 -------------------
 .../processors/clock/GridJvmClockSource.java    |  28 --
 .../dataload/IgniteDataLoaderImpl.java          |   1 -
 .../dr/GridDrDataLoadCacheUpdater.java          |  77 ----
 .../grid/kernal/processors/dr/GridDrType.java   |  38 --
 .../processors/dr/GridRawVersionedEntry.java    | 210 ---------
 .../processors/dr/GridVersionedEntry.java       |  80 ----
 .../grid/kernal/processors/dr/package.html      |  25 -
 .../processors/interop/GridInteropAware.java    |  49 --
 .../interop/GridInteropProcessor.java           |  82 ----
 .../interop/GridInteropProcessorAdapter.java    |  31 --
 .../processors/interop/GridInteropTarget.java   | 109 -----
 .../interop/os/GridOsInteropProcessor.java      |  80 ----
 .../kernal/processors/interop/os/package.html   |  23 -
 .../grid/kernal/processors/interop/package.html |  23 -
 .../portable/GridPortableInputStream.java       | 177 -------
 .../portable/GridPortableOutputStream.java      | 172 -------
 .../portable/GridPortableProcessor.java         | 150 ------
 .../processors/portable/GridPortableStream.java |  53 ---
 .../portable/os/GridOsPortableProcessor.java    | 126 -----
 .../kernal/processors/portable/os/package.html  |  23 -
 .../kernal/processors/portable/package.html     |  23 -
 .../clock/GridTimeSyncProcessorSelfTest.java    | 223 +++++++++
 .../processors/cache/GridCacheTestEntryEx.java  |   2 +-
 .../GridCacheReplicatedInvalidateSelfTest.java  |   2 +-
 .../clock/GridTimeSyncProcessorSelfTest.java    | 223 ---------
 ...idHadoopDefaultMapReducePlannerSelfTest.java |   2 +-
 86 files changed, 3271 insertions(+), 3281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/GridEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEx.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridEx.java
index eed3b86..6ee5e8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEx.java
@@ -21,7 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.lang.*;
 import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.interop.*;
+import org.apache.ignite.internal.processors.interop.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java
index f3461a4..5c923bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java
@@ -49,19 +49,18 @@ import org.apache.ignite.internal.managers.security.*;
 import org.apache.ignite.internal.managers.swapspace.*;
 import org.gridgain.grid.kernal.processors.affinity.*;
 import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.clock.*;
+import org.apache.ignite.internal.processors.clock.*;
 import org.gridgain.grid.kernal.processors.closure.*;
 import org.gridgain.grid.kernal.processors.continuous.*;
 import org.gridgain.grid.kernal.processors.dataload.*;
 import org.apache.ignite.internal.processors.email.*;
-import org.gridgain.grid.kernal.processors.interop.*;
+import org.apache.ignite.internal.processors.interop.*;
 import org.apache.ignite.internal.processors.job.*;
 import org.apache.ignite.internal.processors.jobmetrics.*;
 import org.apache.ignite.internal.processors.license.*;
 import org.apache.ignite.internal.processors.offheap.*;
 import org.apache.ignite.internal.processors.plugin.*;
 import org.apache.ignite.internal.processors.port.*;
-import org.gridgain.grid.kernal.processors.portable.*;
 import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.processors.resource.*;
 import org.apache.ignite.internal.processors.rest.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index aa5b24c..132497e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -35,20 +35,19 @@ import org.apache.ignite.internal.managers.securesession.*;
 import org.apache.ignite.internal.managers.swapspace.*;
 import org.gridgain.grid.kernal.processors.affinity.*;
 import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.clock.*;
+import org.apache.ignite.internal.processors.clock.*;
 import org.gridgain.grid.kernal.processors.closure.*;
 import org.gridgain.grid.kernal.processors.continuous.*;
 import org.gridgain.grid.kernal.processors.dataload.*;
 import org.apache.ignite.internal.processors.email.*;
 import org.apache.ignite.internal.processors.hadoop.*;
-import org.gridgain.grid.kernal.processors.interop.*;
+import org.apache.ignite.internal.processors.interop.*;
 import org.apache.ignite.internal.processors.job.*;
 import org.apache.ignite.internal.processors.jobmetrics.*;
 import org.apache.ignite.internal.processors.license.*;
 import org.apache.ignite.internal.processors.offheap.*;
 import org.apache.ignite.internal.processors.plugin.*;
 import org.apache.ignite.internal.processors.port.*;
-import org.gridgain.grid.kernal.processors.portable.*;
 import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.processors.resource.*;
 import org.apache.ignite.internal.processors.rest.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index ea3ba95..26c69b9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -37,20 +37,19 @@ import org.gridgain.grid.kernal.processors.affinity.*;
 import org.gridgain.grid.kernal.processors.cache.*;
 import org.gridgain.grid.kernal.processors.cache.dr.*;
 import org.gridgain.grid.kernal.processors.cache.dr.os.*;
-import org.gridgain.grid.kernal.processors.clock.*;
+import org.apache.ignite.internal.processors.clock.*;
 import org.gridgain.grid.kernal.processors.closure.*;
 import org.gridgain.grid.kernal.processors.continuous.*;
 import org.gridgain.grid.kernal.processors.dataload.*;
 import org.apache.ignite.internal.processors.email.*;
 import org.apache.ignite.internal.processors.hadoop.*;
-import org.gridgain.grid.kernal.processors.interop.*;
+import org.apache.ignite.internal.processors.interop.*;
 import org.apache.ignite.internal.processors.job.*;
 import org.apache.ignite.internal.processors.jobmetrics.*;
 import org.apache.ignite.internal.processors.license.*;
 import org.apache.ignite.internal.processors.offheap.*;
 import org.apache.ignite.internal.processors.plugin.*;
 import org.apache.ignite.internal.processors.port.*;
-import org.gridgain.grid.kernal.processors.portable.*;
 import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.processors.resource.*;
 import org.apache.ignite.internal.processors.rest.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/GridPortablesImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridPortablesImpl.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridPortablesImpl.java
index c67553b..e3b6c52 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridPortablesImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridPortablesImpl.java
@@ -18,9 +18,7 @@
 package org.apache.ignite.internal;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
 import org.apache.ignite.portables.*;
-import org.gridgain.grid.kernal.processors.portable.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshot.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshot.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshot.java
new file mode 100644
index 0000000..7e135f5
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshot.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.clock;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+
+import java.util.*;
+
+/**
+ * Snapshot of time deltas for given topology.
+ */
+public class GridClockDeltaSnapshot {
+    /** Time delta version. */
+    private final GridClockDeltaVersion ver;
+
+    /** Deltas between coordinator and nodes by node ID. */
+    private final Map<UUID, Long> deltas;
+
+    /** Pending delta values. */
+    @GridToStringExclude
+    private final Map<UUID, DeltaAverage> pendingDeltas;
+
+    /**
+     * @param ver Snapshot version.
+     * @param locNodeId Local node ID.
+     * @param discoSnap Discovery snapshot.
+     * @param avgSize Average size.
+     */
+    public GridClockDeltaSnapshot(
+        GridClockDeltaVersion ver,
+        UUID locNodeId,
+        GridDiscoveryTopologySnapshot discoSnap,
+        int avgSize
+    ) {
+        assert ver.topologyVersion() == discoSnap.topologyVersion();
+
+        this.ver = ver;
+
+        deltas = new HashMap<>(discoSnap.topologyNodes().size(), 1.0f);
+
+        pendingDeltas = new HashMap<>(discoSnap.topologyNodes().size(), 1.0f);
+
+        for (ClusterNode n : discoSnap.topologyNodes()) {
+            if (!locNodeId.equals(n.id()))
+                pendingDeltas.put(n.id(), new DeltaAverage(avgSize));
+        }
+    }
+
+    /**
+     * @param ver Snapshot version.
+     * @param deltas Deltas map.
+     */
+    public GridClockDeltaSnapshot(GridClockDeltaVersion ver, Map<UUID, Long> 
deltas) {
+        this.ver = ver;
+        this.deltas = deltas;
+
+        pendingDeltas = Collections.emptyMap();
+    }
+
+    /**
+     * @return Version.
+     */
+    public GridClockDeltaVersion version() {
+        return ver;
+    }
+
+    /**
+     * @return Map of collected deltas.
+     */
+    public Map<UUID, Long> deltas() {
+        return Collections.unmodifiableMap(deltas);
+    }
+
+    /**
+     * Awaits either until snapshot is ready or timeout elapses.
+     *
+     * @param timeout Timeout to wait.
+     * @throws IgniteInterruptedException If wait was interrupted.
+     */
+    public synchronized void awaitReady(long timeout) throws 
IgniteInterruptedException {
+        long start = System.currentTimeMillis();
+
+        try {
+            while (!ready()) {
+                long now = System.currentTimeMillis();
+
+                if (start + timeout - now <= 0)
+                    return;
+
+                wait(start + timeout - now);
+            }
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedException(e);
+        }
+    }
+
+    /**
+     * Callback invoked when time delta is received from remote node.
+     *
+     * @param nodeId Node ID.
+     * @param timeDelta Calculated time delta.
+     * @return {@code True} if more samples needed from that node.
+     */
+    public synchronized boolean onDeltaReceived(UUID nodeId, long timeDelta) {
+        DeltaAverage avg = pendingDeltas.get(nodeId);
+
+        if (avg != null) {
+            avg.onValue(timeDelta);
+
+            if (avg.ready()) {
+                pendingDeltas.remove(nodeId);
+
+                deltas.put(nodeId, avg.average());
+
+                if (ready())
+                    notifyAll();
+
+                return false;
+            }
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Callback invoked when node left.
+     *
+     * @param nodeId Left node ID.
+     */
+    public synchronized void onNodeLeft(UUID nodeId) {
+        pendingDeltas.remove(nodeId);
+
+        deltas.put(nodeId, 0L);
+
+        if (ready())
+            notifyAll();
+    }
+
+    /**
+     * @return {@code True} if snapshot is ready.
+     */
+    public synchronized boolean ready() {
+        return pendingDeltas.isEmpty();
+    }
+
+    /**
+     * @return Collection of node IDs for which response was not received so 
far.
+     */
+    public synchronized Collection<UUID> pendingNodeIds() {
+        // Must return copy.
+        return new HashSet<>(pendingDeltas.keySet());
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridClockDeltaSnapshot.class, this);
+    }
+
+    /**
+     * Delta average.
+     */
+    private static class DeltaAverage {
+        /** Delta values. */
+        private long[] vals;
+
+        /** Current index. */
+        private int idx;
+
+        /**
+         * @param size Accumulator size.
+         */
+        private DeltaAverage(int size) {
+            vals = new long[size];
+        }
+
+        /**
+         * Adds value to accumulator.
+         *
+         * @param val Value to add.
+         */
+        public void onValue(long val) {
+            if (idx < vals.length)
+                vals[idx++] = val;
+        }
+
+        /**
+         * Whether this average is complete.
+         *
+         * @return {@code True} if enough values is collected.
+         */
+        public boolean ready() {
+            return idx == vals.length;
+        }
+
+        /**
+         * @return Average delta.
+         */
+        public long average() {
+            long sum = 0;
+
+            for (long val : vals)
+                sum += val;
+
+            return sum / vals.length;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java
new file mode 100644
index 0000000..3164c46
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.clock;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.direct.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Message containing time delta map for all nodes.
+ */
+public class GridClockDeltaSnapshotMessage extends 
GridTcpCommunicationMessageAdapter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Snapshot version. */
+    private GridClockDeltaVersion snapVer;
+
+    /** Grid time deltas. */
+    @GridToStringInclude
+    @GridDirectMap(keyType = UUID.class, valueType = long.class)
+    private Map<UUID, Long> deltas;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridClockDeltaSnapshotMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param snapVer Snapshot version.
+     * @param deltas Deltas map.
+     */
+    public GridClockDeltaSnapshotMessage(GridClockDeltaVersion snapVer, 
Map<UUID, Long> deltas) {
+        this.snapVer = snapVer;
+        this.deltas = deltas;
+    }
+
+    /**
+     * @return Snapshot version.
+     */
+    public GridClockDeltaVersion snapshotVersion() {
+        return snapVer;
+    }
+
+    /**
+     * @return Time deltas map.
+     */
+    public Map<UUID, Long> deltas() {
+        return deltas;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        GridClockDeltaSnapshotMessage _clone = new 
GridClockDeltaSnapshotMessage();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+        GridClockDeltaSnapshotMessage _clone = 
(GridClockDeltaSnapshotMessage)_msg;
+
+        _clone.snapVer = snapVer;
+        _clone.deltas = deltas;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!commState.typeWritten) {
+            if (!commState.putByte(directType()))
+                return false;
+
+            commState.typeWritten = true;
+        }
+
+        switch (commState.idx) {
+            case 0:
+                if (deltas != null) {
+                    if (commState.it == null) {
+                        if (!commState.putInt(deltas.size()))
+                            return false;
+
+                        commState.it = deltas.entrySet().iterator();
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        Map.Entry<UUID, Long> e = (Map.Entry<UUID, 
Long>)commState.cur;
+
+                        if (!commState.keyDone) {
+                            if (!commState.putUuid(e.getKey()))
+                                return false;
+
+                            commState.keyDone = true;
+                        }
+
+                        if (!commState.putLong(e.getValue()))
+                            return false;
+
+                        commState.keyDone = false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+            case 1:
+                if (!commState.putClockDeltaVersion(snapVer))
+                    return false;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        switch (commState.idx) {
+            case 0:
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
+                    if (deltas == null)
+                        deltas = U.newHashMap(commState.readSize);
+
+                    for (int i = commState.readItems; i < commState.readSize; 
i++) {
+                        if (!commState.keyDone) {
+                            UUID _val = commState.getUuid();
+
+                            if (_val == UUID_NOT_READ)
+                                return false;
+
+                            commState.cur = _val;
+                            commState.keyDone = true;
+                        }
+
+                        if (buf.remaining() < 8)
+                            return false;
+
+                        long _val = commState.getLong();
+
+                        deltas.put((UUID)commState.cur, _val);
+
+                        commState.keyDone = false;
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1;
+                commState.readItems = 0;
+                commState.cur = null;
+
+                commState.idx++;
+
+            case 1:
+                GridClockDeltaVersion snapVer0 = 
commState.getClockDeltaVersion();
+
+                if (snapVer0 == CLOCK_DELTA_VER_NOT_READ)
+                    return false;
+
+                snapVer = snapVer0;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 59;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridClockDeltaSnapshotMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaVersion.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaVersion.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaVersion.java
new file mode 100644
index 0000000..aa03c88
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaVersion.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.clock;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Version for time delta snapshot.
+ */
+public class GridClockDeltaVersion implements 
Comparable<GridClockDeltaVersion>, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Snapshot local version. */
+    private long ver;
+
+    /** Topology version. */
+    private long topVer;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridClockDeltaVersion() {
+        // No-op.
+    }
+
+    /**
+     * @param ver Version.
+     * @param topVer Topology version.
+     */
+    public GridClockDeltaVersion(long ver, long topVer) {
+        this.ver = ver;
+        this.topVer = topVer;
+    }
+
+    /**
+     * @return Snapshot local version.
+     */
+    public long version() {
+        return ver;
+    }
+
+    /**
+     * @return Snapshot topology version.
+     */
+    public long topologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(GridClockDeltaVersion o) {
+        if (topVer == o.topVer) {
+            if (ver == o.ver)
+                return 0;
+
+            return ver > o.ver ? 1 : -1;
+        }
+
+        return topVer > o.topVer ? 1 : -1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof GridClockDeltaVersion))
+            return false;
+
+        GridClockDeltaVersion that = (GridClockDeltaVersion)o;
+
+        return topVer == that.topVer && ver == that.ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = (int)(ver ^ (ver >>> 32));
+
+        res = 31 * res + (int)(topVer ^ (topVer >>> 32));
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeLong(ver);
+        out.writeLong(topVer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        ver = in.readLong();
+        topVer = in.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridClockDeltaVersion.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockMessage.java
new file mode 100644
index 0000000..90b8000
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockMessage.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.clock;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+
+/**
+ * Time server message.
+ */
+public class GridClockMessage {
+    /** Packet size. */
+    public static final int PACKET_SIZE = 48;
+
+    /** Originating node ID. */
+    private UUID origNodeId;
+
+    /** Target node ID. */
+    private UUID targetNodeId;
+
+    /** Originating timestamp. */
+    private long origTs;
+
+    /** Remote node reply ts. */
+    private long replyTs;
+
+    /**
+     * @param origNodeId Originating node ID.
+     * @param targetNodeId Target node ID.
+     * @param origTs Originating timestamp.
+     * @param replyTs Reply timestamp.
+     */
+    public GridClockMessage(UUID origNodeId, UUID targetNodeId, long origTs, 
long replyTs) {
+        this.origNodeId = origNodeId;
+        this.targetNodeId = targetNodeId;
+        this.origTs = origTs;
+        this.replyTs = replyTs;
+    }
+
+    /**
+     * @return Originating node ID.
+     */
+    public UUID originatingNodeId() {
+        return origNodeId;
+    }
+
+    /**
+     * @param origNodeId Originating node ID.
+     */
+    public void originatingNodeId(UUID origNodeId) {
+        this.origNodeId = origNodeId;
+    }
+
+    /**
+     * @return Target node ID.
+     */
+    public UUID targetNodeId() {
+        return targetNodeId;
+    }
+
+    /**
+     * @param targetNodeId Target node ID.
+     */
+    public void targetNodeId(UUID targetNodeId) {
+        this.targetNodeId = targetNodeId;
+    }
+
+    /**
+     * @return Originating timestamp.
+     */
+    public long originatingTimestamp() {
+        return origTs;
+    }
+
+    /**
+     * @param origTs Originating timestamp.
+     */
+    public void originatingTimestamp(long origTs) {
+        this.origTs = origTs;
+    }
+
+    /**
+     * @return Reply timestamp.
+     */
+    public long replyTimestamp() {
+        return replyTs;
+    }
+
+    /**
+     * @param replyTs Reply timestamp.
+     */
+    public void replyTimestamp(long replyTs) {
+        this.replyTs = replyTs;
+    }
+
+    /**
+     * Converts message to bytes to send over network.
+     *
+     * @return Bytes representing this packet.
+     */
+    public byte[] toBytes() {
+        byte[] buf = new byte[PACKET_SIZE];
+
+        int off = 0;
+
+        off = U.longToBytes(origNodeId.getLeastSignificantBits(), buf, off);
+        off = U.longToBytes(origNodeId.getMostSignificantBits(), buf, off);
+
+        off = U.longToBytes(targetNodeId.getLeastSignificantBits(), buf, off);
+        off = U.longToBytes(targetNodeId.getMostSignificantBits(), buf, off);
+
+        off = U.longToBytes(origTs, buf, off);
+
+        off = U.longToBytes(replyTs, buf, off);
+
+        assert off == PACKET_SIZE;
+
+        return buf;
+    }
+
+    /**
+     * Constructs message from bytes.
+     *
+     * @param buf Bytes.
+     * @param off Offset.
+     * @param len Packet length.
+     * @return Assembled message.
+     * @throws IgniteCheckedException If message length is invalid.
+     */
+    public static GridClockMessage fromBytes(byte[] buf, int off, int len) 
throws IgniteCheckedException {
+        if (len < PACKET_SIZE)
+            throw new IgniteCheckedException("Failed to assemble time server 
packet (message is too short).");
+
+        long lsb = U.bytesToLong(buf, off);
+        long msb = U.bytesToLong(buf, off + 8);
+
+        UUID origNodeId = new UUID(msb, lsb);
+
+        lsb = U.bytesToLong(buf, off + 16);
+        msb = U.bytesToLong(buf, off + 24);
+
+        UUID targetNodeId = new UUID(msb, lsb);
+
+        long origTs = U.bytesToLong(buf, off + 32);
+        long replyTs = U.bytesToLong(buf, off + 40);
+
+        return new GridClockMessage(origNodeId, targetNodeId, origTs, replyTs);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridClockMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java
new file mode 100644
index 0000000..f188181
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.clock;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.thread.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * Time server that enables time synchronization between nodes.
+ */
+public class GridClockServer {
+    /** Kernal context. */
+    private GridKernalContext ctx;
+
+    /** Datagram socket for message exchange. */
+    private DatagramSocket sock;
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Read worker. */
+    private GridWorker readWorker;
+
+    /** Instance of time processor. */
+    private GridClockSyncProcessor clockSync;
+
+    /**
+     * Starts server.
+     *
+     * @param ctx Kernal context.
+     * @throws IgniteCheckedException If server could not be started.
+     */
+    public void start(GridKernalContext ctx) throws IgniteCheckedException {
+        this.ctx = ctx;
+
+        clockSync = ctx.clockSync();
+        log = ctx.log(GridClockServer.class);
+
+        try {
+            int startPort = ctx.config().getTimeServerPortBase();
+            int endPort = startPort + ctx.config().getTimeServerPortRange() - 
1;
+
+            InetAddress locHost = !F.isEmpty(ctx.config().getLocalHost()) ?
+                InetAddress.getByName(ctx.config().getLocalHost()) :
+                U.getLocalHost();
+
+            for (int p = startPort; p <= endPort; p++) {
+                try {
+                    sock = new DatagramSocket(p, locHost);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Successfully bound time server [host=" + 
locHost + ", port=" + p + ']');
+
+                    break;
+                }
+                catch (SocketException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to bind time server socket [host=" + 
locHost + ", port=" + p +
+                            ", err=" + e.getMessage() + ']');
+                }
+            }
+
+            if (sock == null)
+                throw new IgniteCheckedException("Failed to bind time server 
socket within specified port range [locHost=" +
+                    locHost + ", startPort=" + startPort + ", endPort=" + 
endPort + ']');
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to start time server 
(failed to get local host address)", e);
+        }
+    }
+
+    /**
+     * After start callback.
+     */
+    public void afterStart() {
+        readWorker = new ReadWorker();
+
+        IgniteThread th = new IgniteThread(readWorker);
+
+        th.setPriority(Thread.MAX_PRIORITY);
+
+        th.start();
+    }
+
+    /**
+     * Stops server.
+     */
+    public void stop() {
+        // No-op.
+    }
+
+    /**
+     * Before stop callback.
+     */
+    public void beforeStop() {
+        if (readWorker != null)
+            readWorker.cancel();
+
+        U.closeQuiet(sock);
+
+        if (readWorker != null)
+            U.join(readWorker, log);
+    }
+
+    /**
+     * Sends packet to remote node.
+     *
+     * @param msg Message to send.
+     * @param addr Address.
+     * @param port Port.
+     * @throws IgniteCheckedException If send failed.
+     */
+    public void sendPacket(GridClockMessage msg, InetAddress addr, int port) 
throws IgniteCheckedException {
+        try {
+            DatagramPacket packet = new DatagramPacket(msg.toBytes(), 
GridClockMessage.PACKET_SIZE, addr, port);
+
+            if (log.isDebugEnabled())
+                log.debug("Sending time sync packet [msg=" + msg + ", addr=" + 
addr + ", port=" + port);
+
+            sock.send(packet);
+        }
+        catch (IOException e) {
+            if (!sock.isClosed())
+                throw new IgniteCheckedException("Failed to send datagram 
message to remote node [addr=" + addr +
+                    ", port=" + port + ", msg=" + msg + ']', e);
+        }
+    }
+
+    /**
+     * @return Address to which this server is bound.
+     */
+    public InetAddress host() {
+        return sock.getLocalAddress();
+    }
+
+    /**
+     * @return Port to which this server is bound.
+     */
+    public int port() {
+        return sock.getLocalPort();
+    }
+
+    /**
+     * Message read worker.
+     */
+    private class ReadWorker extends GridWorker {
+        /**
+         * Creates read worker.
+         */
+        protected ReadWorker() {
+            super(ctx.gridName(), "grid-time-server-reader", log);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, 
IgniteInterruptedException {
+            DatagramPacket packet = new DatagramPacket(new 
byte[GridClockMessage.PACKET_SIZE],
+                GridClockMessage.PACKET_SIZE);
+
+            while (!isCancelled()) {
+                try {
+                    // Read packet from buffer.
+                    sock.receive(packet);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Received clock sync message from remote 
node [host=" + packet.getAddress() +
+                            ", port=" + packet.getPort() + ']');
+
+                    GridClockMessage msg = 
GridClockMessage.fromBytes(packet.getData(), packet.getOffset(),
+                        packet.getLength());
+
+                    clockSync.onMessageReceived(msg, packet.getAddress(), 
packet.getPort());
+                }
+                catch (IgniteCheckedException e) {
+                    U.warn(log, "Failed to assemble clock server message (will 
ignore the packet) [host=" +
+                        packet.getAddress() + ", port=" + packet.getPort() + 
", err=" + e.getMessage() + ']');
+                }
+                catch (IOException e) {
+                    if (!isCancelled())
+                        U.warn(log, "Failed to receive message on datagram 
socket: " + e);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSource.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSource.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSource.java
new file mode 100644
index 0000000..65f746a
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSource.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.clock;
+
+/**
+ * Interface representing time source for time processor.
+ */
+public interface GridClockSource {
+    /**
+     * Gets current time in milliseconds past since 1 January, 1970.
+     *
+     * @return Current time in milliseconds.
+     */
+    public long currentTimeMillis();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
new file mode 100644
index 0000000..d1c9931
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.clock;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.thread.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+
+import java.net.*;
+import java.util.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.internal.GridNodeAttributes.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+/**
+ * Time synchronization processor.
+ */
+public class GridClockSyncProcessor extends GridProcessorAdapter {
+    /** Maximum size for time sync history. */
+    private static final int MAX_TIME_SYNC_HISTORY = 100;
+
+    /** Time server instance. */
+    private GridClockServer srv;
+
+    /** Shutdown lock. */
+    private GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
+
+    /** Stopping flag. */
+    private volatile boolean stopping;
+
+    /** Time coordinator thread. */
+    private volatile TimeCoordinator timeCoord;
+
+    /** Time delta history. Constructed on coorinator. */
+    private NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> 
timeSyncHist =
+        new GridBoundedConcurrentOrderedMap<>(MAX_TIME_SYNC_HISTORY);
+
+    /** Time source. */
+    private GridClockSource clockSrc;
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public GridClockSyncProcessor(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        super.start();
+
+        clockSrc = ctx.timeSource();
+
+        srv = new GridClockServer();
+
+        srv.start(ctx);
+
+        ctx.io().addMessageListener(TOPIC_TIME_SYNC, new GridMessageListener() 
{
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                assert msg instanceof GridClockDeltaSnapshotMessage;
+
+                GridClockDeltaSnapshotMessage msg0 = 
(GridClockDeltaSnapshotMessage)msg;
+
+                GridClockDeltaVersion ver = msg0.snapshotVersion();
+
+                timeSyncHist.put(ver, new GridClockDeltaSnapshot(ver, 
msg0.deltas()));
+            }
+        });
+
+        // We care only about node leave and fail events.
+        ctx.event().addLocalEventListener(new GridLocalEventListener() {
+            @Override public void onEvent(IgniteEvent evt) {
+                assert evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED || evt.type() == EVT_NODE_JOINED;
+
+                IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt;
+
+                if (evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED)
+                    checkLaunchCoordinator(discoEvt);
+
+                TimeCoordinator timeCoord0 = timeCoord;
+
+                if (timeCoord0 != null)
+                    timeCoord0.onDiscoveryEvent(discoEvt);
+            }
+        }, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_NODE_JOINED);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addAttributes(Map<String, Object> attrs) throws 
IgniteCheckedException {
+        super.addAttributes(attrs);
+
+        attrs.put(ATTR_TIME_SERVER_HOST, srv.host());
+        attrs.put(ATTR_TIME_SERVER_PORT, srv.port());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
+
+        srv.afterStart();
+
+        // Check at startup if this node is a fragmentizer coordinator.
+        IgniteDiscoveryEvent locJoinEvt = ctx.discovery().localJoinEvent();
+
+        checkLaunchCoordinator(locJoinEvt);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        super.onKernalStop(cancel);
+
+        rw.writeLock();
+
+        try {
+            stopping = false;
+
+            if (timeCoord != null) {
+                timeCoord.cancel();
+
+                U.join(timeCoord, log);
+
+                timeCoord = null;
+            }
+
+            if (srv != null)
+                srv.beforeStop();
+        }
+        finally {
+            rw.writeUnlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) throws IgniteCheckedException {
+        super.stop(cancel);
+
+        if (srv != null)
+            srv.stop();
+    }
+
+    /**
+     * Gets current time on local node.
+     *
+     * @return Current time in milliseconds.
+     */
+    private long currentTime() {
+        return clockSrc.currentTimeMillis();
+    }
+
+    /**
+     * @return Time sync history.
+     */
+    public NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> 
timeSyncHistory() {
+        return timeSyncHist;
+    }
+
+    /**
+     * Callback from server for message receiving.
+     *
+     * @param msg Received message.
+     * @param addr Remote node address.
+     * @param port Remote node port.
+     */
+    public void onMessageReceived(GridClockMessage msg, InetAddress addr, int 
port) {
+        long rcvTs = currentTime();
+
+        if (!msg.originatingNodeId().equals(ctx.localNodeId())) {
+            // We received time request from remote node, set current time and 
reply back.
+            msg.replyTimestamp(rcvTs);
+
+            try {
+                srv.sendPacket(msg, addr, port);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send time server reply to remote node: 
" + msg, e);
+            }
+        }
+        else
+            timeCoord.onMessage(msg, rcvTs);
+    }
+
+    /**
+     * Checks if local node is the oldest node in topology and starts time 
coordinator if so.
+     *
+     * @param discoEvt Discovery event.
+     */
+    private void checkLaunchCoordinator(IgniteDiscoveryEvent discoEvt) {
+        rw.readLock();
+
+        try {
+            if (stopping)
+                return;
+
+            if (timeCoord == null) {
+                long minNodeOrder = Long.MAX_VALUE;
+
+                Collection<ClusterNode> nodes = discoEvt.topologyNodes();
+
+                for (ClusterNode node : nodes) {
+                    if (node.order() < minNodeOrder)
+                        minNodeOrder = node.order();
+                }
+
+                ClusterNode locNode = ctx.grid().localNode();
+
+                if (locNode.order() == minNodeOrder) {
+                    if (log.isDebugEnabled())
+                        log.debug("Detected local node to be the eldest node 
in topology, starting time " +
+                            "coordinator thread [discoEvt=" + discoEvt + ", 
locNode=" + locNode + ']');
+
+                    synchronized (this) {
+                        if (timeCoord == null && !stopping) {
+                            timeCoord = new TimeCoordinator(discoEvt);
+
+                            IgniteThread th = new IgniteThread(timeCoord);
+
+                            th.setPriority(Thread.MAX_PRIORITY);
+
+                            th.start();
+                        }
+                    }
+                }
+            }
+        }
+        finally {
+            rw.readUnlock();
+        }
+    }
+
+    /**
+     * Gets time adjusted with time coordinator on given topology version.
+     *
+     * @param topVer Topology version.
+     * @return Adjusted time.
+     */
+    public long adjustedTime(long topVer) {
+        // Get last synchronized time on given topology version.
+        Map.Entry<GridClockDeltaVersion, GridClockDeltaSnapshot> entry = 
timeSyncHistory().lowerEntry(
+            new GridClockDeltaVersion(0, topVer + 1));
+
+        GridClockDeltaSnapshot snap = entry == null ? null : entry.getValue();
+
+        long now = clockSrc.currentTimeMillis();
+
+        if (snap == null)
+            return System.currentTimeMillis();
+
+        Long delta = snap.deltas().get(ctx.localNodeId());
+
+        if (delta == null)
+            delta = 0L;
+
+        return now + delta;
+    }
+
+    /**
+     * Publishes snapshot to topology.
+     *
+     * @param snapshot Snapshot to publish.
+     * @param top Topology to send given snapshot to.
+     */
+    private void publish(GridClockDeltaSnapshot snapshot, 
GridDiscoveryTopologySnapshot top) {
+        if (!rw.tryReadLock())
+            return;
+
+        try {
+            timeSyncHist.put(snapshot.version(), snapshot);
+
+            for (ClusterNode n : top.topologyNodes()) {
+                GridClockDeltaSnapshotMessage msg = new 
GridClockDeltaSnapshotMessage(
+                    snapshot.version(), snapshot.deltas());
+
+                try {
+                    ctx.io().send(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL);
+                }
+                catch (IgniteCheckedException e) {
+                    if (ctx.discovery().pingNode(n.id()))
+                        U.error(log, "Failed to send time sync snapshot to 
remote node (did not leave grid?) " +
+                            "[nodeId=" + n.id() + ", msg=" + msg + ", err=" + 
e.getMessage() + ']');
+                    else if (log.isDebugEnabled())
+                        log.debug("Failed to send time sync snapshot to remote 
node (did not leave grid?) " +
+                            "[nodeId=" + n.id() + ", msg=" + msg + ", err=" + 
e.getMessage() + ']');
+                }
+            }
+        }
+        finally {
+            rw.readUnlock();
+        }
+    }
+
+    /**
+     * Time coordinator thread.
+     */
+    private class TimeCoordinator extends GridWorker {
+        /** Last discovery topology snapshot. */
+        private volatile GridDiscoveryTopologySnapshot lastSnapshot;
+
+        /** Snapshot being constructed. May be not null only on coordinator 
node. */
+        private volatile GridClockDeltaSnapshot pendingSnapshot;
+
+        /** Version counter. */
+        private long verCnt = 1;
+
+        /**
+         * Time coordinator thread constructor.
+         *
+         * @param evt Discovery event on which this node became a coordinator.
+         */
+        protected TimeCoordinator(IgniteDiscoveryEvent evt) {
+            super(ctx.gridName(), "grid-time-coordinator", log);
+
+            lastSnapshot = new 
GridDiscoveryTopologySnapshot(evt.topologyVersion(), evt.topologyNodes());
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, 
IgniteInterruptedException {
+            while (!isCancelled()) {
+                GridDiscoveryTopologySnapshot top = lastSnapshot;
+
+                if (log.isDebugEnabled())
+                    log.debug("Creating time sync snapshot for topology: " + 
top);
+
+                GridClockDeltaSnapshot snapshot = new GridClockDeltaSnapshot(
+                    new GridClockDeltaVersion(verCnt++, top.topologyVersion()),
+                    ctx.localNodeId(),
+                    top,
+                    ctx.config().getClockSyncSamples());
+
+                pendingSnapshot = snapshot;
+
+                while (!snapshot.ready()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Requesting time from remote nodes: " + 
snapshot.pendingNodeIds());
+
+                    for (UUID nodeId : snapshot.pendingNodeIds())
+                        requestTime(nodeId);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Waiting for snapshot to be ready: " + 
snapshot);
+
+                    // Wait for all replies
+                    snapshot.awaitReady(1000);
+                }
+
+                // No more messages should be processed.
+                pendingSnapshot = null;
+
+                if (log.isDebugEnabled())
+                    log.debug("Collected time sync results: " + 
snapshot.deltas());
+
+                publish(snapshot, top);
+
+                synchronized (this) {
+                    if (top.topologyVersion() == 
lastSnapshot.topologyVersion())
+                        wait(ctx.config().getClockSyncFrequency());
+                }
+            }
+        }
+
+        /**
+         * @param evt Discovery event.
+         */
+        public void onDiscoveryEvent(IgniteDiscoveryEvent evt) {
+            if (log.isDebugEnabled())
+                log.debug("Processing discovery event: " + evt);
+
+            if (evt.type() == IgniteEventType.EVT_NODE_FAILED || evt.type() == 
EVT_NODE_LEFT)
+                onNodeLeft(evt.eventNode().id());
+
+            synchronized (this) {
+                lastSnapshot = new 
GridDiscoveryTopologySnapshot(evt.topologyVersion(), evt.topologyNodes());
+
+                notifyAll();
+            }
+        }
+
+        /**
+         * @param msg Message received from remote node.
+         * @param rcvTs Receive timestamp.
+         */
+        private void onMessage(GridClockMessage msg, long rcvTs) {
+            GridClockDeltaSnapshot curr = pendingSnapshot;
+
+            if (curr != null) {
+                long delta = (msg.originatingTimestamp() + rcvTs) / 2 - 
msg.replyTimestamp();
+
+                boolean needMore = curr.onDeltaReceived(msg.targetNodeId(), 
delta);
+
+                if (needMore)
+                    requestTime(msg.targetNodeId());
+            }
+        }
+
+        /**
+         * Requests time from remote node.
+         *
+         * @param rmtNodeId Remote node ID.
+         */
+        private void requestTime(UUID rmtNodeId) {
+            ClusterNode node = ctx.discovery().node(rmtNodeId);
+
+            if (node != null) {
+                InetAddress addr = node.attribute(ATTR_TIME_SERVER_HOST);
+                int port = node.attribute(ATTR_TIME_SERVER_PORT);
+
+                try {
+                    GridClockMessage req = new 
GridClockMessage(ctx.localNodeId(), rmtNodeId, currentTime(), 0);
+
+                    srv.sendPacket(req, addr, port);
+                }
+                catch (IgniteCheckedException e) {
+                    LT.warn(log, e, "Failed to send time request to remote 
node [rmtNodeId=" + rmtNodeId +
+                        ", addr=" + addr + ", port=" + port + ']');
+                }
+            }
+            else
+                onNodeLeft(rmtNodeId);
+        }
+
+        /**
+         * Node left callback.
+         *
+         * @param nodeId Left node ID.
+         */
+        private void onNodeLeft(UUID nodeId) {
+            GridClockDeltaSnapshot curr = pendingSnapshot;
+
+            if (curr != null)
+                curr.onNodeLeft(nodeId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridJvmClockSource.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridJvmClockSource.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridJvmClockSource.java
new file mode 100644
index 0000000..e10d816
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridJvmClockSource.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.clock;
+
+/**
+ * JVM time source.
+ */
+public class GridJvmClockSource implements GridClockSource {
+    /** {@inheritDoc} */
+    @Override public long currentTimeMillis() {
+        return System.currentTimeMillis();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
new file mode 100644
index 0000000..f874e4b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dr;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.dataload.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.dr.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.util.*;
+
+/**
+ * Data center replication cache updater for data loader.
+ */
+public class GridDrDataLoadCacheUpdater<K, V> implements 
IgniteDataLoadCacheUpdater<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override public void update(IgniteCache<K, V> cache0, 
Collection<Map.Entry<K, V>> col)
+        throws IgniteCheckedException {
+        String cacheName = 
cache0.getConfiguration(CacheConfiguration.class).getName();
+
+        GridKernalContext ctx = 
((GridKernal)cache0.unwrap(Ignite.class)).context();
+        IgniteLogger log = ctx.log(GridDrDataLoadCacheUpdater.class);
+        GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName);
+
+        assert !F.isEmpty(col);
+
+        if (log.isDebugEnabled())
+            log.debug("Running DR put job [nodeId=" + ctx.localNodeId() + ", 
cacheName=" + cacheName + ']');
+
+        IgniteFuture<?> f = cache.context().preloader().startFuture();
+
+        if (!f.isDone())
+            f.get();
+
+        for (Map.Entry<K, V> entry0 : col) {
+            GridVersionedEntry<K, V> entry = (GridVersionedEntry<K, V>)entry0;
+
+            entry.unmarshal(ctx.config().getMarshaller());
+
+            K key = entry.key();
+
+            GridCacheDrInfo<V> val = entry.value() != null ? 
entry.expireTime() != 0 ?
+                new GridCacheDrExpirationInfo<>(entry.value(), 
entry.version(), entry.ttl(), entry.expireTime()) :
+                new GridCacheDrInfo<>(entry.value(), entry.version()) : null;
+
+            if (val == null)
+                cache.removeAllDr(Collections.singletonMap(key, 
entry.version()));
+            else
+                cache.putAllDr(Collections.singletonMap(key, val));
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("DR put job finished [nodeId=" + ctx.localNodeId() + ", 
cacheName=" + cacheName + ']');
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrType.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrType.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrType.java
new file mode 100644
index 0000000..ac1c19c
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dr;
+
+/**
+ * Data center replication type.
+ */
+public enum GridDrType {
+    /** Do not replicate that entry. */
+    DR_NONE,
+
+    /** Regular replication on primary node. */
+    DR_PRIMARY,
+
+    /** Regular replication on backup node. */
+    DR_BACKUP,
+
+    /** Replication during load. */
+    DR_LOAD,
+
+    /** Replication during preload. */
+    DR_PRELOAD
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridRawVersionedEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridRawVersionedEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridRawVersionedEntry.java
new file mode 100644
index 0000000..eff6d1c
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridRawVersionedEntry.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dr;
+
+import org.apache.ignite.*;
+import org.apache.ignite.marshaller.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ *
+ */
+public class GridRawVersionedEntry<K, V> implements GridVersionedEntry<K, V>, 
Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Key. */
+    private K key;
+
+    /** Key bytes. */
+    private byte[] keyBytes;
+
+    /** Value. */
+    private V val;
+
+    /** Value bytes. */
+    private byte[] valBytes;
+
+    /** TTL. */
+    private long ttl;
+
+    /** Expire time. */
+    private long expireTime;
+
+    /** Version. */
+    private GridCacheVersion ver;
+
+    /**
+     * {@code Externalizable) support.
+     */
+    public GridRawVersionedEntry() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param key Key.
+     * @param keyBytes Key bytes.
+     * @param val Value.
+     * @param valBytes Value bytes.
+     * @param expireTime Expire time.
+     * @param ttl TTL.
+     * @param ver Version.
+     */
+    public GridRawVersionedEntry(K key,
+        @Nullable byte[] keyBytes,
+        @Nullable V val,
+        @Nullable byte[] valBytes,
+        long ttl,
+        long expireTime,
+        GridCacheVersion ver) {
+        this.key = key;
+        this.keyBytes = keyBytes;
+        this.val = val;
+        this.valBytes = valBytes;
+        this.ttl = ttl;
+        this.expireTime = expireTime;
+        this.ver = ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override public K key() {
+        assert key != null : "Entry is being improperly processed.";
+
+        return key;
+    }
+
+    /**
+     * @return Key bytes.
+     */
+    public byte[] keyBytes() {
+        return keyBytes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public V value() {
+        return val;
+    }
+
+    /**
+     * @return Value bytes.
+     */
+    public byte[] valueBytes() {
+        return valBytes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long ttl() {
+        return ttl;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long expireTime() {
+        return expireTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion version() {
+        return ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void unmarshal(IgniteMarshaller marsh) throws 
IgniteCheckedException {
+        unmarshalKey(marsh);
+
+        if (valBytes != null && val == null)
+            val = marsh.unmarshal(valBytes, null);
+    }
+
+    /**
+     * Perform internal key unmarshal of this entry. It must be performed 
after entry is deserialized and before
+     * its restored key/value are needed.
+     *
+     * @param marsh Marshaller.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void unmarshalKey(IgniteMarshaller marsh) throws 
IgniteCheckedException {
+        if (key == null)
+            key = marsh.unmarshal(keyBytes, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void marshal(IgniteMarshaller marsh) throws 
IgniteCheckedException {
+        if (keyBytes == null)
+            keyBytes = marsh.marshal(key);
+
+        if (valBytes == null && val != null)
+            valBytes = marsh.marshal(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        assert keyBytes != null;
+
+        U.writeByteArray(out, keyBytes);
+        U.writeByteArray(out, valBytes);
+
+        out.writeLong(ttl);
+
+        if (ttl != 0)
+            out.writeLong(expireTime);
+
+        out.writeObject(ver);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        keyBytes = U.readByteArray(in);
+        valBytes = U.readByteArray(in);
+
+        ttl = in.readLong();
+
+        if (ttl != 0)
+            expireTime = in.readLong();
+
+        ver = (GridCacheVersion)in.readObject();
+
+        assert keyBytes != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public K getKey() {
+        return key();
+    }
+
+    /** {@inheritDoc} */
+    @Override public V getValue() {
+        return value();
+    }
+
+    /** {@inheritDoc} */
+    @Override public V setValue(V val) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridRawVersionedEntry.class, this, "keyBytesLen", 
keyBytes != null ? keyBytes.length : "n/a",
+            "valBytesLen", valBytes != null ? valBytes.length : "n/a");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridVersionedEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridVersionedEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridVersionedEntry.java
new file mode 100644
index 0000000..e05e8f6
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridVersionedEntry.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dr;
+
+import org.apache.ignite.*;
+import org.apache.ignite.marshaller.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public interface GridVersionedEntry<K, V> extends Map.Entry<K, V> {
+    /**
+     * Gets entry's key.
+     *
+     * @return Entry's key.
+     */
+    public K key();
+
+    /**
+     * Gets entry's value.
+     *
+     * @return Entry's value.
+     */
+    @Nullable public V value();
+
+    /**
+     * Gets entry's TTL.
+     *
+     * @return Entry's TTL.
+     */
+    public long ttl();
+
+    /**
+     * Gets entry's expire time.
+     *
+     * @return Entry's expire time.
+     */
+    public long expireTime();
+
+    /**
+     * @return Version.
+     */
+    public GridCacheVersion version();
+
+    /**
+     * Perform internal marshal of this entry before it will be serialized.
+     *
+     * @param marsh Marshaller.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void marshal(IgniteMarshaller marsh) throws IgniteCheckedException;
+
+    /**
+     * Perform internal unmarshal of this entry. It must be performed after 
entry is deserialized and before
+     * its restored key/value are needed.
+     *
+     * @param marsh Marshaller.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void unmarshal(IgniteMarshaller marsh) throws 
IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/package.html
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/package.html
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/package.html
new file mode 100644
index 0000000..55ce324
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/package.html
@@ -0,0 +1,25 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" 
"http://www.w3.org/TR/html4/loose.dtd";>
+<html>
+<body>
+    <img alt="icon" class="javadocimg" src="{@docRoot}/img/cube.gif"/>
+    Data center replication processor.
+    <p>
+    @html.java.package
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropAware.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropAware.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropAware.java
new file mode 100644
index 0000000..81035d8
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropAware.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.interop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+
+/**
+ * Interface for interop-aware components.
+ */
+public interface GridInteropAware {
+    /**
+     * Sets configuration parameters.
+     *
+     * @param params Configuration parameters.
+     */
+    public void configure(Object... params);
+
+    /**
+     * Initializes interop-aware component.
+     *
+     * @param ctx Context.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void initialize(GridKernalContext ctx) throws 
IgniteCheckedException;
+
+    /**
+     * Destroys interop-aware component.
+     *
+     * @param ctx Context.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void destroy(GridKernalContext ctx) throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropProcessor.java
new file mode 100644
index 0000000..33c841a
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropProcessor.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.interop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Interop processor.
+ */
+public interface GridInteropProcessor extends GridProcessor {
+    /**
+     * Release start latch.
+     */
+    public void releaseStart();
+
+    /**
+     * Await start on native side.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void awaitStart() throws IgniteCheckedException;
+
+    /**
+     * @return Environment pointer.
+     */
+    public long environmentPointer() throws IgniteCheckedException;
+
+    /**
+     * @return Grid name.
+     */
+    public String gridName();
+
+    /**
+     * Gets native wrapper for default Grid projection.
+     *
+     * @return Native compute wrapper.
+     * @throws IgniteCheckedException If failed.
+     */
+    public GridInteropTarget projection() throws IgniteCheckedException;
+
+    /**
+     * Gets native wrapper for cache with the given name.
+     *
+     * @param name Cache name ({@code null} for default cache).
+     * @return Native cache wrapper.
+     * @throws IgniteCheckedException If failed.
+     */
+    public GridInteropTarget cache(@Nullable String name) throws 
IgniteCheckedException;
+
+    /**
+     * Gets native wrapper for data loader for cache with the given name.
+     *
+     * @param cacheName Cache name ({@code null} for default cache).
+     * @return Native data loader wrapper.
+     * @throws IgniteCheckedException If failed.
+     */
+    public GridInteropTarget dataLoader(@Nullable String cacheName) throws 
IgniteCheckedException;
+
+    /**
+     * Stops grid.
+     *
+     * @param cancel Cancel flag.
+     */
+    public void close(boolean cancel);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropProcessorAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropProcessorAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropProcessorAdapter.java
new file mode 100644
index 0000000..91ea27e
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropProcessorAdapter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.interop;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.*;
+
+/**
+ * Interop processor adapter.
+ */
+public abstract class GridInteropProcessorAdapter extends GridProcessorAdapter 
implements GridInteropProcessor {
+    /** {@inheritDoc} */
+    protected GridInteropProcessorAdapter(GridKernalContext ctx) {
+        super(ctx);
+    }
+}

Reply via email to