Repository: ignite Updated Branches: refs/heads/ignite-6149 7441fe30c -> 03eec6043
ignite-6149 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/03eec604 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/03eec604 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/03eec604 Branch: refs/heads/ignite-6149 Commit: 03eec60439f1c982a6eeadd786ff8075fc591f1d Parents: 7441fe3 Author: sboikov <[email protected]> Authored: Fri Sep 15 16:08:08 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Sep 15 16:08:08 2017 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheOffheapManagerImpl.java | 7 +-- .../mvcc/CacheCoordinatorsSharedManager.java | 30 +++++------ .../cache/mvcc/MvccCoordinatorVersion.java | 3 +- .../mvcc/MvccCoordinatorVersionResponse.java | 53 ++++++++++++++++---- .../processors/cache/mvcc/MvccLongList.java | 29 +++++++++++ 5 files changed, 91 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/03eec604/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 7f3d3a7..ea74f3c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; @@ -1369,7 +1370,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager assert !old; - GridLongList activeTxs = mvccVer.activeTransactions(); + MvccLongList activeTxs = mvccVer.activeTransactions(); // TODO IGNITE-3484: need special method. GridCursor<CacheDataRow> cur = dataTree.find( @@ -1658,7 +1659,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager CacheDataRow row = null; - GridLongList txs = ver.activeTransactions(); + MvccLongList txs = ver.activeTransactions(); while (cur.next()) { CacheDataRow row0 = cur.get(); @@ -1728,7 +1729,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager || row.mvccCounter() > ver.counter()) continue; - GridLongList txs = ver.activeTransactions(); + MvccLongList txs = ver.activeTransactions(); if (txs != null && row.mvccCoordinatorVersion() == ver.coordinatorVersion() && txs.contains(row.mvccCounter())) continue; http://git-wip-us.apache.org/repos/asf/ignite/blob/03eec604/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java index 8b70d3e..0d3029a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java @@ -326,7 +326,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager MvccCoordinatorVersionResponse res = assignTxCounter(msg.txId(), msg.futureId()); if (STAT_CNTRS) - statCntrs[0].update(res.activeTransactions()); + statCntrs[0].update(res.size()); try { cctx.gridIO().sendToGridTopic(node, @@ -468,14 +468,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager long nextCtr = mvccCntr.incrementAndGet(); // TODO IGNITE-3478 sorted? + change GridLongList.writeTo? - GridLongList txs = null; + MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse(); - for (Long txVer : activeTxs.values()) { - if (txs == null) - txs = new GridLongList(); - - txs.add(txVer); - } + for (Long txVer : activeTxs.values()) + res.addTx(txVer); Object old = activeTxs.put(txId, nextCtr); @@ -488,7 +484,9 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager cleanupVer = qryVer - 1; } - return new MvccCoordinatorVersionResponse(futId, crdVer, nextCtr, txs, cleanupVer); + res.init(futId, crdVer, nextCtr, cleanupVer); + + return res; } /** @@ -511,14 +509,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager Long mvccCntr = committedCntr.get(); - GridLongList txs = null; + MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse(); - for (Long txVer : activeTxs.values()) { - if (txs == null) - txs = new GridLongList(); - - txs.add(txVer); - } + for (Long txVer : activeTxs.values()) + res.addTx(txVer); Integer queries = activeQueries.get(mvccCntr); @@ -527,7 +521,9 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager else activeQueries.put(mvccCntr, 1); - return new MvccCoordinatorVersionResponse(futId, crdVer, mvccCntr, txs, COUNTER_NA); + res.init(futId, crdVer, mvccCntr, COUNTER_NA); + + return res; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/03eec604/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java index eb0768d..eef3587 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.mvcc; -import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.plugin.extensions.communication.Message; /** @@ -27,7 +26,7 @@ public interface MvccCoordinatorVersion extends Message { /** * @return Active transactions. */ - public GridLongList activeTransactions(); + public MvccLongList activeTransactions(); /** * @return Coordinator version. http://git-wip-us.apache.org/repos/asf/ignite/blob/03eec604/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java index 07f8cf3..e218945 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java @@ -18,8 +18,9 @@ package org.apache.ignite.internal.processors.cache.mvcc; import java.nio.ByteBuffer; +import java.util.Arrays; +import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; -import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -27,7 +28,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * */ -public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, MvccCoordinatorVersion { +public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, MvccCoordinatorVersion, MvccLongList { /** */ private static final long serialVersionUID = 0L; @@ -41,7 +42,11 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M private long cntr; /** */ - private GridLongList txs; // TODO IGNITE-3478 (do not send on backups?) + @GridDirectTransient + private int txsCnt; + + /** */ + private long[] txs; // TODO IGNITE-3478 (do not send on backups?) /** */ private long cleanupVer; @@ -57,14 +62,42 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M * @param cntr Counter. * @param futId Future ID. */ - public MvccCoordinatorVersionResponse(long futId, long crdVer, long cntr, GridLongList txs, long cleanupVer) { + void init(long futId, long crdVer, long cntr, long cleanupVer) { this.futId = futId; this.crdVer = crdVer; this.cntr = cntr; - this.txs = txs; this.cleanupVer = cleanupVer; } + void addTx(long txId) { + if (txs == null) + txs = new long[4]; + else if (txs.length == txsCnt) + txs = Arrays.copyOf(txs, txs.length << 1); + + txs[txsCnt++] = txId; + } + + @Override + public int size() { + return txsCnt; + } + + @Override + public long get(int i) { + return txs[i]; + } + + @Override + public boolean contains(long val) { + for (int i = 0; i < txsCnt; i++) { + if (txs[i] == val) + return true; + } + + return false; + } + /** {@inheritDoc} */ @Override public boolean waitForCoordinatorInit() { return false; @@ -93,8 +126,8 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M } /** {@inheritDoc} */ - @Override public GridLongList activeTransactions() { - return txs; + @Override public MvccLongList activeTransactions() { + return this; } /** {@inheritDoc} */ @@ -139,7 +172,7 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M writer.incrementState(); case 4: - if (!writer.writeMessage("txs", txs)) + if (!writer.writeLongArray("txs", txs)) return false; writer.incrementState(); @@ -190,11 +223,13 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M reader.incrementState(); case 4: - txs = reader.readMessage("txs"); + txs = reader.readLongArray("txs"); if (!reader.isLastRead()) return false; + txsCnt = txs != null ? txs.length : 0; + reader.incrementState(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/03eec604/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccLongList.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccLongList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccLongList.java new file mode 100644 index 0000000..8b580ed --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccLongList.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +/** + * + */ +public interface MvccLongList { + public int size(); + + public long get(int i); + + public boolean contains(long val); +}
