IGNITE-8900 SqlFieldsQuery provides incorrect result when item size exceeds page size.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f1ecbbc8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f1ecbbc8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f1ecbbc8 Branch: refs/heads/ignite-8446 Commit: f1ecbbc87d6a42dad960cda09cbc43992ff286e1 Parents: 458480c Author: ilantukh <[email protected]> Authored: Wed Aug 1 16:31:15 2018 +0300 Committer: Dmitriy Pavlov <[email protected]> Committed: Wed Aug 1 16:31:15 2018 +0300 ---------------------------------------------------------------------- .../delta/DataPageInsertFragmentRecord.java | 2 +- .../cache/IgniteCacheOffheapManagerImpl.java | 3 + .../persistence/freelist/AbstractFreeList.java | 2 +- .../persistence/freelist/CacheFreeListImpl.java | 11 ++ .../persistence/tree/io/AbstractDataPageIO.java | 23 ++- .../processors/cache/BigEntryQueryTest.java | 150 +++++++++++++++++++ .../IgniteBinaryCacheQueryTestSuite.java | 2 + 7 files changed, 177 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f1ecbbc8/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java index 5324d56..2b02bb57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java @@ -57,7 +57,7 @@ public class DataPageInsertFragmentRecord extends PageDeltaRecord { @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException { AbstractDataPageIO io = PageIO.getPageIO(pageAddr); - io.addRowFragment(pageAddr, payload, lastLink, pageMem.pageSize()); + io.addRowFragment(PageIO.getPageId(pageAddr), pageAddr, payload, lastLink, pageMem.pageSize()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f1ecbbc8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 10281d6..5482b3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -1386,6 +1386,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager assert oldRow == null || oldRow.cacheId() == cacheId : oldRow; + if (key.partition() == -1) + key.partition(partId); + DataRow dataRow = new DataRow(key, val, ver, partId, expireTime, cacheId); CacheObjectContext coCtx = cctx.cacheObjectContext(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f1ecbbc8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java index c1a48bb..bcedd8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java @@ -233,7 +233,7 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp // Read last link before the fragment write, because it will be updated there. long lastLink = row.link(); - int payloadSize = io.addRowFragment(pageMem, pageAddr, row, written, rowSize, pageSize()); + int payloadSize = io.addRowFragment(pageMem, pageId, pageAddr, row, written, rowSize, pageSize()); assert payloadSize > 0 : payloadSize; http://git-wip-us.apache.org/repos/asf/ignite/blob/f1ecbbc8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/CacheFreeListImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/CacheFreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/CacheFreeListImpl.java index dc0c92e..4aa9d88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/CacheFreeListImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/CacheFreeListImpl.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence.freelist; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.DataRegion; @@ -26,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractD import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions; import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; +import org.apache.ignite.internal.util.typedef.internal.U; /** * FreeList implementation for cache. @@ -53,6 +55,15 @@ public class CacheFreeListImpl extends AbstractFreeList<CacheDataRow> { } /** {@inheritDoc} */ + @Override public void insertDataRow(CacheDataRow row) throws IgniteCheckedException { + super.insertDataRow(row); + + assert row.key().partition() == PageIdUtils.partId(row.link()) : + "Constructed a link with invalid partition ID [partId=" + row.key().partition() + + ", link=" + U.hexLong(row.link()) + ']'; + } + + /** {@inheritDoc} */ @Override public String toString() { return "FreeList [name=" + name + ']'; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f1ecbbc8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java index ca63f27..50b5779 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java @@ -907,6 +907,7 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO { * Adds maximum possible fragment of the given row to this data page and sets respective link to the row. * * @param pageMem Page memory. + * @param pageId Page ID to use to construct a link. * @param pageAddr Page address. * @param row Data row. * @param written Number of bytes of row size that was already written. @@ -917,18 +918,20 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO { */ public int addRowFragment( PageMemory pageMem, + long pageId, long pageAddr, T row, int written, int rowSize, int pageSize ) throws IgniteCheckedException { - return addRowFragment(pageMem, pageAddr, written, rowSize, row.link(), row, null, pageSize); + return addRowFragment(pageMem, pageId, pageAddr, written, rowSize, row.link(), row, null, pageSize); } /** * Adds this payload as a fragment to this data page. * + * @param pageId Page ID to use to construct a link. * @param pageAddr Page address. * @param payload Payload bytes. * @param lastLink Link to the previous written fragment (link to the tail). @@ -936,18 +939,20 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO { * @throws IgniteCheckedException If failed. */ public void addRowFragment( + long pageId, long pageAddr, byte[] payload, long lastLink, int pageSize ) throws IgniteCheckedException { - addRowFragment(null, pageAddr, 0, 0, lastLink, null, payload, pageSize); + addRowFragment(null, pageId, pageAddr, 0, 0, lastLink, null, payload, pageSize); } /** * Adds maximum possible fragment of the given row to this data page and sets respective link to the row. * * @param pageMem Page memory. + * @param pageId Page ID to use to construct a link. * @param pageAddr Page address. * @param written Number of bytes of row size that was already written. * @param rowSize Row size. @@ -960,6 +965,7 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO { */ private int addRowFragment( PageMemory pageMem, + long pageId, long pageAddr, int written, int rowSize, @@ -1004,24 +1010,13 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO { int itemId = addItem(pageAddr, fullEntrySize, directCnt, indirectCnt, dataOff, pageSize); if (row != null) - setLink(row, pageAddr, itemId); + setLinkByPageId(row, pageId, itemId); return payloadSize; } /** * @param row Row to set link to. - * @param pageAddr Page address. - * @param itemId Item ID. - */ - private void setLink(T row, long pageAddr, int itemId) { - long pageId = getPageId(pageAddr); - - setLinkByPageId(row, pageId, itemId); - } - - /** - * @param row Row to set link to. * @param pageId Page ID. * @param itemId Item ID. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f1ecbbc8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BigEntryQueryTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BigEntryQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BigEntryQueryTest.java new file mode 100644 index 0000000..3b5562a --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BigEntryQueryTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.EventType; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * This is a specific test for IGNITE-8900. + */ +public class BigEntryQueryTest extends GridCommonAbstractTest { + /** */ + public static final String CACHE = "cache"; + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 60_000; + } + + /** + * @throws Exception if failed. + */ + public void testBigEntriesSelect() throws Exception { + startGrids(2); + + Random random = new Random(1); + + Ignition.setClientMode(true); + + Ignite client = startGrid(2); + + int ctr = 0; + + long time0 = System.currentTimeMillis(); + + while ((System.currentTimeMillis() - time0) < 30_000) { + String cacheName = CACHE + ctr++; + + IgniteCache<Long, Value> cache = client.getOrCreateCache(new CacheConfiguration<Long, Value>(cacheName) + .setCacheMode(CacheMode.PARTITIONED) + .setBackups(1) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setIndexedTypes(Long.class, Value.class)); + + cache.putAll(LongStream.range(610026643276160000L, 610026643276170000L).boxed() + .collect(Collectors.toMap(Function.identity(), + t -> Value.of(new byte[(random.nextInt(16)) * 1000])))); + + for (int i = 0; i < 10; i++) { + long start = 610026643276160000L; + long end = start + random.nextInt(10); + + int expectedResultCount = (int)(end - start + 1); + + String sql = String.format( + "SELECT _KEY " + + "FROM %s " + + "WHERE _KEY >= %d AND _KEY <= %d", Value.class.getSimpleName().toLowerCase(), + start, end); + + Set<Long> keySet = new HashSet<>(); + for (long l = start; l < end + 1; l++) + keySet.add(l); + + List<Long> resultKeys; + try (FieldsQueryCursor<List<?>> results = cache.query(new SqlFieldsQuery(sql))) { + resultKeys = new ArrayList<>(); + results.forEach(objects -> resultKeys.add((Long)objects.get(0))); + Collections.sort(resultKeys); + } + + assertEquals(expectedResultCount, resultKeys.size()); + } + cache.destroy(); + } + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration cfg = super.getConfiguration(); + + cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST); + + DataStorageConfiguration sCfg = new DataStorageConfiguration(); + + sCfg.setPageSize(1024 * 8); + + cfg.setDataStorageConfiguration(sCfg); + + return cfg; + } + + /** + * Class containing value to be placed into the cache. + */ + public static class Value implements Serializable { + /** */ + final byte[] data; + + /** + * @param data Data. + */ + public Value(final byte[] data) { + this.data = data; + } + + /** + * @param bytes Bytes. + * @return Value. + */ + public static Value of(final byte[] bytes) { + return new Value(bytes); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f1ecbbc8/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java index 8164fe0..b44ff2d 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java @@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.cache.BinarySerializationQuerySelfT import org.apache.ignite.internal.processors.cache.BinarySerializationQueryWithReflectiveSerializerSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheBinaryObjectsScanSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheBinaryObjectsScanWithEventsSelfTest; +import org.apache.ignite.internal.processors.cache.BigEntryQueryTest; /** * Cache query suite with binary marshaller. @@ -39,6 +40,7 @@ public class IgniteBinaryCacheQueryTestSuite extends TestSuite { suite.addTestSuite(BinarySerializationQueryWithReflectiveSerializerSelfTest.class); suite.addTestSuite(IgniteCacheBinaryObjectsScanSelfTest.class); suite.addTestSuite(IgniteCacheBinaryObjectsScanWithEventsSelfTest.class); + suite.addTestSuite(BigEntryQueryTest.class); //Should be adjusted. Not ready to be used with BinaryMarshaller. //suite.addTestSuite(GridCacheBinarySwapScanQuerySelfTest.class);
