# ignite-63
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c602549a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c602549a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c602549a Branch: refs/heads/ignite-63 Commit: c602549aceca0fedc33abfd12cd40fc552f097d2 Parents: cfcf46d Author: sboikov <[email protected]> Authored: Fri Jan 23 00:05:03 2015 +0300 Committer: sboikov <[email protected]> Committed: Fri Jan 23 00:05:07 2015 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/internal/GridEx.java | 2 +- .../org/apache/ignite/internal/GridKernal.java | 5 +- .../ignite/internal/GridKernalContext.java | 5 +- .../ignite/internal/GridKernalContextImpl.java | 5 +- .../ignite/internal/GridPortablesImpl.java | 2 - .../clock/GridClockDeltaSnapshot.java | 231 ++++++++++ .../clock/GridClockDeltaSnapshotMessage.java | 226 +++++++++ .../processors/clock/GridClockDeltaVersion.java | 117 +++++ .../processors/clock/GridClockMessage.java | 171 +++++++ .../processors/clock/GridClockServer.java | 206 +++++++++ .../processors/clock/GridClockSource.java | 30 ++ .../clock/GridClockSyncProcessor.java | 458 +++++++++++++++++++ .../processors/clock/GridJvmClockSource.java | 28 ++ .../dr/GridDrDataLoadCacheUpdater.java | 77 ++++ .../internal/processors/dr/GridDrType.java | 38 ++ .../processors/dr/GridRawVersionedEntry.java | 210 +++++++++ .../processors/dr/GridVersionedEntry.java | 80 ++++ .../ignite/internal/processors/dr/package.html | 25 + .../processors/interop/GridInteropAware.java | 49 ++ .../interop/GridInteropProcessor.java | 82 ++++ .../interop/GridInteropProcessorAdapter.java | 31 ++ .../processors/interop/GridInteropTarget.java | 108 +++++ .../interop/os/GridOsInteropProcessor.java | 80 ++++ .../internal/processors/interop/os/package.html | 23 + .../internal/processors/interop/package.html | 23 + 
.../portable/GridPortableInputStream.java | 177 +++++++ .../portable/GridPortableOutputStream.java | 172 +++++++ .../portable/GridPortableProcessor.java | 150 ++++++ .../processors/portable/GridPortableStream.java | 53 +++ .../portable/os/GridOsPortableProcessor.java | 125 +++++ .../processors/portable/os/package.html | 23 + .../internal/processors/portable/package.html | 23 + .../GridTcpCommunicationMessageAdapter.java | 2 +- .../GridTcpCommunicationMessageFactory.java | 2 +- .../GridTcpCommunicationMessageState.java | 2 +- .../util/portable/PortableRawWriterEx.java | 1 - .../processors/cache/GridCacheAdapter.java | 4 +- .../processors/cache/GridCacheContext.java | 1 - .../processors/cache/GridCacheEntryEx.java | 2 +- .../processors/cache/GridCacheMapEntry.java | 4 +- .../processors/cache/GridCacheStoreManager.java | 2 +- .../cache/GridCacheWriteBehindStore.java | 2 +- .../GridDistributedTxRemoteAdapter.java | 2 +- .../distributed/dht/GridDhtCacheAdapter.java | 2 +- .../distributed/dht/GridDhtLockFuture.java | 2 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 2 +- .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../dht/preloader/GridDhtForceKeysFuture.java | 2 +- .../preloader/GridDhtPartitionDemandPool.java | 2 +- .../distributed/near/GridNearAtomicCache.java | 2 +- .../processors/cache/dr/GridCacheDrManager.java | 2 +- .../cache/dr/os/GridOsCacheDrManager.java | 2 +- .../transactions/IgniteTxLocalAdapter.java | 4 +- .../clock/GridClockDeltaSnapshot.java | 231 ---------- .../clock/GridClockDeltaSnapshotMessage.java | 226 --------- .../processors/clock/GridClockDeltaVersion.java | 117 ----- .../processors/clock/GridClockMessage.java | 171 ------- .../processors/clock/GridClockServer.java | 206 --------- .../processors/clock/GridClockSource.java | 30 -- .../clock/GridClockSyncProcessor.java | 458 ------------------- .../processors/clock/GridJvmClockSource.java | 28 -- .../dataload/IgniteDataLoaderImpl.java | 1 - .../dr/GridDrDataLoadCacheUpdater.java | 77 ---- 
.../grid/kernal/processors/dr/GridDrType.java | 38 -- .../processors/dr/GridRawVersionedEntry.java | 210 --------- .../processors/dr/GridVersionedEntry.java | 80 ---- .../grid/kernal/processors/dr/package.html | 25 - .../processors/interop/GridInteropAware.java | 49 -- .../interop/GridInteropProcessor.java | 82 ---- .../interop/GridInteropProcessorAdapter.java | 31 -- .../processors/interop/GridInteropTarget.java | 109 ----- .../interop/os/GridOsInteropProcessor.java | 80 ---- .../kernal/processors/interop/os/package.html | 23 - .../grid/kernal/processors/interop/package.html | 23 - .../portable/GridPortableInputStream.java | 177 ------- .../portable/GridPortableOutputStream.java | 172 ------- .../portable/GridPortableProcessor.java | 150 ------ .../processors/portable/GridPortableStream.java | 53 --- .../portable/os/GridOsPortableProcessor.java | 126 ----- .../kernal/processors/portable/os/package.html | 23 - .../kernal/processors/portable/package.html | 23 - .../clock/GridTimeSyncProcessorSelfTest.java | 223 +++++++++ .../processors/cache/GridCacheTestEntryEx.java | 2 +- .../GridCacheReplicatedInvalidateSelfTest.java | 2 +- .../clock/GridTimeSyncProcessorSelfTest.java | 223 --------- ...idHadoopDefaultMapReducePlannerSelfTest.java | 2 +- 86 files changed, 3271 insertions(+), 3281 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/GridEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEx.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEx.java index eed3b86..6ee5e8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEx.java @@ -21,7 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; 
import org.apache.ignite.lang.*; import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.kernal.processors.interop.*; +import org.apache.ignite.internal.processors.interop.*; import org.jetbrains.annotations.*; import java.util.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java index f3461a4..5c923bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java @@ -49,19 +49,18 @@ import org.apache.ignite.internal.managers.security.*; import org.apache.ignite.internal.managers.swapspace.*; import org.gridgain.grid.kernal.processors.affinity.*; import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.kernal.processors.clock.*; +import org.apache.ignite.internal.processors.clock.*; import org.gridgain.grid.kernal.processors.closure.*; import org.gridgain.grid.kernal.processors.continuous.*; import org.gridgain.grid.kernal.processors.dataload.*; import org.apache.ignite.internal.processors.email.*; -import org.gridgain.grid.kernal.processors.interop.*; +import org.apache.ignite.internal.processors.interop.*; import org.apache.ignite.internal.processors.job.*; import org.apache.ignite.internal.processors.jobmetrics.*; import org.apache.ignite.internal.processors.license.*; import org.apache.ignite.internal.processors.offheap.*; import org.apache.ignite.internal.processors.plugin.*; import org.apache.ignite.internal.processors.port.*; -import org.gridgain.grid.kernal.processors.portable.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.processors.resource.*; import 
org.apache.ignite.internal.processors.rest.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index aa5b24c..132497e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -35,20 +35,19 @@ import org.apache.ignite.internal.managers.securesession.*; import org.apache.ignite.internal.managers.swapspace.*; import org.gridgain.grid.kernal.processors.affinity.*; import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.kernal.processors.clock.*; +import org.apache.ignite.internal.processors.clock.*; import org.gridgain.grid.kernal.processors.closure.*; import org.gridgain.grid.kernal.processors.continuous.*; import org.gridgain.grid.kernal.processors.dataload.*; import org.apache.ignite.internal.processors.email.*; import org.apache.ignite.internal.processors.hadoop.*; -import org.gridgain.grid.kernal.processors.interop.*; +import org.apache.ignite.internal.processors.interop.*; import org.apache.ignite.internal.processors.job.*; import org.apache.ignite.internal.processors.jobmetrics.*; import org.apache.ignite.internal.processors.license.*; import org.apache.ignite.internal.processors.offheap.*; import org.apache.ignite.internal.processors.plugin.*; import org.apache.ignite.internal.processors.port.*; -import org.gridgain.grid.kernal.processors.portable.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.processors.resource.*; import org.apache.ignite.internal.processors.rest.*; 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index ea3ba95..26c69b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -37,20 +37,19 @@ import org.gridgain.grid.kernal.processors.affinity.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.dr.*; import org.gridgain.grid.kernal.processors.cache.dr.os.*; -import org.gridgain.grid.kernal.processors.clock.*; +import org.apache.ignite.internal.processors.clock.*; import org.gridgain.grid.kernal.processors.closure.*; import org.gridgain.grid.kernal.processors.continuous.*; import org.gridgain.grid.kernal.processors.dataload.*; import org.apache.ignite.internal.processors.email.*; import org.apache.ignite.internal.processors.hadoop.*; -import org.gridgain.grid.kernal.processors.interop.*; +import org.apache.ignite.internal.processors.interop.*; import org.apache.ignite.internal.processors.job.*; import org.apache.ignite.internal.processors.jobmetrics.*; import org.apache.ignite.internal.processors.license.*; import org.apache.ignite.internal.processors.offheap.*; import org.apache.ignite.internal.processors.plugin.*; import org.apache.ignite.internal.processors.port.*; -import org.gridgain.grid.kernal.processors.portable.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.processors.resource.*; import org.apache.ignite.internal.processors.rest.*; 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/GridPortablesImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPortablesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPortablesImpl.java index c67553b..e3b6c52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridPortablesImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPortablesImpl.java @@ -18,9 +18,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; -import org.apache.ignite.internal.*; import org.apache.ignite.portables.*; -import org.gridgain.grid.kernal.processors.portable.*; import org.jetbrains.annotations.*; import java.util.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshot.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshot.java new file mode 100644 index 0000000..7e135f5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshot.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.clock; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.tostring.*; + +import java.util.*; + +/** + * Snapshot of time deltas for given topology. + */ +public class GridClockDeltaSnapshot { + /** Time delta version. */ + private final GridClockDeltaVersion ver; + + /** Deltas between coordinator and nodes by node ID. */ + private final Map<UUID, Long> deltas; + + /** Pending delta values. */ + @GridToStringExclude + private final Map<UUID, DeltaAverage> pendingDeltas; + + /** + * @param ver Snapshot version. + * @param locNodeId Local node ID. + * @param discoSnap Discovery snapshot. + * @param avgSize Average size. + */ + public GridClockDeltaSnapshot( + GridClockDeltaVersion ver, + UUID locNodeId, + GridDiscoveryTopologySnapshot discoSnap, + int avgSize + ) { + assert ver.topologyVersion() == discoSnap.topologyVersion(); + + this.ver = ver; + + deltas = new HashMap<>(discoSnap.topologyNodes().size(), 1.0f); + + pendingDeltas = new HashMap<>(discoSnap.topologyNodes().size(), 1.0f); + + for (ClusterNode n : discoSnap.topologyNodes()) { + if (!locNodeId.equals(n.id())) + pendingDeltas.put(n.id(), new DeltaAverage(avgSize)); + } + } + + /** + * @param ver Snapshot version. + * @param deltas Deltas map. 
+ */ + public GridClockDeltaSnapshot(GridClockDeltaVersion ver, Map<UUID, Long> deltas) { + this.ver = ver; + this.deltas = deltas; + + pendingDeltas = Collections.emptyMap(); + } + + /** + * @return Version. + */ + public GridClockDeltaVersion version() { + return ver; + } + + /** + * @return Map of collected deltas. + */ + public Map<UUID, Long> deltas() { + return Collections.unmodifiableMap(deltas); + } + + /** + * Awaits either until snapshot is ready or timeout elapses. + * + * @param timeout Timeout to wait. + * @throws IgniteInterruptedException If wait was interrupted. + */ + public synchronized void awaitReady(long timeout) throws IgniteInterruptedException { + long start = System.currentTimeMillis(); + + try { + while (!ready()) { + long now = System.currentTimeMillis(); + + if (start + timeout - now <= 0) + return; + + wait(start + timeout - now); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedException(e); + } + } + + /** + * Callback invoked when time delta is received from remote node. + * + * @param nodeId Node ID. + * @param timeDelta Calculated time delta. + * @return {@code True} if more samples needed from that node. + */ + public synchronized boolean onDeltaReceived(UUID nodeId, long timeDelta) { + DeltaAverage avg = pendingDeltas.get(nodeId); + + if (avg != null) { + avg.onValue(timeDelta); + + if (avg.ready()) { + pendingDeltas.remove(nodeId); + + deltas.put(nodeId, avg.average()); + + if (ready()) + notifyAll(); + + return false; + } + + return true; + } + + return false; + } + + /** + * Callback invoked when node left. + * + * @param nodeId Left node ID. + */ + public synchronized void onNodeLeft(UUID nodeId) { + pendingDeltas.remove(nodeId); + + deltas.put(nodeId, 0L); + + if (ready()) + notifyAll(); + } + + /** + * @return {@code True} if snapshot is ready. 
+ */ + public synchronized boolean ready() { + return pendingDeltas.isEmpty(); + } + + /** + * @return Collection of node IDs for which response was not received so far. + */ + public synchronized Collection<UUID> pendingNodeIds() { + // Must return copy. + return new HashSet<>(pendingDeltas.keySet()); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridClockDeltaSnapshot.class, this); + } + + /** + * Delta average. + */ + private static class DeltaAverage { + /** Delta values. */ + private long[] vals; + + /** Current index. */ + private int idx; + + /** + * @param size Accumulator size. + */ + private DeltaAverage(int size) { + vals = new long[size]; + } + + /** + * Adds value to accumulator. + * + * @param val Value to add. + */ + public void onValue(long val) { + if (idx < vals.length) + vals[idx++] = val; + } + + /** + * Whether this average is complete. + * + * @return {@code True} if enough values is collected. + */ + public boolean ready() { + return idx == vals.length; + } + + /** + * @return Average delta. + */ + public long average() { + long sum = 0; + + for (long val : vals) + sum += val; + + return sum / vals.length; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java new file mode 100644 index 0000000..3164c46 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.clock; + +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.tostring.*; + +import java.io.*; +import java.nio.*; +import java.util.*; + +/** + * Message containing time delta map for all nodes. + */ +public class GridClockDeltaSnapshotMessage extends GridTcpCommunicationMessageAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** Snapshot version. */ + private GridClockDeltaVersion snapVer; + + /** Grid time deltas. */ + @GridToStringInclude + @GridDirectMap(keyType = UUID.class, valueType = long.class) + private Map<UUID, Long> deltas; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridClockDeltaSnapshotMessage() { + // No-op. + } + + /** + * @param snapVer Snapshot version. + * @param deltas Deltas map. + */ + public GridClockDeltaSnapshotMessage(GridClockDeltaVersion snapVer, Map<UUID, Long> deltas) { + this.snapVer = snapVer; + this.deltas = deltas; + } + + /** + * @return Snapshot version. + */ + public GridClockDeltaVersion snapshotVersion() { + return snapVer; + } + + /** + * @return Time deltas map. 
+ */ + public Map<UUID, Long> deltas() { + return deltas; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridClockDeltaSnapshotMessage _clone = new GridClockDeltaSnapshotMessage(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + GridClockDeltaSnapshotMessage _clone = (GridClockDeltaSnapshotMessage)_msg; + + _clone.snapVer = snapVer; + _clone.deltas = deltas; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: + if (deltas != null) { + if (commState.it == null) { + if (!commState.putInt(deltas.size())) + return false; + + commState.it = deltas.entrySet().iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + Map.Entry<UUID, Long> e = (Map.Entry<UUID, Long>)commState.cur; + + if (!commState.keyDone) { + if (!commState.putUuid(e.getKey())) + return false; + + commState.keyDone = true; + } + + if (!commState.putLong(e.getValue())) + return false; + + commState.keyDone = false; + + commState.cur = NULL; + } + + commState.it = null; + } else { + if (!commState.putInt(-1)) + return false; + } + + commState.idx++; + + case 1: + if (!commState.putClockDeltaVersion(snapVer)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: + if (commState.readSize == -1) { + if (buf.remaining() < 4) + return false; + + commState.readSize = 
commState.getInt(); + } + + if (commState.readSize >= 0) { + if (deltas == null) + deltas = U.newHashMap(commState.readSize); + + for (int i = commState.readItems; i < commState.readSize; i++) { + if (!commState.keyDone) { + UUID _val = commState.getUuid(); + + if (_val == UUID_NOT_READ) + return false; + + commState.cur = _val; + commState.keyDone = true; + } + + if (buf.remaining() < 8) + return false; + + long _val = commState.getLong(); + + deltas.put((UUID)commState.cur, _val); + + commState.keyDone = false; + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + commState.cur = null; + + commState.idx++; + + case 1: + GridClockDeltaVersion snapVer0 = commState.getClockDeltaVersion(); + + if (snapVer0 == CLOCK_DELTA_VER_NOT_READ) + return false; + + snapVer = snapVer0; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 59; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridClockDeltaSnapshotMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaVersion.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaVersion.java new file mode 100644 index 0000000..aa03c88 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaVersion.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.clock; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Version for time delta snapshot. + */ +public class GridClockDeltaVersion implements Comparable<GridClockDeltaVersion>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Snapshot local version. */ + private long ver; + + /** Topology version. */ + private long topVer; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridClockDeltaVersion() { + // No-op. + } + + /** + * @param ver Version. + * @param topVer Topology version. + */ + public GridClockDeltaVersion(long ver, long topVer) { + this.ver = ver; + this.topVer = topVer; + } + + /** + * @return Snapshot local version. + */ + public long version() { + return ver; + } + + /** + * @return Snapshot topology version. + */ + public long topologyVersion() { + return topVer; + } + + /** {@inheritDoc} */ + @Override public int compareTo(GridClockDeltaVersion o) { + if (topVer == o.topVer) { + if (ver == o.ver) + return 0; + + return ver > o.ver ? 1 : -1; + } + + return topVer > o.topVer ? 
1 : -1; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof GridClockDeltaVersion)) + return false; + + GridClockDeltaVersion that = (GridClockDeltaVersion)o; + + return topVer == that.topVer && ver == that.ver; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = (int)(ver ^ (ver >>> 32)); + + res = 31 * res + (int)(topVer ^ (topVer >>> 32)); + + return res; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(ver); + out.writeLong(topVer); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + ver = in.readLong(); + topVer = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridClockDeltaVersion.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockMessage.java new file mode 100644 index 0000000..90b8000 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockMessage.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.clock; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * Time server message. + * <p> + * Carries the originating and target node IDs together with the originating and reply timestamps, + * and is converted to and from a fixed-size byte packet by {@link #toBytes()} and + * {@link #fromBytes(byte[], int, int)}. + */ +public class GridClockMessage { + /** Packet size in bytes. */ + public static final int PACKET_SIZE = 48; + + /** Originating node ID. */ + private UUID origNodeId; + + /** Target node ID. */ + private UUID targetNodeId; + + /** Originating timestamp. */ + private long origTs; + + /** Remote node reply timestamp. */ + private long replyTs; + + /** + * @param origNodeId Originating node ID. + * @param targetNodeId Target node ID. + * @param origTs Originating timestamp. + * @param replyTs Reply timestamp. + */ + public GridClockMessage(UUID origNodeId, UUID targetNodeId, long origTs, long replyTs) { + this.origNodeId = origNodeId; + this.targetNodeId = targetNodeId; + this.origTs = origTs; + this.replyTs = replyTs; + } + + /** + * @return Originating node ID. + */ + public UUID originatingNodeId() { + return origNodeId; + } + + /** + * @param origNodeId Originating node ID. + */ + public void originatingNodeId(UUID origNodeId) { + this.origNodeId = origNodeId; + } + + /** + * @return Target node ID. + */ + public UUID targetNodeId() { + return targetNodeId; + } + + /** + * @param targetNodeId Target node ID. + */ + public void targetNodeId(UUID targetNodeId) { + this.targetNodeId = targetNodeId; + } + + /** + * @return Originating timestamp. + */ + public long originatingTimestamp() { + return origTs; + } + + /** + * @param origTs Originating timestamp.
+ */ + public void originatingTimestamp(long origTs) { + this.origTs = origTs; + } + + /** + * @return Reply timestamp. + */ + public long replyTimestamp() { + return replyTs; + } + + /** + * @param replyTs Reply timestamp. + */ + public void replyTimestamp(long replyTs) { + this.replyTs = replyTs; + } + + /** + * Converts message to bytes to send over network. + * <p> + * Fields are written in this order: originating node ID (least significant bits, then most + * significant bits), target node ID (same order), originating timestamp, reply timestamp. + * + * @return Bytes representing this packet. + */ + public byte[] toBytes() { + byte[] buf = new byte[PACKET_SIZE]; + + int off = 0; + + off = U.longToBytes(origNodeId.getLeastSignificantBits(), buf, off); + off = U.longToBytes(origNodeId.getMostSignificantBits(), buf, off); + + off = U.longToBytes(targetNodeId.getLeastSignificantBits(), buf, off); + off = U.longToBytes(targetNodeId.getMostSignificantBits(), buf, off); + + off = U.longToBytes(origTs, buf, off); + + off = U.longToBytes(replyTs, buf, off); + + // All fields together must fill the packet exactly. + assert off == PACKET_SIZE; + + return buf; + } + + /** + * Constructs message from bytes, reading the fields in the order written by {@link #toBytes()}. + * + * @param buf Bytes. + * @param off Offset. + * @param len Packet length. + * @return Assembled message. + * @throws IgniteCheckedException If {@code len} is less than {@link #PACKET_SIZE}.
+ */ + public static GridClockMessage fromBytes(byte[] buf, int off, int len) throws IgniteCheckedException { + if (len < PACKET_SIZE) + throw new IgniteCheckedException("Failed to assemble time server packet (message is too short)."); + + // Each node ID is stored with its least significant bits first. + long lsb = U.bytesToLong(buf, off); + long msb = U.bytesToLong(buf, off + 8); + + UUID origNodeId = new UUID(msb, lsb); + + lsb = U.bytesToLong(buf, off + 16); + msb = U.bytesToLong(buf, off + 24); + + UUID targetNodeId = new UUID(msb, lsb); + + long origTs = U.bytesToLong(buf, off + 32); + long replyTs = U.bytesToLong(buf, off + 40); + + return new GridClockMessage(origNodeId, targetNodeId, origTs, replyTs); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridClockMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java new file mode 100644 index 0000000..f188181 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.clock; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.thread.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.worker.*; + +import java.io.*; +import java.net.*; + +/** + * Time server that enables time synchronization between nodes. + */ +public class GridClockServer { + /** Kernal context. */ + private GridKernalContext ctx; + + /** Datagram socket for message exchange. */ + private DatagramSocket sock; + + /** Logger. */ + private IgniteLogger log; + + /** Read worker. */ + private GridWorker readWorker; + + /** Instance of time processor. */ + private GridClockSyncProcessor clockSync; + + /** + * Starts server. + * + * @param ctx Kernal context. + * @throws IgniteCheckedException If server could not be started. + */ + public void start(GridKernalContext ctx) throws IgniteCheckedException { + this.ctx = ctx; + + clockSync = ctx.clockSync(); + log = ctx.log(GridClockServer.class); + + try { + int startPort = ctx.config().getTimeServerPortBase(); + int endPort = startPort + ctx.config().getTimeServerPortRange() - 1; + + InetAddress locHost = !F.isEmpty(ctx.config().getLocalHost()) ? 
+ InetAddress.getByName(ctx.config().getLocalHost()) : + U.getLocalHost(); + + for (int p = startPort; p <= endPort; p++) { + try { + sock = new DatagramSocket(p, locHost); + + if (log.isDebugEnabled()) + log.debug("Successfully bound time server [host=" + locHost + ", port=" + p + ']'); + + break; + } + catch (SocketException e) { + if (log.isDebugEnabled()) + log.debug("Failed to bind time server socket [host=" + locHost + ", port=" + p + + ", err=" + e.getMessage() + ']'); + } + } + + if (sock == null) + throw new IgniteCheckedException("Failed to bind time server socket within specified port range [locHost=" + + locHost + ", startPort=" + startPort + ", endPort=" + endPort + ']'); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to start time server (failed to get local host address)", e); + } + } + + /** + * After start callback. + */ + public void afterStart() { + readWorker = new ReadWorker(); + + IgniteThread th = new IgniteThread(readWorker); + + th.setPriority(Thread.MAX_PRIORITY); + + th.start(); + } + + /** + * Stops server. + */ + public void stop() { + // No-op. + } + + /** + * Before stop callback. + */ + public void beforeStop() { + if (readWorker != null) + readWorker.cancel(); + + U.closeQuiet(sock); + + if (readWorker != null) + U.join(readWorker, log); + } + + /** + * Sends packet to remote node. + * + * @param msg Message to send. + * @param addr Address. + * @param port Port. + * @throws IgniteCheckedException If send failed. 
+ */ + public void sendPacket(GridClockMessage msg, InetAddress addr, int port) throws IgniteCheckedException { + try { + DatagramPacket packet = new DatagramPacket(msg.toBytes(), GridClockMessage.PACKET_SIZE, addr, port); + + if (log.isDebugEnabled()) + log.debug("Sending time sync packet [msg=" + msg + ", addr=" + addr + ", port=" + port); + + sock.send(packet); + } + catch (IOException e) { + if (!sock.isClosed()) + throw new IgniteCheckedException("Failed to send datagram message to remote node [addr=" + addr + + ", port=" + port + ", msg=" + msg + ']', e); + } + } + + /** + * @return Address to which this server is bound. + */ + public InetAddress host() { + return sock.getLocalAddress(); + } + + /** + * @return Port to which this server is bound. + */ + public int port() { + return sock.getLocalPort(); + } + + /** + * Message read worker. + */ + private class ReadWorker extends GridWorker { + /** + * Creates read worker. + */ + protected ReadWorker() { + super(ctx.gridName(), "grid-time-server-reader", log); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedException { + DatagramPacket packet = new DatagramPacket(new byte[GridClockMessage.PACKET_SIZE], + GridClockMessage.PACKET_SIZE); + + while (!isCancelled()) { + try { + // Read packet from buffer. 
+ sock.receive(packet); + + if (log.isDebugEnabled()) + log.debug("Received clock sync message from remote node [host=" + packet.getAddress() + + ", port=" + packet.getPort() + ']'); + + GridClockMessage msg = GridClockMessage.fromBytes(packet.getData(), packet.getOffset(), + packet.getLength()); + + clockSync.onMessageReceived(msg, packet.getAddress(), packet.getPort()); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to assemble clock server message (will ignore the packet) [host=" + + packet.getAddress() + ", port=" + packet.getPort() + ", err=" + e.getMessage() + ']'); + } + catch (IOException e) { + if (!isCancelled()) + U.warn(log, "Failed to receive message on datagram socket: " + e); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSource.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSource.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSource.java new file mode 100644 index 0000000..65f746a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSource.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.clock; + +/** + * Interface representing time source for time processor. + */ +public interface GridClockSource { + /** + * Gets current time in milliseconds elapsed since 1 January, 1970. + * + * @return Current time in milliseconds. + */ + public long currentTimeMillis(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java new file mode 100644 index 0000000..d1c9931 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.clock; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.thread.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.worker.*; + +import java.net.*; +import java.util.*; + +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.internal.GridNodeAttributes.*; +import static org.apache.ignite.internal.GridTopic.*; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; + +/** + * Time synchronization processor. + */ +public class GridClockSyncProcessor extends GridProcessorAdapter { + /** Maximum size for time sync history. */ + private static final int MAX_TIME_SYNC_HISTORY = 100; + + /** Time server instance. */ + private GridClockServer srv; + + /** Shutdown lock. */ + private GridSpinReadWriteLock rw = new GridSpinReadWriteLock(); + + /** Stopping flag. Set on kernal stop so that no new time coordinator is launched. */ + private volatile boolean stopping; + + /** Time coordinator thread. */ + private volatile TimeCoordinator timeCoord; + + /** Time delta history. Constructed on coordinator.
*/ + private NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> timeSyncHist = + new GridBoundedConcurrentOrderedMap<>(MAX_TIME_SYNC_HISTORY); + + /** Time source. */ + private GridClockSource clockSrc; + + /** + * @param ctx Kernal context. + */ + public GridClockSyncProcessor(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + super.start(); + + clockSrc = ctx.timeSource(); + + srv = new GridClockServer(); + + srv.start(ctx); + + ctx.io().addMessageListener(TOPIC_TIME_SYNC, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + assert msg instanceof GridClockDeltaSnapshotMessage; + + GridClockDeltaSnapshotMessage msg0 = (GridClockDeltaSnapshotMessage)msg; + + GridClockDeltaVersion ver = msg0.snapshotVersion(); + + timeSyncHist.put(ver, new GridClockDeltaSnapshot(ver, msg0.deltas())); + } + }); + + // Node leave and fail events may promote this node to coordinator; join events are only forwarded to it. + ctx.event().addLocalEventListener(new GridLocalEventListener() { + @Override public void onEvent(IgniteEvent evt) { + assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_JOINED; + + IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt; + + if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) + checkLaunchCoordinator(discoEvt); + + TimeCoordinator timeCoord0 = timeCoord; + + if (timeCoord0 != null) + timeCoord0.onDiscoveryEvent(discoEvt); + } + }, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_NODE_JOINED); + } + + /** {@inheritDoc} */ + @Override public void addAttributes(Map<String, Object> attrs) throws IgniteCheckedException { + super.addAttributes(attrs); + + attrs.put(ATTR_TIME_SERVER_HOST, srv.host()); + attrs.put(ATTR_TIME_SERVER_PORT, srv.port()); + } + + /** {@inheritDoc} */ + @Override public void onKernalStart() throws IgniteCheckedException { + super.onKernalStart(); + + srv.afterStart(); + + // Check at startup if this node is a
time coordinator. + IgniteDiscoveryEvent locJoinEvt = ctx.discovery().localJoinEvent(); + + checkLaunchCoordinator(locJoinEvt); + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + super.onKernalStop(cancel); + + rw.writeLock(); + + try { + // Mark processor as stopping so that checkLaunchCoordinator() does not start a new coordinator. + stopping = true; + + if (timeCoord != null) { + timeCoord.cancel(); + + U.join(timeCoord, log); + + timeCoord = null; + } + + if (srv != null) + srv.beforeStop(); + } + finally { + rw.writeUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) throws IgniteCheckedException { + super.stop(cancel); + + if (srv != null) + srv.stop(); + } + + /** + * Gets current time on local node. + * + * @return Current time in milliseconds. + */ + private long currentTime() { + return clockSrc.currentTimeMillis(); + } + + /** + * @return Time sync history. + */ + public NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> timeSyncHistory() { + return timeSyncHist; + } + + /** + * Callback from server for message receiving. + * <p> + * A message originated by another node is a time request and is answered with the local receive time. + * A message originated by the local node is a reply and is passed to the time coordinator, if one is running. + * + * @param msg Received message. + * @param addr Remote node address. + * @param port Remote node port. + */ + public void onMessageReceived(GridClockMessage msg, InetAddress addr, int port) { + long rcvTs = currentTime(); + + if (!msg.originatingNodeId().equals(ctx.localNodeId())) { + // We received time request from remote node, set current time and reply back. + msg.replyTimestamp(rcvTs); + + try { + srv.sendPacket(msg, addr, port); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send time server reply to remote node: " + msg, e); + } + } + else { + // Coordinator may be absent: the field is reset to null in onKernalStop(). + TimeCoordinator timeCoord0 = timeCoord; + + if (timeCoord0 != null) + timeCoord0.onMessage(msg, rcvTs); + } + } + + /** + * Checks if local node is the oldest node in topology and starts time coordinator if so. + * + * @param discoEvt Discovery event.
+ */ + private void checkLaunchCoordinator(IgniteDiscoveryEvent discoEvt) { + rw.readLock(); + + try { + if (stopping) + return; + + if (timeCoord == null) { + long minNodeOrder = Long.MAX_VALUE; + + Collection<ClusterNode> nodes = discoEvt.topologyNodes(); + + for (ClusterNode node : nodes) { + if (node.order() < minNodeOrder) + minNodeOrder = node.order(); + } + + ClusterNode locNode = ctx.grid().localNode(); + + if (locNode.order() == minNodeOrder) { + if (log.isDebugEnabled()) + log.debug("Detected local node to be the eldest node in topology, starting time " + + "coordinator thread [discoEvt=" + discoEvt + ", locNode=" + locNode + ']'); + + synchronized (this) { + if (timeCoord == null && !stopping) { + timeCoord = new TimeCoordinator(discoEvt); + + IgniteThread th = new IgniteThread(timeCoord); + + th.setPriority(Thread.MAX_PRIORITY); + + th.start(); + } + } + } + } + } + finally { + rw.readUnlock(); + } + } + + /** + * Gets time adjusted with time coordinator on given topology version. + * + * @param topVer Topology version. + * @return Adjusted time, or unadjusted time of the configured clock source if no snapshot is available. + */ + public long adjustedTime(long topVer) { + // Get last synchronized time on given topology version. + Map.Entry<GridClockDeltaVersion, GridClockDeltaSnapshot> entry = timeSyncHistory().lowerEntry( + new GridClockDeltaVersion(0, topVer + 1)); + + GridClockDeltaSnapshot snap = entry == null ? null : entry.getValue(); + + long now = clockSrc.currentTimeMillis(); + + // No snapshot yet: fall back to the same clock source that is used for adjusted values. + if (snap == null) + return now; + + Long delta = snap.deltas().get(ctx.localNodeId()); + + if (delta == null) + delta = 0L; + + return now + delta; + } + + /** + * Publishes snapshot to topology. + * + * @param snapshot Snapshot to publish. + * @param top Topology to send given snapshot to.
+ */ + private void publish(GridClockDeltaSnapshot snapshot, GridDiscoveryTopologySnapshot top) { + // Read lock cannot be taken while onKernalStop() holds the write lock; skip publishing in that case. + if (!rw.tryReadLock()) + return; + + try { + timeSyncHist.put(snapshot.version(), snapshot); + + for (ClusterNode n : top.topologyNodes()) { + GridClockDeltaSnapshotMessage msg = new GridClockDeltaSnapshotMessage( + snapshot.version(), snapshot.deltas()); + + try { + ctx.io().send(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + if (ctx.discovery().pingNode(n.id())) + U.error(log, "Failed to send time sync snapshot to remote node (did not leave grid?) " + + "[nodeId=" + n.id() + ", msg=" + msg + ", err=" + e.getMessage() + ']'); + else if (log.isDebugEnabled()) + log.debug("Failed to send time sync snapshot to remote node (did not leave grid?) " + + "[nodeId=" + n.id() + ", msg=" + msg + ", err=" + e.getMessage() + ']'); + } + } + } + finally { + rw.readUnlock(); + } + } + + /** + * Time coordinator thread. + */ + private class TimeCoordinator extends GridWorker { + /** Last discovery topology snapshot. */ + private volatile GridDiscoveryTopologySnapshot lastSnapshot; + + /** Snapshot being constructed. May be not null only on coordinator node. */ + private volatile GridClockDeltaSnapshot pendingSnapshot; + + /** Version counter. */ + private long verCnt = 1; + + /** + * Time coordinator thread constructor. + * + * @param evt Discovery event on which this node became a coordinator.
+ */ + protected TimeCoordinator(IgniteDiscoveryEvent evt) { + super(ctx.gridName(), "grid-time-coordinator", log); + + lastSnapshot = new GridDiscoveryTopologySnapshot(evt.topologyVersion(), evt.topologyNodes()); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedException { + while (!isCancelled()) { + GridDiscoveryTopologySnapshot top = lastSnapshot; + + if (log.isDebugEnabled()) + log.debug("Creating time sync snapshot for topology: " + top); + + GridClockDeltaSnapshot snapshot = new GridClockDeltaSnapshot( + new GridClockDeltaVersion(verCnt++, top.topologyVersion()), + ctx.localNodeId(), + top, + ctx.config().getClockSyncSamples()); + + pendingSnapshot = snapshot; + + while (!snapshot.ready()) { + if (log.isDebugEnabled()) + log.debug("Requesting time from remote nodes: " + snapshot.pendingNodeIds()); + + for (UUID nodeId : snapshot.pendingNodeIds()) + requestTime(nodeId); + + if (log.isDebugEnabled()) + log.debug("Waiting for snapshot to be ready: " + snapshot); + + // Wait for all replies + snapshot.awaitReady(1000); + } + + // No more messages should be processed. + pendingSnapshot = null; + + if (log.isDebugEnabled()) + log.debug("Collected time sync results: " + snapshot.deltas()); + + publish(snapshot, top); + + // Sleep until the next sync round unless topology has already changed. + synchronized (this) { + if (top.topologyVersion() == lastSnapshot.topologyVersion()) + wait(ctx.config().getClockSyncFrequency()); + } + } + } + + /** + * @param evt Discovery event. + */ + public void onDiscoveryEvent(IgniteDiscoveryEvent evt) { + if (log.isDebugEnabled()) + log.debug("Processing discovery event: " + evt); + + if (evt.type() == IgniteEventType.EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) + onNodeLeft(evt.eventNode().id()); + + synchronized (this) { + lastSnapshot = new GridDiscoveryTopologySnapshot(evt.topologyVersion(), evt.topologyNodes()); + + notifyAll(); + } + } + + /** + * @param msg Message received from remote node. + * @param rcvTs Receive timestamp.
+ */ + private void onMessage(GridClockMessage msg, long rcvTs) { + GridClockDeltaSnapshot curr = pendingSnapshot; + + if (curr != null) { + // Midpoint of the request round trip on the local clock minus the remote reply timestamp. + long delta = (msg.originatingTimestamp() + rcvTs) / 2 - msg.replyTimestamp(); + + boolean needMore = curr.onDeltaReceived(msg.targetNodeId(), delta); + + if (needMore) + requestTime(msg.targetNodeId()); + } + } + + /** + * Requests time from remote node. + * + * @param rmtNodeId Remote node ID. + */ + private void requestTime(UUID rmtNodeId) { + ClusterNode node = ctx.discovery().node(rmtNodeId); + + if (node != null) { + InetAddress addr = node.attribute(ATTR_TIME_SERVER_HOST); + int port = node.attribute(ATTR_TIME_SERVER_PORT); + + try { + GridClockMessage req = new GridClockMessage(ctx.localNodeId(), rmtNodeId, currentTime(), 0); + + srv.sendPacket(req, addr, port); + } + catch (IgniteCheckedException e) { + LT.warn(log, e, "Failed to send time request to remote node [rmtNodeId=" + rmtNodeId + + ", addr=" + addr + ", port=" + port + ']'); + } + } + else + onNodeLeft(rmtNodeId); + } + + /** + * Node left callback. + * + * @param nodeId Left node ID. + */ + private void onNodeLeft(UUID nodeId) { + GridClockDeltaSnapshot curr = pendingSnapshot; + + if (curr != null) + curr.onNodeLeft(nodeId); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridJvmClockSource.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridJvmClockSource.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridJvmClockSource.java new file mode 100644 index 0000000..e10d816 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridJvmClockSource.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.clock; + +/** + * JVM time source that delegates to {@link System#currentTimeMillis()}. + */ +public class GridJvmClockSource implements GridClockSource { + /** {@inheritDoc} */ + @Override public long currentTimeMillis() { + return System.currentTimeMillis(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java new file mode 100644 index 0000000..f874e4b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.dr; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.dataload.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.cache.dr.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.util.*; + +/** + * Data center replication cache updater for data loader. + * <p> + * Every entry of the passed collection is cast to {@link GridVersionedEntry}, unmarshalled and then + * applied to the internal cache one by one: as a DR removal when its value is {@code null}, as a DR put otherwise. + */ +public class GridDrDataLoadCacheUpdater<K, V> implements IgniteDataLoadCacheUpdater<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void update(IgniteCache<K, V> cache0, Collection<Map.Entry<K, V>> col) + throws IgniteCheckedException { + String cacheName = cache0.getConfiguration(CacheConfiguration.class).getName(); + + GridKernalContext ctx = ((GridKernal)cache0.unwrap(Ignite.class)).context(); + IgniteLogger log = ctx.log(GridDrDataLoadCacheUpdater.class); + GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName); + + assert !F.isEmpty(col); + + if (log.isDebugEnabled()) + log.debug("Running DR put job [nodeId=" + ctx.localNodeId() + ", cacheName=" + cacheName + ']'); + + // Wait for the preloader start future to complete before applying any update. + IgniteFuture<?> f = cache.context().preloader().startFuture(); + + if (!f.isDone()) + f.get(); + + for (Map.Entry<K, V> entry0 : col) { + GridVersionedEntry<K, V> entry = (GridVersionedEntry<K, V>)entry0; + + entry.unmarshal(ctx.config().getMarshaller()); + + K key = entry.key(); + + // Null value yields null info (removal); non-zero expire time selects the expiration-aware info. + GridCacheDrInfo<V> val = entry.value() !=
null ? entry.expireTime() != 0 ? + new GridCacheDrExpirationInfo<>(entry.value(), entry.version(), entry.ttl(), entry.expireTime()) : + new GridCacheDrInfo<>(entry.value(), entry.version()) : null; + + if (val == null) + cache.removeAllDr(Collections.singletonMap(key, entry.version())); + else + cache.putAllDr(Collections.singletonMap(key, val)); + } + + if (log.isDebugEnabled()) + log.debug("DR put job finished [nodeId=" + ctx.localNodeId() + ", cacheName=" + cacheName + ']'); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrType.java new file mode 100644 index 0000000..ac1c19c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrType.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.dr; + +/** + * Data center replication type.
+ */ +public enum GridDrType { + /** Do not replicate that entry. */ + DR_NONE, + + /** Regular replication on primary node. */ + DR_PRIMARY, + + /** Regular replication on backup node. */ + DR_BACKUP, + + /** Replication during load. */ + DR_LOAD, + + /** Replication during preload. */ + DR_PRELOAD +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridRawVersionedEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridRawVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridRawVersionedEntry.java new file mode 100644 index 0000000..eff6d1c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridRawVersionedEntry.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.dr; + +import org.apache.ignite.*; +import org.apache.ignite.marshaller.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * + */ +public class GridRawVersionedEntry<K, V> implements GridVersionedEntry<K, V>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Key. */ + private K key; + + /** Key bytes. */ + private byte[] keyBytes; + + /** Value. */ + private V val; + + /** Value bytes. */ + private byte[] valBytes; + + /** TTL. */ + private long ttl; + + /** Expire time. */ + private long expireTime; + + /** Version. */ + private GridCacheVersion ver; + + /** + * {@code Externalizable) support. + */ + public GridRawVersionedEntry() { + // No-op. + } + + /** + * Constructor. + * + * @param key Key. + * @param keyBytes Key bytes. + * @param val Value. + * @param valBytes Value bytes. + * @param expireTime Expire time. + * @param ttl TTL. + * @param ver Version. + */ + public GridRawVersionedEntry(K key, + @Nullable byte[] keyBytes, + @Nullable V val, + @Nullable byte[] valBytes, + long ttl, + long expireTime, + GridCacheVersion ver) { + this.key = key; + this.keyBytes = keyBytes; + this.val = val; + this.valBytes = valBytes; + this.ttl = ttl; + this.expireTime = expireTime; + this.ver = ver; + } + + /** {@inheritDoc} */ + @Override public K key() { + assert key != null : "Entry is being improperly processed."; + + return key; + } + + /** + * @return Key bytes. + */ + public byte[] keyBytes() { + return keyBytes; + } + + /** {@inheritDoc} */ + @Override public V value() { + return val; + } + + /** + * @return Value bytes. 
+ */ + public byte[] valueBytes() { + return valBytes; + } + + /** {@inheritDoc} */ + @Override public long ttl() { + return ttl; + } + + /** {@inheritDoc} */ + @Override public long expireTime() { + return expireTime; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion version() { + return ver; + } + + /** {@inheritDoc} */ + @Override public void unmarshal(IgniteMarshaller marsh) throws IgniteCheckedException { + unmarshalKey(marsh); + + if (valBytes != null && val == null) + val = marsh.unmarshal(valBytes, null); + } + + /** + * Perform internal key unmarshal of this entry. It must be performed after entry is deserialized and before + * its restored key/value are needed. + * + * @param marsh Marshaller. + * @throws IgniteCheckedException If failed. + */ + private void unmarshalKey(IgniteMarshaller marsh) throws IgniteCheckedException { + if (key == null) + key = marsh.unmarshal(keyBytes, null); + } + + /** {@inheritDoc} */ + @Override public void marshal(IgniteMarshaller marsh) throws IgniteCheckedException { + if (keyBytes == null) + keyBytes = marsh.marshal(key); + + if (valBytes == null && val != null) + valBytes = marsh.marshal(val); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + assert keyBytes != null; + + U.writeByteArray(out, keyBytes); + U.writeByteArray(out, valBytes); + + out.writeLong(ttl); + + if (ttl != 0) + out.writeLong(expireTime); + + out.writeObject(ver); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + keyBytes = U.readByteArray(in); + valBytes = U.readByteArray(in); + + ttl = in.readLong(); + + if (ttl != 0) + expireTime = in.readLong(); + + ver = (GridCacheVersion)in.readObject(); + + assert keyBytes != null; + } + + /** {@inheritDoc} */ + @Override public K getKey() { + return key(); + } + + /** {@inheritDoc} */ + @Override public V getValue() { + return value(); + } + + /** {@inheritDoc} 
*/ + @Override public V setValue(V val) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridRawVersionedEntry.class, this, "keyBytesLen", keyBytes != null ? keyBytes.length : "n/a", + "valBytesLen", valBytes != null ? valBytes.length : "n/a"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridVersionedEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridVersionedEntry.java new file mode 100644 index 0000000..e05e8f6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridVersionedEntry.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.dr; + +import org.apache.ignite.*; +import org.apache.ignite.marshaller.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * + */ +public interface GridVersionedEntry<K, V> extends Map.Entry<K, V> { + /** + * Gets entry's key. + * + * @return Entry's key. + */ + public K key(); + + /** + * Gets entry's value. + * + * @return Entry's value. + */ + @Nullable public V value(); + + /** + * Gets entry's TTL. + * + * @return Entry's TTL. + */ + public long ttl(); + + /** + * Gets entry's expire time. + * + * @return Entry's expire time. + */ + public long expireTime(); + + /** + * @return Version. + */ + public GridCacheVersion version(); + + /** + * Perform internal marshal of this entry before it will be serialized. + * + * @param marsh Marshaller. + * @throws IgniteCheckedException If failed. + */ + public void marshal(IgniteMarshaller marsh) throws IgniteCheckedException; + + /** + * Perform internal unmarshal of this entry. It must be performed after entry is deserialized and before + * its restored key/value are needed. + * + * @param marsh Marshaller. + * @throws IgniteCheckedException If failed. + */ + public void unmarshal(IgniteMarshaller marsh) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/package.html new file mode 100644 index 0000000..55ce324 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/package.html @@ -0,0 +1,25 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <img alt="icon" class="javadocimg" src="{@docRoot}/img/cube.gif"/> + Data center replication processor. + <p> + @html.java.package +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropAware.java new file mode 100644 index 0000000..81035d8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropAware.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.interop; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; + +/** + * Interface for interop-aware components. + */ +public interface GridInteropAware { + /** + * Sets configuration parameters. + * + * @param params Configuration parameters. + */ + public void configure(Object... params); + + /** + * Initializes interop-aware component. + * + * @param ctx Context. + * @throws IgniteCheckedException In case of error. + */ + public void initialize(GridKernalContext ctx) throws IgniteCheckedException; + + /** + * Destroys interop-aware component. + * + * @param ctx Context. + * @throws IgniteCheckedException In case of error. + */ + public void destroy(GridKernalContext ctx) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropProcessor.java new file mode 100644 index 0000000..33c841a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropProcessor.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.interop; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.*; +import org.jetbrains.annotations.*; + +/** + * Interop processor. + */ +public interface GridInteropProcessor extends GridProcessor { + /** + * Release start latch. + */ + public void releaseStart(); + + /** + * Await start on native side. + * + * @throws IgniteCheckedException If failed. + */ + public void awaitStart() throws IgniteCheckedException; + + /** + * @return Environment pointer. + */ + public long environmentPointer() throws IgniteCheckedException; + + /** + * @return Grid name. + */ + public String gridName(); + + /** + * Gets native wrapper for default Grid projection. + * + * @return Native compute wrapper. + * @throws IgniteCheckedException If failed. + */ + public GridInteropTarget projection() throws IgniteCheckedException; + + /** + * Gets native wrapper for cache with the given name. + * + * @param name Cache name ({@code null} for default cache). + * @return Native cache wrapper. + * @throws IgniteCheckedException If failed. + */ + public GridInteropTarget cache(@Nullable String name) throws IgniteCheckedException; + + /** + * Gets native wrapper for data loader for cache with the given name. 
+ * + * @param cacheName Cache name ({@code null} for default cache). + * @return Native data loader wrapper. + * @throws IgniteCheckedException If failed. + */ + public GridInteropTarget dataLoader(@Nullable String cacheName) throws IgniteCheckedException; + + /** + * Stops grid. + * + * @param cancel Cancel flag. + */ + public void close(boolean cancel); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c602549a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropProcessorAdapter.java new file mode 100644 index 0000000..91ea27e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/GridInteropProcessorAdapter.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.interop; + +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.*; + +/** + * Interop processor adapter. + */ +public abstract class GridInteropProcessorAdapter extends GridProcessorAdapter implements GridInteropProcessor { + /** {@inheritDoc} */ + protected GridInteropProcessorAdapter(GridKernalContext ctx) { + super(ctx); + } +}
