Repository: ignite Updated Branches: refs/heads/ignite-3478 fb3ee2478 -> 8cf2aad73
ignite-3478 Support for streamer Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8cf2aad7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8cf2aad7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8cf2aad7 Branch: refs/heads/ignite-3478 Commit: 8cf2aad733aeda8ae4575e093315c052b682b455 Parents: fb3ee24 Author: sboikov <[email protected]> Authored: Fri Sep 29 15:38:37 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Sep 29 15:38:37 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheEntryInfo.java | 5 ++ .../cache/IgniteCacheOffheapManagerImpl.java | 65 +++++++++++--------- .../cache/mvcc/CacheCoordinatorsProcessor.java | 5 +- .../cache/mvcc/MvccCoordinatorVersion.java | 7 ++- .../mvcc/MvccCoordinatorVersionResponse.java | 24 ++++++++ .../datastreamer/DataStreamerImpl.java | 14 ++++- .../cache/mvcc/CacheMvccTransactionsTest.java | 32 ++++++++++ 7 files changed, 118 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8cf2aad7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java index e09d33c..8a5f0df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java @@ -101,6 +101,11 @@ public class GridCacheEntryInfo implements Message, MvccCoordinatorVersion { return 0; } + /** {@inheritDoc} */ + @Override public boolean initialLoad() { + return true; + } + /** * @return Cache ID. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8cf2aad7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 78f8913..714c4bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -1333,11 +1333,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public GridLongList mvccUpdate(GridCacheContext cctx, + @Override public GridLongList mvccUpdate( + GridCacheContext cctx, KeyCacheObject key, CacheObject val, GridCacheVersion ver, MvccCoordinatorVersion mvccVer) throws IgniteCheckedException { + assert mvccVer != null; + if (!busyLock.enterBusy()) throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); @@ -1370,49 +1373,51 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager assert !old; - MvccLongList activeTxs = mvccVer.activeTransactions(); + GridLongList waitTxs = null; - // TODO IGNITE-3484: need special method. - GridCursor<CacheDataRow> cur = dataTree.find( - new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1), - new MvccSearchRow(cacheId, key, 1, 1)); + if (!mvccVer.initialLoad()) { + MvccLongList activeTxs = mvccVer.activeTransactions(); - GridLongList waitTxs = null; + // TODO IGNITE-3484: need special method. + GridCursor<CacheDataRow> cur = dataTree.find( + new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1), + new MvccSearchRow(cacheId, key, 1, 1)); - boolean first = true; + boolean first = true; - boolean activeTx = false; + boolean activeTx = false; - while (cur.next()) { - CacheDataRow oldVal = cur.get(); + while (cur.next()) { + CacheDataRow oldVal = cur.get(); - assert oldVal.link() != 0 : oldVal; + assert oldVal.link() != 0 : oldVal; - if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() && - activeTxs.contains(oldVal.mvccCounter())) { - if (waitTxs == null) - waitTxs = new GridLongList(); + if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() && + activeTxs.contains(oldVal.mvccCounter())) { + if (waitTxs == null) + waitTxs = new GridLongList(); - assert oldVal.mvccCounter() != mvccVer.counter(); + assert oldVal.mvccCounter() != mvccVer.counter(); - waitTxs.add(oldVal.mvccCounter()); + waitTxs.add(oldVal.mvccCounter()); - activeTx = true; - } + activeTx = true; + } - if (!activeTx) { - // Should not delete oldest version which is less than cleanup version. - int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion()); + if (!activeTx) { + // Should not delete oldest version which is less than cleanup version. + int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion()); - if (cmp <= 0) { - if (first) - first = false; - else { - boolean rmvd = dataTree.removex(oldVal); + if (cmp <= 0) { + if (first) + first = false; + else { + boolean rmvd = dataTree.removex(oldVal); - assert rmvd; + assert rmvd; - rowStore.removeRow(oldVal.link()); + rowStore.removeRow(oldVal.link()); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8cf2aad7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java index ac55164..5080c83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java @@ -76,6 +76,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { public static final long COUNTER_NA = 0L; /** */ + public static final long START_VER = 1L; + + /** */ private static final boolean STAT_CNTRS = false; /** */ @@ -88,7 +91,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { private volatile MvccCoordinator curCrd; /** */ - private final AtomicLong mvccCntr = new AtomicLong(1L); + private final AtomicLong mvccCntr = new AtomicLong(START_VER); /** */ private final GridAtomicLong committedCntr = new GridAtomicLong(1L); http://git-wip-us.apache.org/repos/asf/ignite/blob/8cf2aad7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java index eef3587..a0fd5ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java @@ -34,7 +34,7 @@ public interface MvccCoordinatorVersion extends Message { public long coordinatorVersion(); /** - * @return Cleanup version. + * @return Cleanup version (all smaller versions are safe to remove). */ public long cleanupVersion(); @@ -42,4 +42,9 @@ public interface MvccCoordinatorVersion extends Message { * @return Counter. */ public long counter(); + + /** + * + */ + public boolean initialLoad(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/8cf2aad7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java index 04ef8d8..20d23ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java @@ -59,7 +59,20 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M } /** + * @param crdVer Coordinator version. * @param cntr Counter. + * @param cleanupVer Cleanup version. + */ + public MvccCoordinatorVersionResponse(long crdVer, long cntr, long cleanupVer) { + this.crdVer = crdVer; + this.cntr = cntr; + this.cleanupVer = cleanupVer; + } + + /** + * @param crdVer Coordinator version. + * @param cntr Counter. + * @param cleanupVer Cleanup version. * @param futId Future ID. */ void init(long futId, long crdVer, long cntr, long cleanupVer) { @@ -69,6 +82,9 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M this.cleanupVer = cleanupVer; } + /** + * @param txId Transaction counter. + */ void addTx(long txId) { if (txs == null) txs = new long[4]; @@ -78,6 +94,9 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M txs[txsCnt++] = txId; } + /** + * + */ void resetTransactionsCount() { txsCnt = 0; } @@ -140,6 +159,11 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M } /** {@inheritDoc} */ + @Override public boolean initialLoad() { + return false; + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/8cf2aad7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 257a127..6ced2f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -86,6 +86,9 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvali import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.dr.GridDrType; @@ -129,6 +132,14 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** Amount of permissions should be available to continue new data processing. */ private static final int REMAP_SEMAPHORE_PERMISSIONS_COUNT = Integer.MAX_VALUE; + /** Version which is less then any version generated on coordinator. */ + private static final MvccCoordinatorVersion ISOLATED_STREAMER_MVCC_VER = + new MvccCoordinatorVersionResponse(1L, CacheCoordinatorsProcessor.START_VER, 0L) { + @Override public boolean initialLoad() { + return true; + } + }; + /** Cache receiver. */ private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER; @@ -2067,10 +2078,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed boolean primary = cctx.affinity().primaryByKey(cctx.localNode(), entry.key(), topVer); - // TODO IGNITE-3478 (mvcc version). entry.initialValue(e.getValue(), ver, - null, + ISOLATED_STREAMER_MVCC_VER, ttl, expiryTime, false, http://git-wip-us.apache.org/repos/asf/ignite/blob/8cf2aad7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index 2d6afb4..f28fe2d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteTransactions; import org.apache.ignite.cache.CacheMode; @@ -2354,6 +2355,37 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testLoadWithStreamer() throws Exception { + startGridsMultiThreaded(5); + + client = true; + + startGrid(5); + + Ignite node = ignite(0); + + IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, 64)); + + final int KEYS = 10_000; + + Map<Integer, Integer> data = new HashMap<>(); + + try (IgniteDataStreamer<Integer, Integer> streamer = node.dataStreamer(cache.getName())) { + for (int i = 0; i < KEYS; i++) { + streamer.addData(i, i); + + data.put(i, i); + } + } + + checkCacheData(data, cache.getName()); + + checkPutGet(F.asList(cache.getName())); + } + + /** * @param N Number of object to update in single transaction. * @param srvs Number of server nodes. * @param clients Number of client nodes.
