[ 
https://issues.apache.org/jira/browse/IGNITE-4982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15968760#comment-15968760
 ] 

Konstantin Dudkov commented on IGNITE-4982:
-------------------------------------------

The point was as follows:

In some situations we try to add both writeValue and nearWriteValue to 
GridDhtAtomicSingleUpdateRequest. But when nearWriteValue is added, it 
overwrites writeValue, so the node doesn't process the DHT update. I added a 
separate flag to GridDhtAtomicSingleUpdateRequest so that the node now handles 
this situation correctly.

[PR|https://github.com/apache/ignite/pull/1792]

Please review.

> GridCacheAbstractRemoveFailureTest fail
> ---------------------------------------
>
>                 Key: IGNITE-4982
>                 URL: https://issues.apache.org/jira/browse/IGNITE-4982
>             Project: Ignite
>          Issue Type: Bug
>          Components: cache
>            Reporter: Konstantin Dudkov
>            Assignee: Konstantin Dudkov
>             Fix For: 2.0
>
>
> GridCacheAbstractRemoveFailureTest (and some child tests) fails. Reproducer:
> {code:java}
> import java.util.Collection;
> import java.util.HashSet;
> import java.util.Map;
> import java.util.concurrent.Callable;
> import java.util.concurrent.CyclicBarrier;
> import java.util.concurrent.ThreadLocalRandom;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.TimeoutException;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.concurrent.atomic.AtomicLong;
> import java.util.concurrent.atomic.AtomicReference;
> import javax.cache.CacheException;
> import org.apache.ignite.Ignite;
> import org.apache.ignite.IgniteCache;
> import org.apache.ignite.IgniteException;
> import org.apache.ignite.IgniteTransactions;
> import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
> import org.apache.ignite.cache.CacheAtomicityMode;
> import org.apache.ignite.cache.CacheMode;
> import org.apache.ignite.configuration.CacheConfiguration;
> import org.apache.ignite.configuration.IgniteConfiguration;
> import org.apache.ignite.configuration.NearCacheConfiguration;
> import org.apache.ignite.internal.IgniteInternalFuture;
> import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
> import org.apache.ignite.internal.util.lang.GridTuple;
> import org.apache.ignite.internal.util.typedef.F;
> import org.apache.ignite.internal.util.typedef.T2;
> import org.apache.ignite.internal.util.typedef.X;
> import org.apache.ignite.internal.util.typedef.internal.U;
> import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
> import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
> import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
> import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
> import org.apache.ignite.testframework.GridTestUtils;
> import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
> import org.apache.ignite.transactions.Transaction;
> import org.apache.ignite.transactions.TransactionConcurrency;
> import org.apache.ignite.transactions.TransactionIsolation;
> import org.jsr166.ConcurrentHashMap8;
> import static 
> org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
> import static org.apache.ignite.cache.CacheMode.PARTITIONED;
> import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
> import static 
> org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
> import static 
> org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
> import static 
> org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
> import static 
> org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
> /**
>  * Tests that removes are not lost when topology changes.
>  */
> public abstract class GridCacheAbstractRemoveFailureTest extends 
> GridCommonAbstractTest {
>     /** IP finder. */
>     private static final TcpDiscoveryIpFinder IP_FINDER = new 
> TcpDiscoveryVmIpFinder(true);
>     /** */
>     private static final int GRID_CNT = 3;
>     /** Keys count. */
>     private static final int KEYS_CNT = 10_000;
>     /** Test duration. */
>     private static final long DUR = 90 * 1000L;
>     /** Cache data assert frequency. */
>     private static final long ASSERT_FREQ = 10_000;
>     /** Kill delay. */
>     private static final T2<Integer, Integer> KILL_DELAY = new T2<>(2000, 
> 5000);
>     /** Start delay. */
>     private static final T2<Integer, Integer> START_DELAY = new T2<>(2000, 
> 5000);
>     /** */
>     private static String sizePropVal;
>     /** {@inheritDoc} */
>     @Override protected IgniteConfiguration getConfiguration(String 
> igniteInstanceName) throws Exception {
>         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
>         
> ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER).setForceServerMode(true);
>         if (testClientNode() && 
> getTestIgniteInstanceName(0).equals(igniteInstanceName))
>             cfg.setClientMode(true);
>         
> ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
>         return cfg;
>     }
>     /** {@inheritDoc} */
>     @Override protected void beforeTestsStarted() throws Exception {
>         // Need to increase value set in GridAbstractTest
>         sizePropVal = 
> System.getProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE);
>         System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "100000");
>         startGrids(GRID_CNT);
>     }
>     /** {@inheritDoc} */
>     @Override protected void afterTestsStopped() throws Exception {
>         super.afterTestsStopped();
>         System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, 
> sizePropVal != null ? sizePropVal : "");
>         stopAllGrids();
>     }
>     /** {@inheritDoc} */
>     @Override protected long getTestTimeout() {
>         return DUR + 60_000;
>     }
>     /**
>      * @return Cache mode.
>      */
>     protected abstract CacheMode cacheMode();
>     /**
>      * @return Cache atomicity mode.
>      */
>     protected abstract CacheAtomicityMode atomicityMode();
>     /**
>      * @return Near cache configuration.
>      */
>     protected abstract NearCacheConfiguration nearCache();
>     /**
>      * @return Atomic cache write order mode.
>      */
>     protected CacheAtomicWriteOrderMode atomicWriteOrderMode() {
>         return null;
>     }
>     /**
>      * @return {@code True} if test updates from client node.
>      */
>     protected boolean testClientNode() {
>         return false;
>     }
>     /**
>      * @throws Exception If failed.
>      */
>     public void testPutAndRemove() throws Exception {
>         putAndRemove(DUR, null, null);
>     }
>     /**
>      * @throws Exception If failed.
>      */
>     public void testPutAndRemovePessimisticTx() throws Exception {
>         if (atomicityMode() != CacheAtomicityMode.TRANSACTIONAL)
>             return;
>         putAndRemove(30_000, PESSIMISTIC, REPEATABLE_READ);
>     }
>     /**
>      * @throws Exception If failed.
>      */
>     public void testPutAndRemoveOptimisticSerializableTx() throws Exception {
>         if (atomicityMode() != CacheAtomicityMode.TRANSACTIONAL)
>             return;
>         putAndRemove(30_000, OPTIMISTIC, SERIALIZABLE);
>     }
>     /**
>      * @param duration Test duration.
>      * @param txConcurrency Transaction concurrency if test explicit 
> transaction.
>      * @param txIsolation Transaction isolation if test explicit transaction.
>      * @throws Exception If failed.
>      */
>     private void putAndRemove(long duration,
>         final TransactionConcurrency txConcurrency,
>         final TransactionIsolation txIsolation) throws Exception {
>         assertEquals(testClientNode(), (boolean) 
> grid(0).configuration().isClientMode());
>         grid(0).destroyCache(null);
>         CacheConfiguration<Integer, Integer> ccfg = new 
> CacheConfiguration<>();
>         ccfg.setWriteSynchronizationMode(FULL_SYNC);
>         ccfg.setCacheMode(cacheMode());
>         if (cacheMode() == PARTITIONED)
>             ccfg.setBackups(1);
>         ccfg.setAtomicityMode(atomicityMode());
>         ccfg.setAtomicWriteOrderMode(atomicWriteOrderMode());
>         ccfg.setNearConfiguration(nearCache());
>         final IgniteCache<Integer, Integer> sndCache0 = 
> grid(0).createCache(ccfg);
>         final AtomicBoolean stop = new AtomicBoolean();
>         final AtomicLong cntr = new AtomicLong();
>         final AtomicLong errCntr = new AtomicLong();
>         // Expected values in cache.
>         final Map<Integer, GridTuple<Integer>> expVals = new 
> ConcurrentHashMap8<>();
>         final AtomicReference<CyclicBarrier> cmp = new AtomicReference<>();
>         IgniteInternalFuture<?> updateFut = GridTestUtils.runAsync(new 
> Callable<Void>() {
>             @Override public Void call() throws Exception {
>                 Thread.currentThread().setName("update-thread");
>                 ThreadLocalRandom rnd = ThreadLocalRandom.current();
>                 IgniteTransactions txs = 
> sndCache0.unwrap(Ignite.class).transactions();
>                 while (!stop.get()) {
>                     for (int i = 0; i < 100; i++) {
>                         int key = rnd.nextInt(KEYS_CNT);
>                         boolean put = rnd.nextInt(0, 100) > 10;
>                         while (true) {
>                             try {
>                                 if (put) {
>                                     boolean failed = false;
>                                     if (txConcurrency != null) {
>                                         try (Transaction tx = 
> txs.txStart(txConcurrency, txIsolation)) {
>                                             sndCache0.put(key, i);
>                                             tx.commit();
>                                         }
>                                         catch (CacheException | 
> IgniteException e) {
>                                             if (!X.hasCause(e, 
> ClusterTopologyCheckedException.class)) {
>                                                 log.error("Unexpected error: 
> " + e);
>                                                 throw e;
>                                             }
>                                             failed = true;
>                                         }
>                                     }
>                                     else
>                                         sndCache0.put(key, i);
>                                     if (!failed)
>                                         expVals.put(key, F.t(i));
>                                 }
>                                 else {
>                                     boolean failed = false;
>                                     if (txConcurrency != null) {
>                                         try (Transaction tx = 
> txs.txStart(txConcurrency, txIsolation)) {
>                                             sndCache0.remove(key);
>                                             tx.commit();
>                                         }
>                                         catch (CacheException | 
> IgniteException e) {
>                                             if (!X.hasCause(e, 
> ClusterTopologyCheckedException.class)) {
>                                                 log.error("Unexpected error: 
> " + e);
>                                                 throw e;
>                                             }
>                                             failed = true;
>                                         }
>                                     }
>                                     else
>                                         sndCache0.remove(key);
>                                     if (!failed)
>                                         expVals.put(key, F.<Integer>t(null));
>                                 }
>                                 break;
>                             }
>                             catch (CacheException e) {
>                                 if (put)
>                                     log.error("Put failed [key=" + key + ", 
> val=" + i + ']', e);
>                                 else
>                                     log.error("Remove failed [key=" + key + 
> ']', e);
>                                 errCntr.incrementAndGet();
>                             }
>                         }
>                     }
>                     cntr.addAndGet(100);
>                     CyclicBarrier barrier = cmp.get();
>                     if (barrier != null) {
>                         log.info("Wait data check.");
>                         barrier.await(60_000, TimeUnit.MILLISECONDS);
>                         log.info("Finished wait data check.");
>                     }
>                 }
>                 return null;
>             }
>         });
>         IgniteInternalFuture<?> killFut = GridTestUtils.runAsync(new 
> Callable<Void>() {
>             @Override public Void call() throws Exception {
>                 Thread.currentThread().setName("restart-thread");
>                 while (!stop.get()) {
>                     U.sleep(random(KILL_DELAY.get1(), KILL_DELAY.get2()));
>                     killAndRestart(stop);
>                     CyclicBarrier barrier = cmp.get();
>                     if (barrier != null) {
>                         log.info("Wait data check.");
>                         barrier.await(60_000, TimeUnit.MILLISECONDS);
>                         log.info("Finished wait data check.");
>                     }
>                 }
>                 return null;
>             }
>         });
>         try {
>             long stopTime = duration + U.currentTimeMillis() ;
>             long nextAssert = U.currentTimeMillis() + ASSERT_FREQ;
>             while (U.currentTimeMillis() < stopTime) {
>                 long start = System.nanoTime();
>                 long ops = cntr.longValue();
>                 U.sleep(1000);
>                 long diff = cntr.longValue() - ops;
>                 double time = (System.nanoTime() - start) / 1_000_000_000d;
>                 long opsPerSecond = (long)(diff / time);
>                 log.info("Operations/second: " + opsPerSecond);
>                 if (U.currentTimeMillis() >= nextAssert) {
>                     CyclicBarrier barrier = new CyclicBarrier(3, new 
> Runnable() {
>                         @Override public void run() {
>                             try {
>                                 cmp.set(null);
>                                 log.info("Checking cache content.");
>                                 assertCacheContent(expVals);
>                                 log.info("Finished check cache content.");
>                             }
>                             catch (Throwable e) {
>                                 log.error("Unexpected error: " + e, e);
>                                 throw e;
>                             }
>                         }
>                     });
>                     log.info("Start cache content check.");
>                     cmp.set(barrier);
>                     try {
>                         barrier.await(60_000, TimeUnit.MILLISECONDS);
>                     }
>                     catch (TimeoutException e) {
>                         U.dumpThreads(log);
>                         fail("Failed to check cache content: " + e);
>                     }
>                     log.info("Cache content check done.");
>                     nextAssert = System.currentTimeMillis() + ASSERT_FREQ;
>                 }
>             }
>         }
>         finally {
>             stop.set(true);
>         }
>         killFut.get();
>         updateFut.get();
>         log.info("Test finished. Update errors: " + errCntr.get());
>     }
>     /**
>      * @param stop Stop flag.
>      * @throws Exception If failed.
>      */
>     private void killAndRestart(AtomicBoolean stop) throws Exception {
>         if (stop.get())
>             return;
>         int idx = random(1, GRID_CNT + 1);
>         log.info("Killing node " + idx);
>         stopGrid(idx);
>         U.sleep(random(START_DELAY.get1(), START_DELAY.get2()));
>         log.info("Restarting node " + idx);
>         startGrid(idx);
>         if (stop.get())
>             return;
>         U.sleep(1000);
>     }
>     /**
>      * @param expVals Expected values in cache.
>      */
>     @SuppressWarnings({"TooBroadScope", "ConstantIfStatement"})
>     private void assertCacheContent(Map<Integer, GridTuple<Integer>> expVals) 
> {
>         assert !expVals.isEmpty();
>         Collection<Integer> failedKeys = new HashSet<>();
>         for (int i = 0; i < GRID_CNT; i++) {
>             Ignite ignite = grid(i);
>             IgniteCache<Integer, Integer> cache = ignite.cache(null);
>             for (Map.Entry<Integer, GridTuple<Integer>> expVal : 
> expVals.entrySet()) {
>                 Integer val = cache.get(expVal.getKey());
>                 if (!F.eq(expVal.getValue().get(), val)) {
>                     failedKeys.add(expVal.getKey());
>                     boolean primary = 
> affinity(cache).isPrimary(ignite.cluster().localNode(), expVal.getKey());
>                     boolean backup = 
> affinity(cache).isBackup(ignite.cluster().localNode(), expVal.getKey());
>                     log.error("Unexpected cache data [exp=" + expVal +
>                         ", actual=" + val +
>                         ", nodePrimary=" + primary +
>                         ", nodeBackup=" + backup +
>                         ", nodeIdx" + i +
>                         ", nodeId=" + ignite.cluster().localNode().id() + 
> ']');
>                 }
>             }
>         }
>         assertTrue("Unexpected data for keys: " + failedKeys, 
> failedKeys.isEmpty());
>     }
>     /**
>      * @param min Min possible value.
>      * @param max Max possible value (exclusive).
>      * @return Random value.
>      */
>     private static int random(int min, int max) {
>         if (max == min)
>             return max;
>         return ThreadLocalRandom.current().nextInt(min, max);
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to