Repository: ignite Updated Branches: refs/heads/ignite-2.7 a7480909b -> f89f8e34f
IGNITE-9731 Fixed NPE on concurrent WAL flush - Fixes #4863. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f89f8e34 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f89f8e34 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f89f8e34 Branch: refs/heads/ignite-2.7 Commit: f89f8e34ff81c89a798a3bef6d52daceede55945 Parents: a748090 Author: Anton Kalashnikov <kaa....@yandex.ru> Authored: Fri Sep 28 18:20:00 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Fri Sep 28 18:24:19 2018 +0300 ---------------------------------------------------------------------- .../wal/FileWriteAheadLogManager.java | 10 +- .../db/wal/WalRolloverRecordLoggingTest.java | 156 +++++++++++++++++++ .../IgnitePdsWithIndexingCoreTestSuite.java | 2 + 3 files changed, 165 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f89f8e34/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 5d165fd..31617a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -2354,19 +2354,23 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private abstract static class FileHandle { /** I/O interface for read/write operations with file */ SegmentIO fileIO; + + /** Segment idx corresponded to fileIo*/ + final long segmentIdx; /** - * @param fileIO I/O interface for read/write operations of FileHandle. * + * @param fileIO I/O interface for read/write operations of FileHandle. */ - private FileHandle(SegmentIO fileIO) { + private FileHandle(@NotNull SegmentIO fileIO) { this.fileIO = fileIO; + segmentIdx = fileIO.getSegmentId(); } /** * @return Absolute WAL segment file index (incremental counter). */ public long getSegmentId(){ - return fileIO.getSegmentId(); + return segmentIdx; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f89f8e34/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRolloverRecordLoggingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRolloverRecordLoggingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRolloverRecordLoggingTest.java new file mode 100644 index 0000000..67caf63 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRolloverRecordLoggingTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.StopNodeOrHaltFailureHandler; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_PATH; +import static org.apache.ignite.configuration.WALMode.LOG_ONLY; +/** + * + */ +public class WalRolloverRecordLoggingTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static class RolloverRecord extends CheckpointRecord { + /** */ + private RolloverRecord() { + super(null); + } + + /** {@inheritDoc} */ + @Override public boolean rollOver() { + return true; + } + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(name); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(40 * 1024 * 1024)) + .setWalMode(LOG_ONLY) + .setWalSegmentSize(4 * 1024 * 1024) + .setWalArchivePath(DFLT_WAL_PATH)); + + cfg.setFailureHandler(new StopNodeOrHaltFailureHandler(false, 0)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** */ + public void testAvoidInfinityWaitingOnRolloverOfSegment() throws Exception { + IgniteEx ig = startGrid(0); + + ig.cluster().active(true); + + IgniteCache<Integer, Integer> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME); + + long startTime = U.currentTimeMillis(); + long duration = 5_000; + + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync( + () -> { + ThreadLocalRandom random = ThreadLocalRandom.current(); + + while (U.currentTimeMillis() - startTime < duration) + cache.put(random.nextInt(100_000), random.nextInt(100_000)); + }, + 8, "cache-put-thread"); + + Thread t = new Thread(() -> { + do { + try { + U.sleep(100); + } + catch (IgniteInterruptedCheckedException e) { + // No-op. + } + + ig.context().cache().context().database().wakeupForCheckpoint("test"); + } while (U.currentTimeMillis() - startTime < duration); + }); + + t.start(); + + IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal(); + + IgniteCacheDatabaseSharedManager dbMgr = ig.context().cache().context().database(); + + RolloverRecord rec = new RolloverRecord(); + + do { + try { + dbMgr.checkpointReadLock(); + + try { + walMgr.log(rec); + } + finally { + dbMgr.checkpointReadUnlock(); + } + } + catch (IgniteCheckedException e) { + log.error(e.getMessage(), e); + } + } while (U.currentTimeMillis() - startTime < duration); + + fut.get(); + + t.join(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f89f8e34/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java index 2989ccd..caea388 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalR import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryWithCompactionTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalPathsTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryTxLogicalRecordsTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRolloverRecordLoggingTest; /** * Test suite for tests that cover core PDS features and depend on indexing module. @@ -59,6 +60,7 @@ public class IgnitePdsWithIndexingCoreTestSuite extends TestSuite { suite.addTestSuite(PersistenceDirectoryWarningLoggingTest.class); suite.addTestSuite(WalPathsTest.class); suite.addTestSuite(WalRecoveryTxLogicalRecordsTest.class); + suite.addTestSuite(WalRolloverRecordLoggingTest.class); suite.addTestSuite(IgniteWalRecoveryTest.class); suite.addTestSuite(IgniteWalRecoveryWithCompactionTest.class);