[
https://issues.apache.org/jira/browse/IGNITE-28662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mikhail Petrov updated IGNITE-28662:
------------------------------------
Labels: ise (was: )
> Node stop may be infinitely blocked by atomic cache operations invoked from
> thin client
> ---------------------------------------------------------------------------------------
>
> Key: IGNITE-28662
> URL: https://issues.apache.org/jira/browse/IGNITE-28662
> Project: Ignite
> Issue Type: Bug
> Reporter: Mikhail Petrov
> Priority: Major
> Labels: ise
> Fix For: 2.19
>
>
> Consider a cluster of three nodes: node0, node1, and node2.
> 1. node1 and node2 each receive a put request from a thin client. node1 is the
> "primary" node for the cache keys received by node2, and node2 is the "primary"
> node for the cache keys received by node1. Both nodes start executing the
> operations and wait for them to complete.
> 2. node1 and node2 receive a stop signal (Ignite#close). The stop procedure on
> both nodes blocks in GridNioAsyncNotifyFilter#stop, which waits for the thin
> client operations to complete.
> 3. node1 and node2 fail to process the cache requests for some reason (e.g., a
> cache interceptor throws an exception).
> 4. node1 and node2 do not send each other a GridNearAtomicUpdateResponse with the
> failed keys, because both of them are stopping (see GridCacheIoManager#onSend).
> This message is what tells the "near" node that some keys could not be processed
> and that the operation should fail with an exception.
> 5. As a result, node1 and node2 cannot complete the cache operations received from
> the thin clients: neither of them will ever receive a GridNearAtomicUpdateResponse
> or a NODE_LEFT event for the primary node, so neither of them can complete the
> stop procedure.
> Reproducer:
> {code:java}
> /*
> * Licensed to the Apache Software Foundation (ASF) under one or more
> * contributor license agreements. See the NOTICE file distributed with
> * this work for additional information regarding copyright ownership.
> * The ASF licenses this file to You under the Apache License, Version 2.0
> * (the "License"); you may not use this file except in compliance with
> * the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> package org.apache.ignite;
> import java.util.Map;
> import java.util.TreeMap;
> import java.util.concurrent.CountDownLatch;
> import javax.cache.Cache;
> import org.apache.ignite.cache.CacheInterceptor;
> import org.apache.ignite.cache.CachePartialUpdateException;
> import org.apache.ignite.client.IgniteClient;
> import org.apache.ignite.configuration.CacheConfiguration;
> import org.apache.ignite.configuration.ClientConfiguration;
> import org.apache.ignite.configuration.IgniteConfiguration;
> import org.apache.ignite.internal.IgniteInternalFuture;
> import org.apache.ignite.internal.TestRecordingCommunicationSpi;
> import
> org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction;
> import org.apache.ignite.lang.IgniteBiTuple;
> import org.apache.ignite.testframework.GridTestUtils;
> import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
> import org.jetbrains.annotations.Nullable;
> import org.junit.Test;
> import static java.util.Collections.singletonMap;
> import static java.util.concurrent.TimeUnit.MILLISECONDS;
> import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
> import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
> import static
> org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction.IDX_ATTR;
> /**
> * Reproducer for IGNITE-28662. Two grids (1 and 2) each serve a thin-client
> * {@code putAll} on an ATOMIC cache. While both operations are blocked in
> * {@link TestInterceptor}, both grids are stopped concurrently. The test then
> * waits, bounded by the test timeout, for the stops and the puts to finish.
> */
> public class NodeStopForeverBlockedByAtomicCacheOperationsTest extends
> GridCommonAbstractTest {
> /**
> * First key of the batch built by {@code createKeysForNode(1)}; the interceptor blocks on it.
> * Presumably mapped to grid 1 as primary by the modulo affinity; verify.
> */
> public static final int NODE_1_FIRST_KEY = 1;
> /** Second key of the batch built by {@code createKeysForNode(1)}; the interceptor fails on it. */
> public static final int NODE_1_SECOND_KEY = 4;
> /**
> * First key of the batch built by {@code createKeysForNode(2)}; the interceptor blocks on it.
> * Presumably mapped to grid 2 as primary by the modulo affinity; verify.
> */
> public static final int NODE_2_FIRST_KEY = 2;
> /** Second key of the batch built by {@code createKeysForNode(2)}; the interceptor fails on it. */
> public static final int NODE_2_SECOND_KEY = 5;
> /**
> * {@inheritDoc}
> * <p>
> * Installs {@link TestRecordingCommunicationSpi}. It also sets the {@code IDX_ATTR}
> * user attribute to the grid's test index, which the modulo affinity function reads.
> */
> @Override protected IgniteConfiguration getConfiguration(String
> igniteInstanceName) throws Exception {
> return super.getConfiguration(igniteInstanceName)
> .setCommunicationSpi(new TestRecordingCommunicationSpi())
> .setUserAttributes(singletonMap(IDX_ATTR,
> getTestIgniteInstanceIndex(igniteInstanceName)));
> }
> /** {@inheritDoc} Stops all grids after each test. */
> @Override protected void afterTest() throws Exception {
> super.afterTest();
> stopAllGrids();
> }
> /**
> * Starts two concurrent thin-client {@code putAll} operations (one per client) and waits until
> * both are blocked in the interceptor. It then stops grids 1 and 2 asynchronously, releases the
> * interceptor, and waits (bounded by the test timeout) for the stop and put futures.
> */
> @Test
> public void testCacheEntriesProcessingFailureCausedByNodeStop() throws
> Exception {
> startGridsMultiThreaded(3);
> // Two countdowns are expected: one blocked first-key put per client batch.
> TestInterceptor.putStartedLatch = new CountDownLatch(2);
> // Released once, after both stops have been initiated.
> TestInterceptor.putUnblockedLatch = new CountDownLatch(1);
> grid(0).createCache(createTestCacheConfiguration());
> // NOTE(review): assumes thin-client ports 10801/10802 belong to grids 1/2 respectively; confirm.
> // cli1 sends the grid-2 batch and cli2 sends the grid-1 batch (cross-primary setup from the description).
> try (
> IgniteClient cli1 = Ignition.startClient(new
> ClientConfiguration().setClusterDiscoveryEnabled(false).setAddresses("127.0.0.1:10801"));
> IgniteClient cli2 = Ignition.startClient(new
> ClientConfiguration().setClusterDiscoveryEnabled(false).setAddresses("127.0.0.1:10802"))
> ) {
> IgniteInternalFuture<Object> putFut1 = GridTestUtils.runAsync(()
> -> cli1.cache(DEFAULT_CACHE_NAME).putAll(createKeysForNode(2)));
> IgniteInternalFuture<Object> putFut2 = GridTestUtils.runAsync(()
> -> cli2.cache(DEFAULT_CACHE_NAME).putAll(createKeysForNode(1)));
>
> // Wait until both first-key puts are parked inside TestInterceptor#onBeforePut.
> assertTrue(TestInterceptor.putStartedLatch.await(getTestTimeout(),
> MILLISECONDS));
> // Stop both grids concurrently while their cache operations are still in progress.
> IgniteInternalFuture<Object> stopFut1 = GridTestUtils.runAsync(()
> -> stopGrid(1));
> IgniteInternalFuture<Object> stopFut2 = GridTestUtils.runAsync(()
> -> stopGrid(2));
> try {
> // Let the blocked puts continue; the interceptor then fails the second key of each batch.
> TestInterceptor.putUnblockedLatch.countDown();
> // Per the issue description, without a fix the stops never complete, so these waits hit the timeout.
> stopFut1.get(getTestTimeout());
> stopFut2.get(getTestTimeout());
> putFut1.get(getTestTimeout());
> putFut2.get(getTestTimeout());
> }
> catch (CachePartialUpdateException e) {
> // NOTE(review): IgniteInternalFuture#get presumably wraps failures (e.g. in IgniteCheckedException),
> // so this catch may never match; confirm the exception type actually surfaced here.
> // NOTE(review): the literal below appears split by mail wrapping; in source it is presumably one line.
> assertTrue(e.getMessage().contains("Failed to update keys
> (retry update if possible)"));
> }
> }
> }
> /**
> * Creates the test cache configuration: {@code DEFAULT_CACHE_NAME}, ATOMIC, FULL_SYNC, two backups,
> * {@link GridCacheModuloAffinityFunction} built with {@code (3, 2)} (presumably partitions and
> * backups; verify), and {@link TestInterceptor}.
> *
> * @return Cache configuration.
> */
> private CacheConfiguration<Integer, Integer>
> createTestCacheConfiguration() {
> return new CacheConfiguration<Integer, Integer>()
> .setName(DEFAULT_CACHE_NAME)
> .setAtomicityMode(ATOMIC)
> .setWriteSynchronizationMode(FULL_SYNC)
> .setBackups(2)
> .setAffinity(new GridCacheModuloAffinityFunction(3, 2))
> .setInterceptor(new TestInterceptor());
> }
> /**
> * Builds a {@code putAll} batch where each value equals its key.
> * <p>
> * For {@code nodeIdx == 2} the batch is {@link #NODE_2_FIRST_KEY} and {@link #NODE_2_SECOND_KEY};
> * for any other value it is {@link #NODE_1_FIRST_KEY} and {@link #NODE_1_SECOND_KEY}.
> *
> * @param nodeIdx Index of the grid whose keys are requested.
> * @return Sorted map of keys to values.
> */
> private Map<Integer, Integer> createKeysForNode(int nodeIdx) {
> // TreeMap keeps keys sorted, presumably so the blocking first key precedes the failing second key.
> Map<Integer, Integer> data = new TreeMap<>();
> if (nodeIdx == 2) {
> data.put(NODE_2_FIRST_KEY, 2);
> data.put(NODE_2_SECOND_KEY, 5);
> }
> else {
> data.put(NODE_1_FIRST_KEY, 1);
> data.put(NODE_1_SECOND_KEY, 4);
> }
> return data;
> }
> /**
> * Interceptor that blocks puts of the "first" keys until the test releases them. It throws an
> * {@link IgniteException} for every other key, emulating the interceptor failure described in step 3.
> */
> public static final class TestInterceptor implements
> CacheInterceptor<Integer, Integer> {
> /** Counted down each time a first-key put enters {@code onBeforePut}; assigned by the test. */
> public static CountDownLatch putStartedLatch;
> /** Awaited by blocked first-key puts; counted down by the test; assigned by the test. */
> public static CountDownLatch putUnblockedLatch;
> /** {@inheritDoc} Returns the value unchanged. */
> @Override public @Nullable Integer onGet(Integer key, @Nullable
> Integer val) {
> return val;
> }
> /**
> * {@inheritDoc}
> * <p>
> * For {@link #NODE_1_FIRST_KEY} / {@link #NODE_2_FIRST_KEY}: signals {@code putStartedLatch}, then
> * waits up to 10 seconds for {@code putUnblockedLatch} (failing the assertion otherwise) and lets the
> * put proceed with {@code newVal}. For every other key: throws {@code IgniteException("expected")}.
> */
> @Override public @Nullable Integer onBeforePut(Cache.Entry<Integer,
> Integer> entry, Integer newVal) {
> // Integer keys are unboxed for comparison with the int constants.
> if (entry.getKey() == NODE_1_FIRST_KEY || entry.getKey() ==
> NODE_2_FIRST_KEY) {
> putStartedLatch.countDown();
> try {
> assertTrue(putUnblockedLatch.await(10000, MILLISECONDS));
> }
> catch (InterruptedException e) {
> // Restore the interrupt flag before failing the operation.
> Thread.currentThread().interrupt();
> throw new IgniteException(e);
> }
> }
> else
> throw new IgniteException("expected");
> return newVal;
> }
> /** {@inheritDoc} */
> @Override public void onAfterPut(Cache.Entry<Integer, Integer> entry)
> {
> // No-op.
> }
> /** {@inheritDoc} Returns {@code null}. */
> @Override public @Nullable IgniteBiTuple<Boolean, Integer>
> onBeforeRemove(Cache.Entry<Integer, Integer> entry) {
> return null;
> }
> /** {@inheritDoc} */
> @Override public void onAfterRemove(Cache.Entry<Integer, Integer>
> entry) {
> // No-op.
> }
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)