Repository: ignite
Updated Branches:
  refs/heads/ignite-3478 fb3ee2478 -> 8cf2aad73


ignite-3478 Support for streamer


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8cf2aad7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8cf2aad7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8cf2aad7

Branch: refs/heads/ignite-3478
Commit: 8cf2aad733aeda8ae4575e093315c052b682b455
Parents: fb3ee24
Author: sboikov <[email protected]>
Authored: Fri Sep 29 15:38:37 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri Sep 29 15:38:37 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryInfo.java    |  5 ++
 .../cache/IgniteCacheOffheapManagerImpl.java    | 65 +++++++++++---------
 .../cache/mvcc/CacheCoordinatorsProcessor.java  |  5 +-
 .../cache/mvcc/MvccCoordinatorVersion.java      |  7 ++-
 .../mvcc/MvccCoordinatorVersionResponse.java    | 24 ++++++++
 .../datastreamer/DataStreamerImpl.java          | 14 ++++-
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 32 ++++++++++
 7 files changed, 118 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8cf2aad7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
index e09d33c..8a5f0df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
@@ -101,6 +101,11 @@ public class GridCacheEntryInfo implements Message, MvccCoordinatorVersion {
         return 0;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean initialLoad() {
+        return true;
+    }
+
     /**
      * @return Cache ID.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8cf2aad7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 78f8913..714c4bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1333,11 +1333,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public GridLongList mvccUpdate(GridCacheContext cctx,
+        @Override public GridLongList mvccUpdate(
+            GridCacheContext cctx,
             KeyCacheObject key,
             CacheObject val,
             GridCacheVersion ver,
             MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
+            assert mvccVer != null;
+
             if (!busyLock.enterBusy())
                 throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
 
@@ -1370,49 +1373,51 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
                 assert !old;
 
-                MvccLongList activeTxs = mvccVer.activeTransactions();
+                GridLongList waitTxs = null;
 
-                // TODO IGNITE-3484: need special method.
-                GridCursor<CacheDataRow> cur = dataTree.find(
-                    new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1),
-                    new MvccSearchRow(cacheId, key, 1, 1));
+                if (!mvccVer.initialLoad()) {
+                    MvccLongList activeTxs = mvccVer.activeTransactions();
 
-                GridLongList waitTxs = null;
+                    // TODO IGNITE-3484: need special method.
+                    GridCursor<CacheDataRow> cur = dataTree.find(
+                        new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1),
+                        new MvccSearchRow(cacheId, key, 1, 1));
 
-                boolean first = true;
+                    boolean first = true;
 
-                boolean activeTx = false;
+                    boolean activeTx = false;
 
-                while (cur.next()) {
-                    CacheDataRow oldVal = cur.get();
+                    while (cur.next()) {
+                        CacheDataRow oldVal = cur.get();
 
-                    assert oldVal.link() != 0 : oldVal;
+                        assert oldVal.link() != 0 : oldVal;
 
-                    if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() &&
-                        activeTxs.contains(oldVal.mvccCounter())) {
-                        if (waitTxs == null)
-                            waitTxs = new GridLongList();
+                        if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() &&
+                            activeTxs.contains(oldVal.mvccCounter())) {
+                            if (waitTxs == null)
+                                waitTxs = new GridLongList();
 
-                        assert oldVal.mvccCounter() != mvccVer.counter();
+                            assert oldVal.mvccCounter() != mvccVer.counter();
 
-                        waitTxs.add(oldVal.mvccCounter());
+                            waitTxs.add(oldVal.mvccCounter());
 
-                        activeTx = true;
-                    }
+                            activeTx = true;
+                        }
 
-                    if (!activeTx) {
-                        // Should not delete oldest version which is less than cleanup version.
-                        int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion());
+                        if (!activeTx) {
+                            // Should not delete oldest version which is less than cleanup version.
+                            int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion());
 
-                        if (cmp <= 0) {
-                            if (first)
-                                first = false;
-                            else {
-                                boolean rmvd = dataTree.removex(oldVal);
+                            if (cmp <= 0) {
+                                if (first)
+                                    first = false;
+                                else {
+                                    boolean rmvd = dataTree.removex(oldVal);
 
-                                assert rmvd;
+                                    assert rmvd;
 
-                                rowStore.removeRow(oldVal.link());
+                                    rowStore.removeRow(oldVal.link());
+                                }
                             }
                         }
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8cf2aad7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index ac55164..5080c83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -76,6 +76,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     public static final long COUNTER_NA = 0L;
 
     /** */
+    public static final long START_VER = 1L;
+
+    /** */
     private static final boolean STAT_CNTRS = false;
 
     /** */
@@ -88,7 +91,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     private volatile MvccCoordinator curCrd;
 
     /** */
-    private final AtomicLong mvccCntr = new AtomicLong(1L);
+    private final AtomicLong mvccCntr = new AtomicLong(START_VER);
 
     /** */
     private final GridAtomicLong committedCntr = new GridAtomicLong(1L);

http://git-wip-us.apache.org/repos/asf/ignite/blob/8cf2aad7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
index eef3587..a0fd5ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
@@ -34,7 +34,7 @@ public interface MvccCoordinatorVersion extends Message {
     public long coordinatorVersion();
 
     /**
-     * @return Cleanup version.
+     * @return Cleanup version (all smaller versions are safe to remove).
      */
     public long cleanupVersion();
 
@@ -42,4 +42,9 @@ public interface MvccCoordinatorVersion extends Message {
      * @return Counter.
      */
     public long counter();
+
+    /**
+     *
+     */
+    public boolean initialLoad();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8cf2aad7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
index 04ef8d8..20d23ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
@@ -59,7 +59,20 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
     }
 
     /**
+     * @param crdVer Coordinator version.
      * @param cntr Counter.
+     * @param cleanupVer Cleanup version.
+     */
+    public MvccCoordinatorVersionResponse(long crdVer, long cntr, long cleanupVer) {
+        this.crdVer = crdVer;
+        this.cntr = cntr;
+        this.cleanupVer = cleanupVer;
+    }
+
+    /**
+     * @param crdVer Coordinator version.
+     * @param cntr Counter.
+     * @param cleanupVer Cleanup version.
      * @param futId Future ID.
      */
     void init(long futId, long crdVer, long cntr, long cleanupVer) {
@@ -69,6 +82,9 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
         this.cleanupVer = cleanupVer;
     }
 
+    /**
+     * @param txId Transaction counter.
+     */
     void addTx(long txId) {
         if (txs == null)
             txs = new long[4];
@@ -78,6 +94,9 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
         txs[txsCnt++] = txId;
     }
 
+    /**
+     *
+     */
     void resetTransactionsCount() {
         txsCnt = 0;
     }
@@ -140,6 +159,11 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
     }
 
     /** {@inheritDoc} */
+    @Override public boolean initialLoad() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8cf2aad7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 257a127..6ced2f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -86,6 +86,9 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvali
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.dr.GridDrType;
@@ -129,6 +132,14 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     /** Amount of permissions should be available to continue new data processing. */
     private static final int REMAP_SEMAPHORE_PERMISSIONS_COUNT = Integer.MAX_VALUE;
 
+    /** Version which is less than any version generated on coordinator. */
+    private static final MvccCoordinatorVersion ISOLATED_STREAMER_MVCC_VER =
+        new MvccCoordinatorVersionResponse(1L, CacheCoordinatorsProcessor.START_VER, 0L) {
+            @Override public boolean initialLoad() {
+                return true;
+            }
+        };
+
     /** Cache receiver. */
     private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER;
 
@@ -2067,10 +2078,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
                     boolean primary = cctx.affinity().primaryByKey(cctx.localNode(), entry.key(), topVer);
 
-                    // TODO IGNITE-3478 (mvcc version).
                     entry.initialValue(e.getValue(),
                         ver,
-                        null,
+                        ISOLATED_STREAMER_MVCC_VER,
                         ttl,
                         expiryTime,
                         false,

http://git-wip-us.apache.org/repos/asf/ignite/blob/8cf2aad7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 2d6afb4..f28fe2d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.cache.CacheMode;
@@ -2354,6 +2355,37 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testLoadWithStreamer() throws Exception {
+        startGridsMultiThreaded(5);
+
+        client = true;
+
+        startGrid(5);
+
+        Ignite node = ignite(0);
+
+        IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, 64));
+
+        final int KEYS = 10_000;
+
+        Map<Integer, Integer> data = new HashMap<>();
+
+        try (IgniteDataStreamer<Integer, Integer> streamer = node.dataStreamer(cache.getName())) {
+            for (int i = 0; i < KEYS; i++) {
+                streamer.addData(i, i);
+
+                data.put(i, i);
+            }
+        }
+
+        checkCacheData(data, cache.getName());
+
+        checkPutGet(F.asList(cache.getName()));
+    }
+
+    /**
      * @param N Number of object to update in single transaction.
      * @param srvs Number of server nodes.
      * @param clients Number of client nodes.

Reply via email to