Repository: ignite Updated Branches: refs/heads/ignite-5937 e2264b433 -> c28e02d02
ignite-5937 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c28e02d0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c28e02d0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c28e02d0 Branch: refs/heads/ignite-5937 Commit: c28e02d02d28920b9c240776703d8be8701cce98 Parents: e2264b4 Author: sboikov <[email protected]> Authored: Wed Oct 18 14:21:01 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Oct 18 14:21:01 2017 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheOffheapManagerImpl.java | 73 ++++++++++++++------ .../cache/query/GridCacheQueryManager.java | 11 +-- .../processors/cache/tree/MvccCleanupRow.java | 48 +++++++++++++ .../processors/cache/tree/MvccUpdateRow.java | 20 +++--- .../processors/query/GridQueryIndexing.java | 5 +- .../processors/query/GridQueryProcessor.java | 27 ++++++-- ...IgniteClientCacheInitializationFailTest.java | 3 +- .../processors/query/h2/IgniteH2Indexing.java | 5 +- .../query/h2/opt/GridH2KeyValueRowOnheap.java | 13 ++-- .../query/h2/opt/GridH2RowDescriptor.java | 8 ++- .../processors/query/h2/opt/GridH2Table.java | 7 +- .../cache/mvcc/CacheMvccSqlQueriesTest.java | 10 ++- 12 files changed, 171 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c28e02d0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index e04f070..bea3ed7 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore; import org.apache.ignite.internal.processors.cache.tree.CacheDataTree; import org.apache.ignite.internal.processors.cache.tree.DataRow; +import org.apache.ignite.internal.processors.cache.tree.MvccCleanupRow; import org.apache.ignite.internal.processors.cache.tree.MvccKeyMaxVersionBound; import org.apache.ignite.internal.processors.cache.tree.MvccKeyMinVersionBound; import org.apache.ignite.internal.processors.cache.tree.MvccRemoveRow; @@ -1531,6 +1532,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) { assert !primary : updateRow; + + cleanup(cctx, updateRow.cleanupRows(), false); + + return null; } else { rowStore.addRow(updateRow); @@ -1543,7 +1548,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager incrementSize(cctx.cacheId()); } - cleanup(updateRow.cleanupRows(), false); + cleanup(cctx, updateRow.cleanupRows(), false); CacheDataRow oldRow = updateRow.oldRow(); @@ -1553,7 +1558,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager GridCacheQueryManager qryMgr = cctx.queries(); if (qryMgr.enabled()) - qryMgr.store(updateRow, oldRow); + qryMgr.store(updateRow, mvccVer, oldRow); return updateRow.activeTransactions(); } @@ -1600,18 +1605,20 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) { assert !primary : updateRow; - cleanup(updateRow.cleanupRows(), false); + cleanup(cctx, updateRow.cleanupRows(), false); + + return null; } else { if (res 
== MvccUpdateRow.UpdateResult.PREV_NOT_NULL) decrementSize(cacheId); - CacheSearchRow rmvRow = cleanup(updateRow.cleanupRows(), true); + long rmvRowLink = cleanup(cctx, updateRow.cleanupRows(), true); - if (rmvRow == null) + if (rmvRowLink == 0) rowStore.addRow(updateRow); else - updateRow.link(rmvRow.link()); + updateRow.link(rmvRowLink); assert updateRow.link() != 0L; @@ -1620,6 +1627,19 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager assert !old; } + CacheDataRow oldRow = updateRow.oldRow(); + + if (oldRow != null) { + assert oldRow.link() != 0 : oldRow; + + oldRow.key(key); + + GridCacheQueryManager qryMgr = cctx.queries(); + + if (qryMgr.enabled()) + qryMgr.remove(key, oldRow, mvccVer); + } + return updateRow.activeTransactions(); } finally { @@ -1663,34 +1683,45 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** * @param cleanupRows Rows to cleanup. * @param findRmv {@code True} if need keep removed row entry. - * @return Removed row entry if found. + * @return Removed row link or {@code 0} if not found. * @throws IgniteCheckedException If failed. 
*/ - @Nullable private CacheSearchRow cleanup(@Nullable List<CacheSearchRow> cleanupRows, boolean findRmv) + private long cleanup(GridCacheContext cctx, @Nullable List<MvccCleanupRow> cleanupRows, boolean findRmv) throws IgniteCheckedException { - CacheSearchRow rmvRow = null; + long rmvRowLink = 0; if (cleanupRows != null) { + GridCacheQueryManager qryMgr = cctx.queries(); + for (int i = 0; i < cleanupRows.size(); i++) { - CacheSearchRow oldRow = cleanupRows.get(i); + MvccCleanupRow cleanupRow = cleanupRows.get(i); - assert oldRow.link() != 0L : oldRow; + assert cleanupRow.link() != 0 : cleanupRow; - boolean rmvd = dataTree.removex(oldRow); + if (qryMgr.enabled() && !versionForRemovedValue(cleanupRow.mvccCoordinatorVersion())) { + CacheDataRow oldRow = dataTree.remove(cleanupRow); - assert rmvd; + assert oldRow != null : cleanupRow; + + qryMgr.remove(oldRow.key(), oldRow, null); + } + else { + boolean rmvd = dataTree.removex(cleanupRow); + + assert rmvd; + } if (findRmv && - rmvRow == null && - versionForRemovedValue(oldRow.mvccCoordinatorVersion())) { - rmvRow = oldRow; + rmvRowLink == 0 && + versionForRemovedValue(cleanupRow.mvccCoordinatorVersion())) { + rmvRowLink = cleanupRow.link(); } else - rowStore.removeRow(oldRow.link()); + rowStore.removeRow(cleanupRow.link()); } } - return rmvRow; + return rmvRowLink; } /** {@inheritDoc} */ @@ -1770,7 +1801,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; if (qryMgr.enabled()) - qryMgr.store(newRow, oldRow); + qryMgr.store(newRow, null, oldRow); if (oldRow != null) { assert oldRow.link() != 0 : oldRow; @@ -1802,7 +1833,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager GridCacheQueryManager qryMgr = cctx.queries(); - qryMgr.store(row, null); + qryMgr.store(row, null, null); // TODO IGNITE-3478. 
} } @@ -1846,7 +1877,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager GridCacheQueryManager qryMgr = cctx.queries(); if (qryMgr.enabled()) - qryMgr.remove(key, oldRow); + qryMgr.remove(key, oldRow, null); if (oldRow != null) rowStore.removeRow(oldRow.link()); http://git-wip-us.apache.org/repos/asf/ignite/blob/c28e02d0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 59b7613..fb5728a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -382,10 +382,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** * @param newRow New row. + * @param mvccVer Mvcc version for update. * @param prevRow Previous row. * @throws IgniteCheckedException In case of error. */ - public void store(CacheDataRow newRow, @Nullable CacheDataRow prevRow) + public void store(CacheDataRow newRow, @Nullable MvccCoordinatorVersion mvccVer, @Nullable CacheDataRow prevRow) throws IgniteCheckedException { assert enabled(); assert newRow != null && newRow.value() != null && newRow.link() != 0 : newRow; @@ -405,7 +406,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } if (qryProcEnabled) - qryProc.store(cctx, newRow, prevRow); + qryProc.store(cctx, newRow, mvccVer, prevRow); } finally { invalidateResultCache(); @@ -417,9 +418,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** * @param key Key. * @param prevRow Previous row. 
+ * @param newVer Mvcc version for remove operation. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow) throws IgniteCheckedException { + public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow, @Nullable MvccCoordinatorVersion newVer) + throws IgniteCheckedException { if (!QueryUtils.isEnabled(cctx.config())) return; // No-op. @@ -435,7 +438,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte // val may be null if we have no previous value. We should not call processor in this case. if (qryProcEnabled && prevRow != null) - qryProc.remove(cctx, prevRow); + qryProc.remove(cctx, prevRow, newVer); } finally { invalidateResultCache(); http://git-wip-us.apache.org/repos/asf/ignite/blob/c28e02d0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java new file mode 100644 index 0000000..3093e0d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.tree; + +import org.apache.ignite.internal.processors.cache.KeyCacheObject; + +/** + * + */ +public class MvccCleanupRow extends MvccSearchRow { + /** */ + private final long link; + + /** + * @param cacheId + * @param key + * @param crdVer + * @param mvccCntr + * @param link + */ + MvccCleanupRow(int cacheId, KeyCacheObject key, long crdVer, long mvccCntr, long link) { + super(cacheId, key, crdVer, mvccCntr); + + assert link != 0L; + + this.link = link; + } + + /** {@inheritDoc} */ + @Override public long link() { + return link; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c28e02d0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java index fb2a6cf..90de16f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java @@ -22,7 +22,6 @@ import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; @@ -34,6 +33,7 @@ import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.assertMvccVersionValid; import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion; import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue; @@ -51,7 +51,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C private GridLongList activeTxs; /** */ - private List<CacheSearchRow> cleanupRows; + private List<MvccCleanupRow> cleanupRows; /** */ private final MvccCoordinatorVersion mvccVer; @@ -109,7 +109,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C /** * @return Rows which are safe to cleanup. */ - public List<CacheSearchRow> cleanupRows() { + public List<MvccCleanupRow> cleanupRows() { return cleanupRows; } @@ -175,6 +175,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C if (needOld) oldRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY); } + res = versionForRemovedValue(rowCrdVerMasked) ? 
UpdateResult.PREV_NULL : UpdateResult.PREV_NOT_NULL; } @@ -199,26 +200,25 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C int cmp; + long rowCntr = rowIo.getMvccCounter(pageAddr, idx); + if (crdVer == rowCrdVer) - cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx)); + cmp = Long.compare(mvccVer.cleanupVersion(), rowCntr); else cmp = 1; if (cmp >= 0) { // Do not cleanup oldest version. if (canCleanup) { - CacheSearchRow row = io.getLookupRow(tree, pageAddr, idx); - - assert row.link() != 0 && row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row; + assert assertMvccVersionValid(rowCrdVer, rowCntr); // Should not be possible to cleanup active tx. - assert rowCrdVer != crdVer - || !mvccVer.activeTransactions().contains(row.mvccCounter()); + assert rowCrdVer != crdVer || !mvccVer.activeTransactions().contains(rowCntr); if (cleanupRows == null) cleanupRows = new ArrayList<>(); - cleanupRows.add(row); + cleanupRows.add(new MvccCleanupRow(cacheId, key, rowCrdVerMasked, rowCntr, rowIo.getLink(pageAddr, idx))); } else canCleanup = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/c28e02d0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index 265b9bc..5bd4bc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -31,6 +31,7 @@ import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import 
org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; import org.apache.ignite.internal.util.GridSpinBusyLock; @@ -217,13 +218,13 @@ public interface GridQueryIndexing { * @param cctx Cache context. * @param type Type descriptor. * @param row New row. - * @param mvccNewRow New inserted mvcc row for the same key. + * @param newVer Version of new mvcc value inserted for the same key. * @throws IgniteCheckedException If failed. */ public void store(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow row, - @Nullable CacheDataRow mvccNewRow) throws IgniteCheckedException; + @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException; /** * Removes index entry by key. http://git-wip-us.apache.org/repos/asf/ignite/blob/c28e02d0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index d132539..74157d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.StoredCacheData; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import 
org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; @@ -1696,14 +1697,19 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** * @param cctx Cache context. * @param newRow New row. + * @param mvccVer Mvcc version for update. * @param prevRow Previous row. * @throws IgniteCheckedException In case of error. */ @SuppressWarnings({"unchecked", "ConstantConditions"}) - public void store(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDataRow prevRow) - throws IgniteCheckedException { + public void store(GridCacheContext cctx, + CacheDataRow newRow, + @Nullable MvccCoordinatorVersion mvccVer, + @Nullable CacheDataRow prevRow) throws IgniteCheckedException + { assert cctx != null; assert newRow != null; + assert !cctx.mvccEnabled() || mvccVer != null; KeyCacheObject key = newRow.key(); @@ -1745,7 +1751,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { idx.store(cctx, desc, newRow, null); if (prevRow != null) - idx.store(cctx, desc, prevRow, newRow); + idx.store(cctx, desc, prevRow, mvccVer); } else idx.store(cctx, desc, newRow, null); @@ -2311,12 +2317,14 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** * @param cctx Cache context. - * @param val Row. + * @param val Value removed from cache. + * @param newVer Mvcc version for remove operation. * @throws IgniteCheckedException Thrown in case of any errors. 
*/ - public void remove(GridCacheContext cctx, CacheDataRow val) + public void remove(GridCacheContext cctx, CacheDataRow val, @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException { assert val != null; + assert !cctx.mvccEnabled() || newVer == null; if (log.isDebugEnabled()) log.debug("Remove [cacheName=" + cctx.name() + ", key=" + val.key()+ ", val=" + val.value() + "]"); @@ -2337,7 +2345,14 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (desc == null) return; - idx.remove(cctx, desc, val); + if (cctx.mvccEnabled()) { + if (newVer != null) + idx.store(cctx, desc, val, newVer); // Set info about more recent version for previous record. + else + idx.remove(cctx, desc, val); + } + else + idx.remove(cctx, desc, val); } finally { busyLock.leaveBusy(); http://git-wip-us.apache.org/repos/asf/ignite/blob/c28e02d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java index ac12407..d77fb81 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java @@ -43,6 +43,7 @@ import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import 
org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryIndexing; @@ -311,7 +312,7 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT /** {@inheritDoc} */ @Override public void store(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow row, - @Nullable CacheDataRow mvccNewRow) throws IgniteCheckedException { + @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/c28e02d0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index f4bc1f2..43700e1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -71,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.query.CacheQueryPartitionInfo; @@ -539,7 +540,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public void store(GridCacheContext 
cctx, GridQueryTypeDescriptor type, CacheDataRow row, @Nullable CacheDataRow mvccNewRow) + @Override public void store(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow row, @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException { String cacheName = cctx.name(); @@ -548,7 +549,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (tbl == null) return; // Type was rejected. - tbl.table().update(row, mvccNewRow, false); + tbl.table().update(row, newVer, false); if (tbl.luceneIndex() != null) { long expireTime = row.expireTime(); http://git-wip-us.apache.org/repos/asf/ignite/blob/c28e02d0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java index 54b84da..ba12dd2 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.h2.opt; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.util.typedef.internal.SB; @@ -58,21 +59,21 @@ public class GridH2KeyValueRowOnheap extends GridH2Row { private Value ver; /** */ - private final CacheDataRow mvccNewRow; + private final 
MvccCoordinatorVersion newVer; /** * Constructor. * * @param desc Row descriptor. * @param row Row. - * @param mvccNewRow New inserted mvcc row for the same key. + * @param newVer Version of new mvcc value inserted for the same key. * @param keyType Key type. * @param valType Value type. * @throws IgniteCheckedException If failed. */ public GridH2KeyValueRowOnheap(GridH2RowDescriptor desc, CacheDataRow row, - CacheDataRow mvccNewRow, + MvccCoordinatorVersion newVer, int keyType, int valType) throws IgniteCheckedException { super(row); @@ -87,17 +88,17 @@ public class GridH2KeyValueRowOnheap extends GridH2Row { if (row.version() != null) this.ver = desc.wrap(row.version(), Value.JAVA_OBJECT); - this.mvccNewRow = mvccNewRow; + this.newVer = newVer; } /** {@inheritDoc} */ @Override public long newMvccCoordinatorVersion() { - return mvccNewRow != null ? mvccNewRow.mvccCoordinatorVersion() : 0; + return newVer != null ? newVer.coordinatorVersion() : 0; } /** {@inheritDoc} */ @Override public long newMvccCounter() { - return mvccNewRow != null ? mvccNewRow.mvccCounter() : CacheCoordinatorsProcessor.COUNTER_NA; + return newVer != null ? 
newVer.counter(): CacheCoordinatorsProcessor.COUNTER_NA; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/c28e02d0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java index b42e2d8..ad91deb 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java @@ -29,6 +29,7 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.query.GridQueryProperty; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; @@ -274,20 +275,21 @@ public class GridH2RowDescriptor { * Creates new row. * * @param dataRow Data row. + * @param newVer Version of new mvcc value inserted for the same key. * @return Row. * @throws IgniteCheckedException If failed. */ - public GridH2Row createRow(CacheDataRow dataRow, @Nullable CacheDataRow mvccNewRow) throws IgniteCheckedException { + public GridH2Row createRow(CacheDataRow dataRow, @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException { GridH2Row row; try { if (dataRow.value() == null) { // Only can happen for remove operation, can create simple search row. 
- assert mvccNewRow == null; + assert newVer == null; row = new GridH2KeyRowOnheap(dataRow, wrap(dataRow.key(), keyType)); } else - row = new GridH2KeyValueRowOnheap(this, dataRow, mvccNewRow, keyType, valType); + row = new GridH2KeyValueRowOnheap(this, dataRow, newVer, keyType, valType); } catch (ClassCastException e) { throw new IgniteCheckedException("Failed to convert key to SQL type. " + http://git-wip-us.apache.org/repos/asf/ignite/blob/c28e02d0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java index 81a7ed2..891dc32 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java @@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.query.QueryTable; import org.apache.ignite.internal.processors.query.IgniteSQLException; @@ -389,16 +390,16 @@ public class GridH2Table extends TableBase { * otherwise value and expiration time will be updated or new row will be added. * * @param row Row. - * @param mvccNewRow New inserted mvcc row for the same key. + * @param newVer Version of new mvcc value inserted for the same key. 
* @param rmv If {@code true} then remove, else update row. * @return {@code true} If operation succeeded. * @throws IgniteCheckedException If failed. */ - public boolean update(CacheDataRow row, @Nullable CacheDataRow mvccNewRow, boolean rmv) + public boolean update(CacheDataRow row, @Nullable MvccCoordinatorVersion newVer, boolean rmv) throws IgniteCheckedException { assert desc != null; - GridH2Row h2Row = desc.createRow(row, mvccNewRow); + GridH2Row h2Row = desc.createRow(row, newVer); if (rmv) return doUpdate(h2Row, true); http://git-wip-us.apache.org/repos/asf/ignite/blob/c28e02d0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java index 01c752d..66ae606 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java @@ -68,9 +68,17 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { SqlQuery<Integer, MvccTestSqlIndexValue> qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "_key = 1"); - List<IgniteCache.Entry<Integer, MvccTestSqlIndexValue>> res = cache.query(qry).getAll(); + List<IgniteCache.Entry<Integer, MvccTestSqlIndexValue>> res; + + res = cache.query(qry).getAll(); assertEquals(1, res.size()); + + cache.remove(1); + + res = cache.query(qry).getAll(); + + assertEquals(0, res.size()); } /**
