[ 
https://issues.apache.org/jira/browse/IGNITE-28662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mikhail Petrov updated IGNITE-28662:
------------------------------------
    Description: 
Consider a cluster of 3 nodes - node0, node1, node2

1. node1 and node2 each receive a put request from a thin client. node1 is 
"primary" for cache keys received by node2 and node2 is "primary" for cache 
keys received by node1. Both of them begin executing the operations and wait 
for them to complete.
2. node1 and node2 receive stop signal (Ignite#close). The stop procedure on 
both nodes blocks on GridNioAsyncNotifyFilter#stop, which waits for the thin 
client operations to complete.
3. node1 and node2 fail to process cache request for some reason (a cache 
interceptor raised an exception)
4. node1 and node2 will not send GridNearAtomicUpdateResponse with failed keys 
to each other because they are both stopping (see GridCacheIoManager#onSend). 
This message is an indication to the "near" node that some keys could not be 
processed and the operation should be terminated with an exception.
5. node1 and node2 are unable to complete the cache operations received from 
the thin client (both of them will never receive GridNearAtomicUpdateResponse 
or NODE_LEFT event for the primary node) -> they are unable to complete the 
stop procedure

Reproducer:

{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite;

import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import javax.cache.Cache;
import org.apache.ignite.cache.CacheInterceptor;
import org.apache.ignite.cache.CachePartialUpdateException;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import 
org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static 
org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction.IDX_ATTR;

/** */
public class NodeStopForeverBlockedByAtomicCacheOperationsTest extends 
GridCommonAbstractTest {
    /** */
    public static final int NODE_1_FIRST_KEY = 1;

    /** */
    public static final int NODE_1_SECOND_KEY = 4;

    /** */
    public static final int NODE_2_FIRST_KEY = 2;

    /** */
    public static final int NODE_2_SECOND_KEY = 5;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
        return super.getConfiguration(igniteInstanceName)
            .setCommunicationSpi(new TestRecordingCommunicationSpi())
            .setUserAttributes(singletonMap(IDX_ATTR, 
getTestIgniteInstanceIndex(igniteInstanceName)));
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        super.afterTest();

        stopAllGrids();
    }

    /** */
    @Test
    public void testCacheEntriesProcessingFailureCausedByNodeStop() throws 
Exception {
        startGridsMultiThreaded(3);

        TestInterceptor.putStartedLatch = new CountDownLatch(2);
        TestInterceptor.putUnblockedLatch = new CountDownLatch(1);

        grid(0).createCache(createTestCacheConfiguration());

        try (
            IgniteClient cli1 = Ignition.startClient(new 
ClientConfiguration().setClusterDiscoveryEnabled(false).setAddresses("127.0.0.1:10801"));
            IgniteClient cli2 = Ignition.startClient(new 
ClientConfiguration().setClusterDiscoveryEnabled(false).setAddresses("127.0.0.1:10802"))
        ) {
            IgniteInternalFuture<Object> putFut1 = GridTestUtils.runAsync(() -> 
cli1.cache(DEFAULT_CACHE_NAME).putAll(createKeysForNode(2)));
            IgniteInternalFuture<Object> putFut2 = GridTestUtils.runAsync(() -> 
cli2.cache(DEFAULT_CACHE_NAME).putAll(createKeysForNode(1)));

            assertTrue(TestInterceptor.putStartedLatch.await(getTestTimeout(), 
MILLISECONDS));

            IgniteInternalFuture<Object> stopFut1 = GridTestUtils.runAsync(() 
-> stopGrid(1));
            IgniteInternalFuture<Object> stopFut2 = GridTestUtils.runAsync(() 
-> stopGrid(2));

            try {
                TestInterceptor.putUnblockedLatch.countDown();

                stopFut1.get(getTestTimeout());
                stopFut2.get(getTestTimeout());

                putFut1.get(getTestTimeout());
                putFut2.get(getTestTimeout());
            }
            catch (CachePartialUpdateException e) {
                assertTrue(e.getMessage().contains("Failed to update keys 
(retry update if possible)"));
            }
        }
    }

    /** */
    private CacheConfiguration<Integer, Integer> createTestCacheConfiguration() 
{
        return new CacheConfiguration<Integer, Integer>()
            .setName(DEFAULT_CACHE_NAME)
            .setAtomicityMode(ATOMIC)
            .setWriteSynchronizationMode(FULL_SYNC)
            .setBackups(2)
            .setAffinity(new GridCacheModuloAffinityFunction(3, 2))
            .setInterceptor(new TestInterceptor());
    }

    /** */
    private Map<Integer, Integer> createKeysForNode(int nodeIdx) {
        Map<Integer, Integer> data = new TreeMap<>();

        if (nodeIdx == 2) {
            data.put(NODE_2_FIRST_KEY, 2);
            data.put(NODE_2_SECOND_KEY, 5);
        }
        else {
            data.put(NODE_1_FIRST_KEY, 1);
            data.put(NODE_1_SECOND_KEY, 4);
        }


        return data;
    }

    /** */
    public static final class TestInterceptor implements 
CacheInterceptor<Integer, Integer> {
        /** */
        public static CountDownLatch putStartedLatch;

        /** */
        public static CountDownLatch putUnblockedLatch;

        /** {@inheritDoc} */
        @Override public @Nullable Integer onGet(Integer key, @Nullable Integer 
val) {
            return val;
        }

        /** {@inheritDoc} */
        @Override public @Nullable Integer onBeforePut(Cache.Entry<Integer, 
Integer> entry, Integer newVal) {
            if (entry.getKey() == NODE_1_FIRST_KEY || entry.getKey() == 
NODE_2_FIRST_KEY) {
                putStartedLatch.countDown();

                try {
                    assertTrue(putUnblockedLatch.await(10000, MILLISECONDS));
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();

                    throw new IgniteException(e);
                }
            }
            else
                throw new IgniteException("expected");

            return newVal;
        }

        /** {@inheritDoc} */
        @Override public void onAfterPut(Cache.Entry<Integer, Integer> entry) {
            // No-op.
        }

        /** {@inheritDoc} */
        @Override public @Nullable IgniteBiTuple<Boolean, Integer> 
onBeforeRemove(Cache.Entry<Integer, Integer> entry) {
            return null;
        }

        /** {@inheritDoc} */
        @Override public void onAfterRemove(Cache.Entry<Integer, Integer> 
entry) {
            // No-op.
        }
    }
}

{code}


  was:
Consider a cluster of 3 nodes - node0, node1, node2

1. node1 and node2 receive put request from a thin client. node1 is "primary" 
for cache keys received by node2 and node2 is "primary" for cache keys received 
by node1. Both of them begin operations execution and wait for them to complete.
2. node1 and node2 receive stop signal (Ignite#close). The stop procedure on 
both nodes blocks on GridNioAsyncNotifyFilter#stop, which waits for the thin 
client operations to complete.
3. node1 and node2 fail to process cache request for some reason (a cache 
interceptor raised an exceception)
4. node1 and node 2 will not send GridNearAtomicUpdateResponse with failed keys 
to each other because they are both stopping (see GridCacheIoManager#onSend). 
This message is an indication to the "near" node that some keys could not be 
processed and the operation should be terminated with an exception.
5. node1 and node2 are unable to complete the cache operations received from 
the thin client (both of them will never receive GridNearAtomicUpdateResponse 
or NODE_LEFT event for the primary node ) -> they are unable to complete the 
stop procedure


> Node stop may be infinitely blocked by atomic cache operations invoked from 
> thin client
> ---------------------------------------------------------------------------------------
>
>                 Key: IGNITE-28662
>                 URL: https://issues.apache.org/jira/browse/IGNITE-28662
>             Project: Ignite
>          Issue Type: Bug
>            Reporter: Mikhail Petrov
>            Priority: Major
>
> Consider a cluster of 3 nodes - node0, node1, node2
> 1. node1 and node2 receive put request from a thin client. node1 is "primary" 
> for cache keys received by node2 and node2 is "primary" for cache keys 
> received by node1. Both of them begin operations execution and wait for them 
> to complete.
> 2. node1 and node2 receive stop signal (Ignite#close). The stop procedure on 
> both nodes blocks on GridNioAsyncNotifyFilter#stop, which waits for the thin 
> client operations to complete.
> 3. node1 and node2 fail to process cache request for some reason (a cache 
> interceptor raised an exception)
> 4. node1 and node2 will not send GridNearAtomicUpdateResponse with failed 
> keys to each other because they are both stopping (see 
> GridCacheIoManager#onSend). This message is an indication to the "near" node 
> that some keys could not be processed and the operation should be terminated 
> with an exception.
> 5. node1 and node2 are unable to complete the cache operations received from 
> the thin client (both of them will never receive GridNearAtomicUpdateResponse 
> or NODE_LEFT event for the primary node ) -> they are unable to complete the 
> stop procedure
> Reproducer:
> {code:java}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *      http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package org.apache.ignite;
> import java.util.Map;
> import java.util.TreeMap;
> import java.util.concurrent.CountDownLatch;
> import javax.cache.Cache;
> import org.apache.ignite.cache.CacheInterceptor;
> import org.apache.ignite.cache.CachePartialUpdateException;
> import org.apache.ignite.client.IgniteClient;
> import org.apache.ignite.configuration.CacheConfiguration;
> import org.apache.ignite.configuration.ClientConfiguration;
> import org.apache.ignite.configuration.IgniteConfiguration;
> import org.apache.ignite.internal.IgniteInternalFuture;
> import org.apache.ignite.internal.TestRecordingCommunicationSpi;
> import 
> org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction;
> import org.apache.ignite.lang.IgniteBiTuple;
> import org.apache.ignite.testframework.GridTestUtils;
> import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
> import org.jetbrains.annotations.Nullable;
> import org.junit.Test;
> import static java.util.Collections.singletonMap;
> import static java.util.concurrent.TimeUnit.MILLISECONDS;
> import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
> import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
> import static 
> org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction.IDX_ATTR;
> /** */
> public class NodeStopForeverBlockedByAtomicCacheOperationsTest extends 
> GridCommonAbstractTest {
>     /** */
>     public static final int NODE_1_FIRST_KEY = 1;
>     /** */
>     public static final int NODE_1_SECOND_KEY = 4;
>     /** */
>     public static final int NODE_2_FIRST_KEY = 2;
>     /** */
>     public static final int NODE_2_SECOND_KEY = 5;
>     /** {@inheritDoc} */
>     @Override protected IgniteConfiguration getConfiguration(String 
> igniteInstanceName) throws Exception {
>         return super.getConfiguration(igniteInstanceName)
>             .setCommunicationSpi(new TestRecordingCommunicationSpi())
>             .setUserAttributes(singletonMap(IDX_ATTR, 
> getTestIgniteInstanceIndex(igniteInstanceName)));
>     }
>     /** {@inheritDoc} */
>     @Override protected void afterTest() throws Exception {
>         super.afterTest();
>         stopAllGrids();
>     }
>     /** */
>     @Test
>     public void testCacheEntriesProcessingFailureCausedByNodeStop() throws 
> Exception {
>         startGridsMultiThreaded(3);
>         TestInterceptor.putStartedLatch = new CountDownLatch(2);
>         TestInterceptor.putUnblockedLatch = new CountDownLatch(1);
>         grid(0).createCache(createTestCacheConfiguration());
>         try (
>             IgniteClient cli1 = Ignition.startClient(new 
> ClientConfiguration().setClusterDiscoveryEnabled(false).setAddresses("127.0.0.1:10801"));
>             IgniteClient cli2 = Ignition.startClient(new 
> ClientConfiguration().setClusterDiscoveryEnabled(false).setAddresses("127.0.0.1:10802"))
>         ) {
>             IgniteInternalFuture<Object> putFut1 = GridTestUtils.runAsync(() 
> -> cli1.cache(DEFAULT_CACHE_NAME).putAll(createKeysForNode(2)));
>             IgniteInternalFuture<Object> putFut2 = GridTestUtils.runAsync(() 
> -> cli2.cache(DEFAULT_CACHE_NAME).putAll(createKeysForNode(1)));
>             
> assertTrue(TestInterceptor.putStartedLatch.await(getTestTimeout(), 
> MILLISECONDS));
>             IgniteInternalFuture<Object> stopFut1 = GridTestUtils.runAsync(() 
> -> stopGrid(1));
>             IgniteInternalFuture<Object> stopFut2 = GridTestUtils.runAsync(() 
> -> stopGrid(2));
>             try {
>                 TestInterceptor.putUnblockedLatch.countDown();
>                 stopFut1.get(getTestTimeout());
>                 stopFut2.get(getTestTimeout());
>                 putFut1.get(getTestTimeout());
>                 putFut2.get(getTestTimeout());
>             }
>             catch (CachePartialUpdateException e) {
>                 assertTrue(e.getMessage().contains("Failed to update keys 
> (retry update if possible)"));
>             }
>         }
>     }
>     /** */
>     private CacheConfiguration<Integer, Integer> 
> createTestCacheConfiguration() {
>         return new CacheConfiguration<Integer, Integer>()
>             .setName(DEFAULT_CACHE_NAME)
>             .setAtomicityMode(ATOMIC)
>             .setWriteSynchronizationMode(FULL_SYNC)
>             .setBackups(2)
>             .setAffinity(new GridCacheModuloAffinityFunction(3, 2))
>             .setInterceptor(new TestInterceptor());
>     }
>     /** */
>     private Map<Integer, Integer> createKeysForNode(int nodeIdx) {
>         Map<Integer, Integer> data = new TreeMap<>();
>         if (nodeIdx == 2) {
>             data.put(NODE_2_FIRST_KEY, 2);
>             data.put(NODE_2_SECOND_KEY, 5);
>         }
>         else {
>             data.put(NODE_1_FIRST_KEY, 1);
>             data.put(NODE_1_SECOND_KEY, 4);
>         }
>         return data;
>     }
>     /** */
>     public static final class TestInterceptor implements 
> CacheInterceptor<Integer, Integer> {
>         /** */
>         public static CountDownLatch putStartedLatch;
>         /** */
>         public static CountDownLatch putUnblockedLatch;
>         /** {@inheritDoc} */
>         @Override public @Nullable Integer onGet(Integer key, @Nullable 
> Integer val) {
>             return val;
>         }
>         /** {@inheritDoc} */
>         @Override public @Nullable Integer onBeforePut(Cache.Entry<Integer, 
> Integer> entry, Integer newVal) {
>             if (entry.getKey() == NODE_1_FIRST_KEY || entry.getKey() == 
> NODE_2_FIRST_KEY) {
>                 putStartedLatch.countDown();
>                 try {
>                     assertTrue(putUnblockedLatch.await(10000, MILLISECONDS));
>                 }
>                 catch (InterruptedException e) {
>                     Thread.currentThread().interrupt();
>                     throw new IgniteException(e);
>                 }
>             }
>             else
>                 throw new IgniteException("expected");
>             return newVal;
>         }
>         /** {@inheritDoc} */
>         @Override public void onAfterPut(Cache.Entry<Integer, Integer> entry) 
> {
>             // No-op.
>         }
>         /** {@inheritDoc} */
>         @Override public @Nullable IgniteBiTuple<Boolean, Integer> 
> onBeforeRemove(Cache.Entry<Integer, Integer> entry) {
>             return null;
>         }
>         /** {@inheritDoc} */
>         @Override public void onAfterRemove(Cache.Entry<Integer, Integer> 
> entry) {
>             // No-op.
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to