Repository: ignite
Updated Branches:
  refs/heads/ignite-6149 7441fe30c -> 03eec6043


ignite-6149


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/03eec604
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/03eec604
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/03eec604

Branch: refs/heads/ignite-6149
Commit: 03eec60439f1c982a6eeadd786ff8075fc591f1d
Parents: 7441fe3
Author: sboikov <[email protected]>
Authored: Fri Sep 15 16:08:08 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri Sep 15 16:08:08 2017 +0300

----------------------------------------------------------------------
 .../cache/IgniteCacheOffheapManagerImpl.java    |  7 +--
 .../mvcc/CacheCoordinatorsSharedManager.java    | 30 +++++------
 .../cache/mvcc/MvccCoordinatorVersion.java      |  3 +-
 .../mvcc/MvccCoordinatorVersionResponse.java    | 53 ++++++++++++++++----
 .../processors/cache/mvcc/MvccLongList.java     | 29 +++++++++++
 5 files changed, 91 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/03eec604/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 7f3d3a7..ea74f3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
@@ -1369,7 +1370,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
                 assert !old;
 
-                GridLongList activeTxs = mvccVer.activeTransactions();
+                MvccLongList activeTxs = mvccVer.activeTransactions();
 
                 // TODO IGNITE-3484: need special method.
                 GridCursor<CacheDataRow> cur = dataTree.find(
@@ -1658,7 +1659,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
             CacheDataRow row = null;
 
-            GridLongList txs = ver.activeTransactions();
+            MvccLongList txs = ver.activeTransactions();
 
             while (cur.next()) {
                 CacheDataRow row0 = cur.get();
@@ -1728,7 +1729,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                             || row.mvccCounter() > ver.counter())
                             continue;
 
-                        GridLongList txs = ver.activeTransactions();
+                        MvccLongList txs = ver.activeTransactions();
 
                         if (txs != null && row.mvccCoordinatorVersion() == ver.coordinatorVersion() && txs.contains(row.mvccCounter()))
                             continue;

http://git-wip-us.apache.org/repos/asf/ignite/blob/03eec604/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index 8b70d3e..0d3029a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -326,7 +326,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         MvccCoordinatorVersionResponse res = assignTxCounter(msg.txId(), msg.futureId());
 
         if (STAT_CNTRS)
-            statCntrs[0].update(res.activeTransactions());
+            statCntrs[0].update(res.size());
 
         try {
             cctx.gridIO().sendToGridTopic(node,
@@ -468,14 +468,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         long nextCtr = mvccCntr.incrementAndGet();
 
         // TODO IGNITE-3478 sorted? + change GridLongList.writeTo?
-        GridLongList txs = null;
+        MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse();
 
-        for (Long txVer : activeTxs.values()) {
-            if (txs == null)
-                txs = new GridLongList();
-
-            txs.add(txVer);
-        }
+        for (Long txVer : activeTxs.values())
+            res.addTx(txVer);
 
         Object old = activeTxs.put(txId, nextCtr);
 
@@ -488,7 +484,9 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
                 cleanupVer = qryVer - 1;
         }
 
-        return new MvccCoordinatorVersionResponse(futId, crdVer, nextCtr, txs, cleanupVer);
+        res.init(futId, crdVer, nextCtr, cleanupVer);
+
+        return res;
     }
 
     /**
@@ -511,14 +509,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
 
         Long mvccCntr = committedCntr.get();
 
-        GridLongList txs = null;
+        MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse();
 
-        for (Long txVer : activeTxs.values()) {
-            if (txs == null)
-                txs = new GridLongList();
-
-            txs.add(txVer);
-        }
+        for (Long txVer : activeTxs.values())
+            res.addTx(txVer);
 
         Integer queries = activeQueries.get(mvccCntr);
 
@@ -527,7 +521,9 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         else
             activeQueries.put(mvccCntr, 1);
 
-        return new MvccCoordinatorVersionResponse(futId, crdVer, mvccCntr, txs, COUNTER_NA);
+        res.init(futId, crdVer, mvccCntr, COUNTER_NA);
+
+        return res;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/03eec604/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
index eb0768d..eef3587 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
-import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
@@ -27,7 +26,7 @@ public interface MvccCoordinatorVersion extends Message {
     /**
      * @return Active transactions.
      */
-    public GridLongList activeTransactions();
+    public MvccLongList activeTransactions();
 
     /**
      * @return Coordinator version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/03eec604/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
index 07f8cf3..e218945 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
@@ -18,8 +18,9 @@
 package org.apache.ignite.internal.processors.cache.mvcc;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -27,7 +28,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  *
  */
-public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, MvccCoordinatorVersion {
+public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, MvccCoordinatorVersion, MvccLongList {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -41,7 +42,11 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
     private long cntr;
 
     /** */
-    private GridLongList txs; // TODO IGNITE-3478 (do not send on backups?)
+    @GridDirectTransient
+    private int txsCnt;
+
+    /** */
+    private long[] txs; // TODO IGNITE-3478 (do not send on backups?)
 
     /** */
     private long cleanupVer;
@@ -57,14 +62,42 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
      * @param cntr Counter.
      * @param futId Future ID.
      */
-    public MvccCoordinatorVersionResponse(long futId, long crdVer, long cntr, GridLongList txs, long cleanupVer) {
+    void init(long futId, long crdVer, long cntr, long cleanupVer) {
         this.futId = futId;
         this.crdVer = crdVer;
         this.cntr = cntr;
-        this.txs = txs;
         this.cleanupVer = cleanupVer;
     }
 
+    void addTx(long txId) {
+        if (txs == null)
+            txs = new long[4];
+        else if (txs.length == txsCnt)
+            txs = Arrays.copyOf(txs, txs.length << 1);
+
+        txs[txsCnt++] = txId;
+    }
+
+    @Override
+    public int size() {
+        return txsCnt;
+    }
+
+    @Override
+    public long get(int i) {
+        return txs[i];
+    }
+
+    @Override
+    public boolean contains(long val) {
+        for (int i = 0; i < txsCnt; i++) {
+            if (txs[i] == val)
+                return true;
+        }
+
+        return false;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean waitForCoordinatorInit() {
         return false;
@@ -93,8 +126,8 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
     }
 
     /** {@inheritDoc} */
-    @Override public GridLongList activeTransactions() {
-        return txs;
+    @Override public MvccLongList activeTransactions() {
+        return this;
     }
 
     /** {@inheritDoc} */
@@ -139,7 +172,7 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMessage("txs", txs))
+                if (!writer.writeLongArray("txs", txs))
                     return false;
 
                 writer.incrementState();
@@ -190,11 +223,13 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
                 reader.incrementState();
 
             case 4:
-                txs = reader.readMessage("txs");
+                txs = reader.readLongArray("txs");
 
                 if (!reader.isLastRead())
                     return false;
 
+                txsCnt = txs != null ? txs.length : 0;
+
                 reader.incrementState();
 
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/03eec604/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccLongList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccLongList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccLongList.java
new file mode 100644
index 0000000..8b580ed
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccLongList.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+/**
+ *
+ */
+public interface MvccLongList {
+    public int size();
+
+    public long get(int i);
+
+    public boolean contains(long val);
+}

Reply via email to