[
https://issues.apache.org/jira/browse/IGNITE-12370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Pavel Pereslegin updated IGNITE-12370:
--------------------------------------
Description:
Currently, {{FileWriteAheadLogManager#hasIndex}} first checks that the WAL
segment *exists* in the archive ({{File.exists}}) and then checks that the index
*was* in the archive (using {{Files.list}}). If the archive file is created
between these two operations, {{hasIndex}} returns a false-negative result and
the partition map exchange fails on this node.
Reproducer:
{code:java}
public class IgniteWalHistoryReservationsWithLoadTest extends
GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName)
throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
cfg.setConsistentId("NODE$" + gridName.charAt(gridName.length() - 1));
DataStorageConfiguration memCfg = new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setMaxSize(200L * 1024 * 1024)
.setPersistenceEnabled(true))
.setWalMode(WALMode.LOG_ONLY)
.setWalSegmentSize(512 * 1024)
.setCheckpointFrequency(500);
cfg.setDataStorageConfiguration(memCfg);
CacheConfiguration ccfg1 = new CacheConfiguration();
ccfg1.setName("cache1");
ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
ccfg1.setAffinity(new RendezvousAffinityFunction(false, 32));
cfg.setCacheConfiguration(ccfg1);
return cfg;
}
@Test
public void testReservationWithConstantLoad() throws Exception {
final IgniteEx node = startGrid(0);
node.cluster().active(true);
AtomicLong cntr = new AtomicLong(100_000);
ConstantLoader ldr = new ConstantLoader(node.cache("cache1"), cntr);
IgniteInternalFuture<Long> fut =
GridTestUtils.runMultiThreadedAsync(ldr, 1, "loader");
U.sleep(500);
forceCheckpoint(node);
// Reserve history from the beginning.
node.context().cache().context().database().reserveHistoryForExchange();
long endTime = U.currentTimeMillis() + 60_000;
GridCacheContext ctx = node.cachex("cache1").context();
int grpId = ctx.groupId();
int parts = ctx.topology().partitions();
try {
while (U.currentTimeMillis() < endTime &&
!Thread.currentThread().isInterrupted()) {
try {
for (int p = 0; p < parts; p++) {
boolean reserved =
node.context().cache().context().database().reserveHistoryForPreloading(grpId,
p, cntr.get());
assertTrue("Unable to reserve history [p=" + p + ",
cntr=" + cntr.get() + "]", reserved);
}
} finally {
node.context().cache().context().database().releaseHistoryForPreloading();
}
}
}
finally {
node.context().cache().context().database().releaseHistoryForExchange();
ldr.stop();
}
fut.get(10_000);
}
static class ConstantLoader implements Callable<Void> {
private final IgniteCache cache;
private final AtomicLong cntr;
private volatile boolean stop;
ConstantLoader(IgniteCache cache, AtomicLong cntr) {
this.cache = cache;
this.cntr = cntr;
}
@Override public Void call() throws Exception {
while (!stop && !Thread.currentThread().isInterrupted()) {
long n = cntr.getAndIncrement();
cache.put(n, n);
if (n % 100_000 == 0)
log.info("Loaded " + n);
}
return null;
}
public void stop() {
stop = true;
}
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
stopAllGrids();
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
cleanPersistenceDir();
}
}
{code}
was:
For now, {{FileWriteAheadLogManager#hasIndex}} firstly determines that the WAL
segment exists in an archive ({{File.exists}}) and then determines that index
was in the archive (using {{Files.list}}). If the archive file was created
between these operations {{hasIndex}} will return the false-negative result and
the partition map exchange will fail on this node.
Reproducer:
{code:java}
public class IgniteWalHistoryReservationsWithLoadTest extends
GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName)
throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
cfg.setConsistentId("NODE$" + gridName.charAt(gridName.length() - 1));
DataStorageConfiguration memCfg = new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setMaxSize(200L * 1024 * 1024)
.setPersistenceEnabled(true))
.setWalMode(WALMode.LOG_ONLY)
.setWalSegmentSize(512 * 1024)
.setCheckpointFrequency(500);
cfg.setDataStorageConfiguration(memCfg);
CacheConfiguration ccfg1 = new CacheConfiguration();
ccfg1.setName("cache1");
ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
ccfg1.setAffinity(new RendezvousAffinityFunction(false, 32));
cfg.setCacheConfiguration(ccfg1);
return cfg;
}
@Test
public void testReservationWithConstantLoad() throws Exception {
final IgniteEx node = startGrid(0);
node.cluster().active(true);
AtomicLong cntr = new AtomicLong(100_000);
ConstantLoader ldr = new ConstantLoader(node.cache("cache1"), cntr);
IgniteInternalFuture<Long> fut =
GridTestUtils.runMultiThreadedAsync(ldr, 1, "loader");
U.sleep(500);
forceCheckpoint(node);
// Reserve history from the beginning.
node.context().cache().context().database().reserveHistoryForExchange();
long endTime = U.currentTimeMillis() + 60_000;
GridCacheContext ctx = node.cachex("cache1").context();
int grpId = ctx.groupId();
int parts = ctx.topology().partitions();
try {
while (U.currentTimeMillis() < endTime &&
!Thread.currentThread().isInterrupted()) {
try {
for (int p = 0; p < parts; p++) {
boolean reserved =
node.context().cache().context().database().reserveHistoryForPreloading(grpId,
p, cntr.get());
assertTrue("Unable to reserve history [p=" + p + ",
cntr=" + cntr.get() + "]", reserved);
}
} finally {
node.context().cache().context().database().releaseHistoryForPreloading();
}
}
}
finally {
node.context().cache().context().database().releaseHistoryForExchange();
ldr.stop();
}
fut.get(10_000);
}
static class ConstantLoader implements Callable<Void> {
private final IgniteCache cache;
private final AtomicLong cntr;
private volatile boolean stop;
ConstantLoader(IgniteCache cache, AtomicLong cntr) {
this.cache = cache;
this.cntr = cntr;
}
@Override public Void call() throws Exception {
while (!stop && !Thread.currentThread().isInterrupted()) {
long n = cntr.getAndIncrement();
cache.put(n, n);
if (n % 100_000 == 0)
log.info("Loaded " + n);
}
return null;
}
public void stop() {
stop = true;
}
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
stopAllGrids();
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
cleanPersistenceDir();
}
}
{code}
> WAL history reservation may fail due to an incorrect determination of the
> existence of an archived WAL segment (under race condition).
> --------------------------------------------------------------------------------------------------------------------------------------
>
> Key: IGNITE-12370
> URL: https://issues.apache.org/jira/browse/IGNITE-12370
> Project: Ignite
> Issue Type: Bug
> Components: persistence
> Affects Versions: 2.8
> Reporter: Pavel Pereslegin
> Priority: Major
>
> Currently, {{FileWriteAheadLogManager#hasIndex}} first checks that the
> WAL segment *exists* in the archive ({{File.exists}}) and then checks that the
> index *was* in the archive (using {{Files.list}}). If the archive file is
> created between these two operations, {{hasIndex}} returns a false-negative
> result and the partition map exchange fails on this node.
> Reproducer:
> {code:java}
> public class IgniteWalHistoryReservationsWithLoadTest extends
> GridCommonAbstractTest {
> /** {@inheritDoc} */
> @Override protected IgniteConfiguration getConfiguration(String gridName)
> throws Exception {
> IgniteConfiguration cfg = super.getConfiguration(gridName);
> cfg.setConsistentId("NODE$" + gridName.charAt(gridName.length() - 1));
> DataStorageConfiguration memCfg = new DataStorageConfiguration()
> .setDefaultDataRegionConfiguration(
> new DataRegionConfiguration()
> .setMaxSize(200L * 1024 * 1024)
> .setPersistenceEnabled(true))
> .setWalMode(WALMode.LOG_ONLY)
> .setWalSegmentSize(512 * 1024)
> .setCheckpointFrequency(500);
> cfg.setDataStorageConfiguration(memCfg);
> CacheConfiguration ccfg1 = new CacheConfiguration();
> ccfg1.setName("cache1");
> ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
> ccfg1.setAffinity(new RendezvousAffinityFunction(false, 32));
> cfg.setCacheConfiguration(ccfg1);
> return cfg;
> }
> @Test
> public void testReservationWithConstantLoad() throws Exception {
> final IgniteEx node = startGrid(0);
> node.cluster().active(true);
> AtomicLong cntr = new AtomicLong(100_000);
> ConstantLoader ldr = new ConstantLoader(node.cache("cache1"), cntr);
> IgniteInternalFuture<Long> fut =
> GridTestUtils.runMultiThreadedAsync(ldr, 1, "loader");
> U.sleep(500);
> forceCheckpoint(node);
> // Reserve history from the beginning.
>
> node.context().cache().context().database().reserveHistoryForExchange();
> long endTime = U.currentTimeMillis() + 60_000;
> GridCacheContext ctx = node.cachex("cache1").context();
> int grpId = ctx.groupId();
> int parts = ctx.topology().partitions();
> try {
> while (U.currentTimeMillis() < endTime &&
> !Thread.currentThread().isInterrupted()) {
> try {
> for (int p = 0; p < parts; p++) {
> boolean reserved =
> node.context().cache().context().database().reserveHistoryForPreloading(grpId,
> p, cntr.get());
> assertTrue("Unable to reserve history [p=" + p + ",
> cntr=" + cntr.get() + "]", reserved);
> }
> } finally {
>
> node.context().cache().context().database().releaseHistoryForPreloading();
> }
> }
> }
> finally {
>
> node.context().cache().context().database().releaseHistoryForExchange();
> ldr.stop();
> }
> fut.get(10_000);
> }
> static class ConstantLoader implements Callable<Void> {
> private final IgniteCache cache;
> private final AtomicLong cntr;
> private volatile boolean stop;
> ConstantLoader(IgniteCache cache, AtomicLong cntr) {
> this.cache = cache;
> this.cntr = cntr;
> }
> @Override public Void call() throws Exception {
> while (!stop && !Thread.currentThread().isInterrupted()) {
> long n = cntr.getAndIncrement();
> cache.put(n, n);
> if (n % 100_000 == 0)
> log.info("Loaded " + n);
> }
> return null;
> }
> public void stop() {
> stop = true;
> }
> }
> /** {@inheritDoc} */
> @Override protected void beforeTestsStarted() throws Exception {
> stopAllGrids();
> cleanPersistenceDir();
> }
> /** {@inheritDoc} */
> @Override protected void afterTest() throws Exception {
> stopAllGrids();
> cleanPersistenceDir();
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)