This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch gg-18540 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit d074c3fc9d1b737b8115d47a3c0e958b3312db52 Author: Slava Koptilin <[email protected]> AuthorDate: Fri Apr 26 13:06:14 2019 +0300 GG-17388 Fixed flaky test IgnitePdsStartWIthEmptyArchive --- .../db/IgnitePdsStartWIthEmptyArchive.java | 95 ++++++++++++++-------- 1 file changed, 60 insertions(+), 35 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsStartWIthEmptyArchive.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsStartWIthEmptyArchive.java index b9a883e..f96233d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsStartWIthEmptyArchive.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsStartWIthEmptyArchive.java @@ -18,20 +18,26 @@ package org.apache.ignite.internal.processors.cache.persistence.db; import java.io.File; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.IgniteException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; import org.apache.ignite.events.WalSegmentArchivedEvent; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware; import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileWriteHandle; -import org.apache.ignite.internal.util.future.CountDownFuture; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -43,6 +49,9 @@ import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWr * */ public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest { + /** Mapping of WAL segment idx to WalSegmentArchivedEvent. */ + private final Map<Long, WalSegmentArchivedEvent> evts = new ConcurrentHashMap<>(); + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -63,9 +72,27 @@ public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest { ) ); + Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>(); + + lsnrs.put((e) -> { + WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e; + + log.info("EVT_WAL_SEGMENT_ARCHIVED: " + archComplEvt.getAbsWalSegmentIdx()); + + evts.put(archComplEvt.getAbsWalSegmentIdx(), archComplEvt); + + return true; + }, new int[] {EVT_WAL_SEGMENT_ARCHIVED}); + + cfg.setLocalEventListeners(lsnrs); + return cfg; } + /** + * Executes initial steps before test execution. + * @throws Exception If failed. + */ @Before public void before() throws Exception { stopAllGrids(); @@ -73,6 +100,19 @@ public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest { cleanPersistenceDir(); } + /** + * Stops all nodes and cleans work dir after a test. + */ + @After + public void cleanup() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** + * @throws Exception If failed. + */ @Test public void test() throws Exception { IgniteEx ig = startGrid(0); @@ -85,9 +125,8 @@ public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest { try (IgniteDataStreamer<Integer, byte[]> st = ig.dataStreamer(DEFAULT_CACHE_NAME)) { int entries = 1000; - for (int i = 0; i < entries; i++) { + for (int i = 0; i < entries; i++) st.addData(i, new byte[1024 * 1024]); - } } File archiveDir = U.field(walMgr, "walArchiveDir"); @@ -96,7 +135,7 @@ public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest { SegmentAware beforeSaw = U.field(walMgr, "segmentAware"); - long beforeLastArchivedAbsoluteIndex = beforeSaw.lastArchivedAbsoluteIndex(); + long beforeLastArchivedAbsoluteIdx = beforeSaw.lastArchivedAbsoluteIndex(); FileWriteHandle fhBefore = U.field(walMgr, "currHnd"); @@ -114,6 +153,8 @@ public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest { Assert.assertEquals(0, archiveDir.listFiles().length); + evts.clear(); + // Restart grid again after archive was removed. ig = startGrid(0); @@ -126,10 +167,10 @@ public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest { int segments = ig.configuration().getDataStorageConfiguration().getWalSegments(); Assert.assertTrue( - "lastArchivedBeforeIdx=" + beforeLastArchivedAbsoluteIndex + + "lastArchivedBeforeIdx=" + beforeLastArchivedAbsoluteIdx + ", lastArchivedAfterIdx=" + afterLastArchivedAbsoluteIndex + ", segments=" + segments, afterLastArchivedAbsoluteIndex >= - (beforeLastArchivedAbsoluteIndex - segments)); + (beforeLastArchivedAbsoluteIdx - segments)); ig.cluster().active(true); @@ -140,38 +181,22 @@ public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest { long idxAfter = fhAfter.getSegmentId(); Assert.assertEquals(idxBefore, idxAfter); - Assert.assertTrue(idxAfter >= beforeLastArchivedAbsoluteIndex); + Assert.assertTrue(idxAfter >= beforeLastArchivedAbsoluteIdx); - // Future for await all current available semgment will be archived. - CountDownFuture awaitAchviedSegmentsLatch = new CountDownFuture( - // One is a last archived, secod is a current write segment. - (int)(idxAfter - afterLastArchivedAbsoluteIndex - 2) - ); - - log.info("currentIdx=" + idxAfter + ", lastArchivedBeforeIdx=" + beforeLastArchivedAbsoluteIndex + + log.info("currentIdx=" + idxAfter + ", lastArchivedBeforeIdx=" + beforeLastArchivedAbsoluteIdx + ", lastArchivedAfteridx=" + afterLastArchivedAbsoluteIndex + ", segments=" + segments); - ig.events().localListen(e -> { - WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e; - - log.info("EVT_WAL_SEGMENT_ARCHIVED:" + archComplEvt.getAbsWalSegmentIdx()); - - if (archComplEvt.getAbsWalSegmentIdx() > afterLastArchivedAbsoluteIndex){ - awaitAchviedSegmentsLatch.onDone(); - - return true; - } + // One is a last archived, secod is a current write segment. + final long awaitAchviedSegments = idxAfter - afterLastArchivedAbsoluteIndex - 2; - if (archComplEvt.getAbsWalSegmentIdx() < afterLastArchivedAbsoluteIndex){ - awaitAchviedSegmentsLatch.onDone(new IgniteException("Unexected segment for archivation. idx=" - + archComplEvt.getAbsWalSegmentIdx())); - - return false; - } - - return true; - }, EVT_WAL_SEGMENT_ARCHIVED); + // Await all current available semgment will be archived. + assertTrue(GridTestUtils.waitForCondition( + new GridAbsPredicate() { + @Override public boolean apply() { + long cut = evts.keySet().stream().filter(e -> e > afterLastArchivedAbsoluteIndex).count(); - awaitAchviedSegmentsLatch.get(); + return cut >= awaitAchviedSegments; + } + }, 10_000)); } }
