[ 
https://issues.apache.org/jira/browse/IGNITE-28662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mikhail Petrov updated IGNITE-28662:
------------------------------------
    Fix Version/s: 2.19

> Node stop may be infinitely blocked by atomic cache operations invoked from 
> thin client
> ---------------------------------------------------------------------------------------
>
>                 Key: IGNITE-28662
>                 URL: https://issues.apache.org/jira/browse/IGNITE-28662
>             Project: Ignite
>          Issue Type: Bug
>            Reporter: Mikhail Petrov
>            Priority: Major
>             Fix For: 2.19
>
>
> Consider a cluster of three nodes: node0, node1 and node2.
> 1. node1 and node2 each receive a put request from a thin client. node1 is the 
> "primary" node for the cache keys received by node2, and node2 is the "primary" 
> node for the cache keys received by node1. Both nodes start executing the 
> operations and wait for them to complete.
> 2. node1 and node2 receive a stop signal (Ignite#close). The stop procedure on 
> both nodes blocks in GridNioAsyncNotifyFilter#stop, which waits for the thin 
> client operations to complete.
> 3. node1 and node2 fail to process the cache requests for some reason (e.g. a 
> cache interceptor throws an exception).
> 4. node1 and node2 do not send a GridNearAtomicUpdateResponse with the failed 
> keys to each other because both of them are stopping (see 
> GridCacheIoManager#onSend). This message is what tells the "near" node that 
> some keys could not be processed and that the operation must be completed with 
> an exception.
> 5. node1 and node2 cannot complete the cache operations received from the thin 
> client, because neither of them will ever receive a GridNearAtomicUpdateResponse 
> or a NODE_LEFT event for the primary node. As a result, neither node can 
> complete the stop procedure.
> Reproducer:
> {code:java}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *      http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package org.apache.ignite;
> import java.util.Map;
> import java.util.TreeMap;
> import java.util.concurrent.CountDownLatch;
> import javax.cache.Cache;
> import org.apache.ignite.cache.CacheInterceptor;
> import org.apache.ignite.cache.CachePartialUpdateException;
> import org.apache.ignite.client.IgniteClient;
> import org.apache.ignite.configuration.CacheConfiguration;
> import org.apache.ignite.configuration.ClientConfiguration;
> import org.apache.ignite.configuration.IgniteConfiguration;
> import org.apache.ignite.internal.IgniteInternalFuture;
> import org.apache.ignite.internal.TestRecordingCommunicationSpi;
> import 
> org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction;
> import org.apache.ignite.lang.IgniteBiTuple;
> import org.apache.ignite.testframework.GridTestUtils;
> import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
> import org.jetbrains.annotations.Nullable;
> import org.junit.Test;
> import static java.util.Collections.singletonMap;
> import static java.util.concurrent.TimeUnit.MILLISECONDS;
> import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
> import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
> import static 
> org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction.IDX_ATTR;
> /** */
> public class NodeStopForeverBlockedByAtomicCacheOperationsTest extends 
> GridCommonAbstractTest {
>     /** */
>     public static final int NODE_1_FIRST_KEY = 1;
>     /** */
>     public static final int NODE_1_SECOND_KEY = 4;
>     /** */
>     public static final int NODE_2_FIRST_KEY = 2;
>     /** */
>     public static final int NODE_2_SECOND_KEY = 5;
>     /** {@inheritDoc} */
>     @Override protected IgniteConfiguration getConfiguration(String 
> igniteInstanceName) throws Exception {
>         return super.getConfiguration(igniteInstanceName)
>             .setCommunicationSpi(new TestRecordingCommunicationSpi())
>             .setUserAttributes(singletonMap(IDX_ATTR, 
> getTestIgniteInstanceIndex(igniteInstanceName)));
>     }
>     /** {@inheritDoc} */
>     @Override protected void afterTest() throws Exception {
>         super.afterTest();
>         stopAllGrids();
>     }
>     /** */
>     @Test
>     public void testCacheEntriesProcessingFailureCausedByNodeStop() throws 
> Exception {
>         startGridsMultiThreaded(3);
>         TestInterceptor.putStartedLatch = new CountDownLatch(2);
>         TestInterceptor.putUnblockedLatch = new CountDownLatch(1);
>         grid(0).createCache(createTestCacheConfiguration());
>         try (
>             IgniteClient cli1 = Ignition.startClient(new 
> ClientConfiguration().setClusterDiscoveryEnabled(false).setAddresses("127.0.0.1:10801"));
>             IgniteClient cli2 = Ignition.startClient(new 
> ClientConfiguration().setClusterDiscoveryEnabled(false).setAddresses("127.0.0.1:10802"))
>         ) {
>             IgniteInternalFuture<Object> putFut1 = GridTestUtils.runAsync(() 
> -> cli1.cache(DEFAULT_CACHE_NAME).putAll(createKeysForNode(2)));
>             IgniteInternalFuture<Object> putFut2 = GridTestUtils.runAsync(() 
> -> cli2.cache(DEFAULT_CACHE_NAME).putAll(createKeysForNode(1)));
>             
> assertTrue(TestInterceptor.putStartedLatch.await(getTestTimeout(), 
> MILLISECONDS));
>             IgniteInternalFuture<Object> stopFut1 = GridTestUtils.runAsync(() 
> -> stopGrid(1));
>             IgniteInternalFuture<Object> stopFut2 = GridTestUtils.runAsync(() 
> -> stopGrid(2));
>             try {
>                 TestInterceptor.putUnblockedLatch.countDown();
>                 stopFut1.get(getTestTimeout());
>                 stopFut2.get(getTestTimeout());
>                 putFut1.get(getTestTimeout());
>                 putFut2.get(getTestTimeout());
>             }
>             catch (CachePartialUpdateException e) {
>                 assertTrue(e.getMessage().contains("Failed to update keys 
> (retry update if possible)"));
>             }
>         }
>     }
>     /** */
>     private CacheConfiguration<Integer, Integer> 
> createTestCacheConfiguration() {
>         return new CacheConfiguration<Integer, Integer>()
>             .setName(DEFAULT_CACHE_NAME)
>             .setAtomicityMode(ATOMIC)
>             .setWriteSynchronizationMode(FULL_SYNC)
>             .setBackups(2)
>             .setAffinity(new GridCacheModuloAffinityFunction(3, 2))
>             .setInterceptor(new TestInterceptor());
>     }
>     /** */
>     private Map<Integer, Integer> createKeysForNode(int nodeIdx) {
>         Map<Integer, Integer> data = new TreeMap<>();
>         if (nodeIdx == 2) {
>             data.put(NODE_2_FIRST_KEY, 2);
>             data.put(NODE_2_SECOND_KEY, 5);
>         }
>         else {
>             data.put(NODE_1_FIRST_KEY, 1);
>             data.put(NODE_1_SECOND_KEY, 4);
>         }
>         return data;
>     }
>     /** */
>     public static final class TestInterceptor implements 
> CacheInterceptor<Integer, Integer> {
>         /** */
>         public static CountDownLatch putStartedLatch;
>         /** */
>         public static CountDownLatch putUnblockedLatch;
>         /** {@inheritDoc} */
>         @Override public @Nullable Integer onGet(Integer key, @Nullable 
> Integer val) {
>             return val;
>         }
>         /** {@inheritDoc} */
>         @Override public @Nullable Integer onBeforePut(Cache.Entry<Integer, 
> Integer> entry, Integer newVal) {
>             if (entry.getKey() == NODE_1_FIRST_KEY || entry.getKey() == 
> NODE_2_FIRST_KEY) {
>                 putStartedLatch.countDown();
>                 try {
>                     assertTrue(putUnblockedLatch.await(10000, MILLISECONDS));
>                 }
>                 catch (InterruptedException e) {
>                     Thread.currentThread().interrupt();
>                     throw new IgniteException(e);
>                 }
>             }
>             else
>                 throw new IgniteException("expected");
>             return newVal;
>         }
>         /** {@inheritDoc} */
>         @Override public void onAfterPut(Cache.Entry<Integer, Integer> entry) 
> {
>             // No-op.
>         }
>         /** {@inheritDoc} */
>         @Override public @Nullable IgniteBiTuple<Boolean, Integer> 
> onBeforeRemove(Cache.Entry<Integer, Integer> entry) {
>             return null;
>         }
>         /** {@inheritDoc} */
>         @Override public void onAfterRemove(Cache.Entry<Integer, Integer> 
> entry) {
>             // No-op.
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to