IGNITE-10393: MVCC: Fixed streamer with persistence on. This closes #5497.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c63a60a3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c63a60a3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c63a60a3 Branch: refs/heads/ignite-9720 Commit: c63a60a39d4131861c98a84440ecb8c67b10ba25 Parents: 25c41fa Author: Igor Seliverstov <[email protected]> Authored: Tue Nov 27 10:31:01 2018 +0300 Committer: devozerov <[email protected]> Committed: Tue Nov 27 10:31:01 2018 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 2 +- .../persistence/pagemem/PageMemoryImpl.java | 2 +- ...aStreamProcessorMvccPersistenceSelfTest.java | 28 +++++++++ .../DataStreamProcessorPersistenceSelfTest.java | 28 +++++++++ .../DataStreamProcessorSelfTest.java | 63 +++++++++++++++++++- .../testsuites/IgniteBinaryCacheTestSuite.java | 2 + .../testsuites/IgniteCacheMvccTestSuite.java | 2 + .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + 8 files changed, 126 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 5f4f974..bbdff35 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -3478,7 +3478,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme expireTime, partition(), updateCntr, - mvccVer + mvccVer == null ? MvccUtils.INITIAL_VERSION : mvccVer ))); } else { cctx.shared().wal().log(new DataRecord(new DataEntry( http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index f6aa059..c4b0104 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -53,7 +53,6 @@ import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; -import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; @@ -65,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier; import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; +import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.processors.cache.persistence.freelist.io.PagesListMetaIO; import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccPersistenceSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccPersistenceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccPersistenceSelfTest.java new file mode 100644 index 0000000..9360cab --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccPersistenceSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastreamer; + +/** + * + */ +public class DataStreamProcessorMvccPersistenceSelfTest extends DataStreamProcessorMvccSelfTest { + /** {@inheritDoc} */ + @Override public boolean persistenceEnabled() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorPersistenceSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorPersistenceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorPersistenceSelfTest.java new file mode 100644 index 0000000..7ce4fdd --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorPersistenceSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastreamer; + +/** + * + */ +public class DataStreamProcessorPersistenceSelfTest extends DataStreamProcessorSelfTest { + /** {@inheritDoc} */ + @Override public boolean persistenceEnabled() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java index 877df2e..39f43e3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java @@ -42,9 +42,12 @@ import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cache.store.CacheStoreAdapter; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.IgniteReflectionFactory; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.configuration.WALMode; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; @@ -52,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteClosure; @@ -97,6 +101,13 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { /** */ private TestStore store; + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + if (persistenceEnabled()) + cleanPersistenceDir(); + } + /** {@inheritDoc} */ @Override public void afterTest() throws Exception { super.afterTest(); @@ -104,6 +115,13 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { useCache = false; } + /** + * @return {@code True} if persistent store is enabled for test. + */ + public boolean persistenceEnabled() { + return false; + } + /** {@inheritDoc} */ @SuppressWarnings({"unchecked"}) @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { @@ -141,6 +159,12 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { } cfg.setCacheConfiguration(cc); + + if (persistenceEnabled()) + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(true)) + .setWalMode(WALMode.LOG_ONLY)); } else { cfg.setCacheConfiguration(); @@ -225,6 +249,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { Ignite igniteWithoutCache = startGrid(1); + afterGridStarted(); + final IgniteDataStreamer<Integer, Integer> ldr = igniteWithoutCache.dataStreamer(DEFAULT_CACHE_NAME); ldr.receiver(DataStreamerCacheUpdaters.<Integer, Integer>batchedSorted()); @@ -337,7 +363,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { startGrid(1); startGrid(2); - awaitPartitionMapExchange(); + afterGridStarted(); IgniteCache<Integer, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME); @@ -422,6 +448,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { Ignite g1 = startGrid(1); startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used). + afterGridStarted(); + List<Object> arrays = Arrays.<Object>asList( new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4}, new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8}); @@ -485,6 +513,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { Ignite g1 = grid(idx - 1); + afterGridStarted(); + // Get and configure loader. final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(DEFAULT_CACHE_NAME); @@ -589,6 +619,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { try { Ignite g1 = startGrid(1); + afterGridStarted(); + IgniteDataStreamer<Object, Object> ldr = g1.dataStreamer(DEFAULT_CACHE_NAME); ldr.close(false); @@ -746,6 +778,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { try { Ignite g = startGrid(); + afterGridStarted(); + final IgniteCache<Integer, Integer> c = g.cache(DEFAULT_CACHE_NAME); final IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(DEFAULT_CACHE_NAME); @@ -799,6 +833,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { try { Ignite g = startGrid(); + afterGridStarted(); + IgniteCache<Integer, Integer> c = g.cache(DEFAULT_CACHE_NAME); IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(DEFAULT_CACHE_NAME); @@ -835,6 +871,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { try { Ignite g = startGrid(); + afterGridStarted(); + final CountDownLatch latch = new CountDownLatch(9); g.events().localListen(new IgnitePredicate<Event>() { @@ -891,6 +929,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { startGrid(2); startGrid(3); + afterGridStarted(); + for (int i = 0; i < 1000; i++) storeMap.put(i, i); @@ -940,6 +980,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { } finally { storeMap = null; + + stopAllGrids(); } } @@ -955,6 +997,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { startGrid(2); startGrid(3); + afterGridStarted(); + try (IgniteDataStreamer<String, TestObject> ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) { ldr.allowOverwrite(true); ldr.keepBinary(customKeepBinary()); @@ -988,6 +1032,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { Ignite ignite = startGrid(1); + afterGridStarted(); + final IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME); try (IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) { @@ -1034,6 +1080,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { Ignite client = startGrid(0); + afterGridStarted(); + final IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME); try (IgniteDataStreamer<String, String> ldr = client.dataStreamer(DEFAULT_CACHE_NAME)) { @@ -1100,6 +1148,19 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { } /** + * Activates grid if necessary and wait for partition map exchange. + */ + private void afterGridStarted() throws InterruptedException { + G.allGrids().stream() + .filter(g -> !g.cluster().node().isClient()) + .findAny() + .filter(g -> !g.cluster().active()) + .ifPresent(g -> g.cluster().active(true)); + + awaitPartitionMapExchange(); + } + + /** * */ @SuppressWarnings("PublicInnerClass") http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java index 170bb33..5cfe534 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.binary.distributed.dht.GridCa import org.apache.ignite.internal.processors.cache.binary.distributed.dht.GridCacheAtomicPartitionedOnlyBinaryMultithreadedSelfTest; import org.apache.ignite.internal.processors.cache.binary.distributed.dht.GridCacheBinariesNearPartitionedByteArrayValuesSelfTest; import org.apache.ignite.internal.processors.cache.binary.distributed.dht.GridCacheBinariesPartitionedOnlyByteArrayValuesSelfTest; +import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorPersistenceSelfTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest; /** @@ -51,6 +52,7 @@ public class IgniteBinaryCacheTestSuite extends TestSuite { // Tests below have a special version for Binary Marshaller ignoredTests.add(DataStreamProcessorSelfTest.class); + ignoredTests.add(DataStreamProcessorPersistenceSelfTest.class); ignoredTests.add(GridCacheAffinityRoutingSelfTest.class); ignoredTests.add(IgniteCacheAtomicLocalExpiryPolicyTest.class); ignoredTests.add(GridCacheEntryMemorySizeSelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java index d4b837c..930706d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxFailoverTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccVacuumTest; import org.apache.ignite.internal.processors.cache.mvcc.MvccCachePeekTest; import org.apache.ignite.internal.processors.cache.mvcc.MvccUnsupportedTxModesTest; +import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorMvccPersistenceSelfTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorMvccSelfTest; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; @@ -59,6 +60,7 @@ public class IgniteCacheMvccTestSuite extends TestSuite { suite.addTestSuite(CacheMvccConfigurationValidationTest.class); suite.addTestSuite(DataStreamProcessorMvccSelfTest.class); + suite.addTestSuite(DataStreamProcessorMvccPersistenceSelfTest.class); suite.addTestSuite(CacheMvccOperationChecksTest.class); suite.addTestSuite(CacheMvccRemoteTxOnNearNodeStartTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 52e2ba2..dd03ef3 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -144,6 +144,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCa import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxExceptionSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheEntryProcessorExternalizableFailedTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheEntryProcessorNonSerializableTest; +import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorPersistenceSelfTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerClientReconnectAfterClusterRestartTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerImplSelfTest; @@ -250,6 +251,7 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(GridCacheBalancingStoreSelfTest.class); suite.addTestSuite(GridCacheAffinityApiSelfTest.class); suite.addTestSuite(GridCacheStoreValueBytesSelfTest.class); + GridTestUtils.addTestIfNeeded(suite, DataStreamProcessorPersistenceSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, DataStreamProcessorSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, DataStreamerUpdateAfterLoadTest.class, ignoredTests); suite.addTestSuite(DataStreamerMultiThreadedSelfTest.class);
