Sergey Kosarev created IGNITE-18976: ---------------------------------------
Summary: Affinity broken on the client after reconnection Key: IGNITE-18976 URL: https://issues.apache.org/jira/browse/IGNITE-18976 Project: Ignite Issue Type: Bug Components: binary Affects Versions: 2.16 Reporter: Sergey Kosarev /* * Copyright 2019 GridGain Systems, Inc. and Contributors. * * Licensed under the GridGain Community Edition License (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.ignite.internal; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.binary.BinaryTypeConfiguration; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheKeyConfiguration; import org.apache.ignite.cache.affinity.AffinityKeyMapped; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.BinaryConfiguration; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; /** * */ public class IgniteClientReconnectAffinityTest extends IgniteClientReconnectAbstractTest { /** */ private static final int SRV_CNT = 1; /** */ private UUID nodeId; private Ignite client; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); TestCommunicationSpi commSpi = new TestCommunicationSpi(); commSpi.setSharedMemoryPort(-1); cfg.setCommunicationSpi(commSpi); cfg.setPeerClassLoadingEnabled(false); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setNetworkTimeout(5000); cfg.setCacheKeyConfiguration(new CacheKeyConfiguration(TestNotAnnotatedKey.class.getName(), TestNotAnnotatedKey.AFFINITY_KEY_FIELD)) .setBinaryConfiguration( new BinaryConfiguration() .setTypeConfigurations(Arrays.asList( new BinaryTypeConfiguration() .setTypeName(TestNotAnnotatedKey.class.getName()), new BinaryTypeConfiguration() .setTypeName(TestAnnotatedKey.class.getName()) )) ) ; return cfg; } /** {@inheritDoc} */ @Override protected int serverCount() { return 0; } /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { startGrids(SRV_CNT); } /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { stopAllGrids(); } @Test public void testReconnectClientNotAnnotatedAffinityKeyGet() throws Exception { clientMode = true; final Ignite client = startGrid(SRV_CNT); assertTrue(client.cluster().localNode().isClient()); final Ignite srv = clientRouter(client); final IgniteCache<TestNotAnnotatedKey, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<TestNotAnnotatedKey, Object>(DEFAULT_CACHE_NAME) .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) ); final IgniteCache<TestNotAnnotatedKey, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME); assertNotNull(srvCache); final String val = "val"; clientCache.put(TestNotAnnotatedKey.of(1), val); assertEquals(val, clientCache.get(TestNotAnnotatedKey.of(1))); assertEquals(val, srvCache.get(TestNotAnnotatedKey.of(1))); reconnectClientNode(client, srv, new Runnable() { @Override public void run() { assertNotNull(srvCache.get(TestNotAnnotatedKey.of(1))); } }); assertEquals(val, srvCache.get(TestNotAnnotatedKey.of(1))); assertEquals(val, clientCache.get(TestNotAnnotatedKey.of(1))); } @Test public void testReconnectClientNotAnnotatedAffinityKeyPartition() throws Exception { clientMode = true; client = startGrid(SRV_CNT); assertTrue(client.cluster().localNode().isClient()); final Ignite srv = clientRouter(client); IgniteCache<TestNotAnnotatedKey, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<TestNotAnnotatedKey, Object>(DEFAULT_CACHE_NAME) // .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) ); final IgniteCache<TestNotAnnotatedKey, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME); assertNotNull(srvCache); int partition = partition(TestNotAnnotatedKey.of(1), client); assertEquals(partition, partition(TestNotAnnotatedKey.of(1), srv)); reconnectClientNode(client, srv, new Runnable() { @Override public void run() { assertEquals(partition, partition(TestNotAnnotatedKey.of(1), srv)); } }); assertEquals(partition, partition(TestNotAnnotatedKey.of(1), srv)); assertEquals(partition, partition(TestNotAnnotatedKey.of(1), client)); } @Test public void testReconnectClientAnnotatedAffinityKeyGet() throws Exception { clientMode = true; final Ignite client = startGrid(SRV_CNT); assertTrue(client.cluster().localNode().isClient()); final Ignite srv = clientRouter(client); final IgniteCache<TestAnnotatedKey, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<TestAnnotatedKey, Object>(DEFAULT_CACHE_NAME) .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) ); final IgniteCache<TestAnnotatedKey, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME); assertNotNull(srvCache); final String val = "val"; clientCache.put(TestAnnotatedKey.of(1), val); assertEquals(val, clientCache.get(TestAnnotatedKey.of(1))); assertEquals(val, srvCache.get(TestAnnotatedKey.of(1))); reconnectClientNode(client, srv, new Runnable() { @Override public void run() { assertNotNull(srvCache.get(TestAnnotatedKey.of(1))); } }); assertEquals(val, srvCache.get(TestAnnotatedKey.of(1))); assertEquals(val, clientCache.get(TestAnnotatedKey.of(1))); } @Test public void testReconnectClientAnnotatedAffinityKeyPartition() throws Exception { clientMode = true; client = startGrid(SRV_CNT); assertTrue(client.cluster().localNode().isClient()); final Ignite srv = clientRouter(client); IgniteCache<TestAnnotatedKey, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<TestAnnotatedKey, Object>(DEFAULT_CACHE_NAME) // .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) ); final IgniteCache<TestAnnotatedKey, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME); assertNotNull(srvCache); int partition = partition(TestAnnotatedKey.of(1), client); assertEquals(partition, partition(TestAnnotatedKey.of(1), srv)); reconnectClientNode(client, srv, new Runnable() { @Override public void run() { assertEquals(partition, partition(TestAnnotatedKey.of(1), srv)); } }); assertEquals(partition, partition(TestAnnotatedKey.of(1), srv)); assertEquals(partition, partition(TestAnnotatedKey.of(1), client)); } private <K> int partition(K key, Ignite ign) { return ign.affinity(DEFAULT_CACHE_NAME).partition(key); } static class TestNotAnnotatedKey { private static final String AFFINITY_KEY_FIELD = "affinityKey"; private int notAnnotatedAffinityKey; private int nonAffinityInfo; public TestNotAnnotatedKey(int affinityKey, int nonAffinityInfo) { this.notAnnotatedAffinityKey = affinityKey; this.nonAffinityInfo = nonAffinityInfo; } public static TestNotAnnotatedKey of(int affinityKey) { return new TestNotAnnotatedKey(affinityKey, affinityKey); } } static class TestAnnotatedKey { @AffinityKeyMapped private int annotatedAffinityKey; private int nonAffinityInfo; public TestAnnotatedKey(int affinityKey, int nonAffinityInfo) { this.annotatedAffinityKey = affinityKey; this.nonAffinityInfo = nonAffinityInfo; } public static TestAnnotatedKey of(int affinityKey) { return new TestAnnotatedKey(affinityKey, affinityKey); } } /** * */ private static class TestCommunicationSpi extends TcpCommunicationSpi { /** */ @LoggerResource private IgniteLogger log; /** */ private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>(); /** */ private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>(); /** {@inheritDoc} */ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { if (msg instanceof GridIoMessage) { Object msg0 = ((GridIoMessage)msg).message(); synchronized (this) { Set<UUID> blockNodes = blockCls.get(msg0.getClass()); if (F.contains(blockNodes, node.id())) { log.info("Block message [node=" + node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) + ", msg=" + msg0 + ']'); blockedMsgs.add(new T2<>(node, (GridIoMessage)msg)); return; } } } super.sendMessage(node, msg, ackClosure); } /** * @param cls Message class. * @param nodeId Node ID. */ void blockMessages(Class<?> cls, UUID nodeId) { synchronized (this) { Set<UUID> set = blockCls.get(cls); if (set == null) { set = new HashSet<>(); blockCls.put(cls, set); } set.add(nodeId); } } /** * @param snd Send messages flag. */ void stopBlock(boolean snd) { synchronized (this) { blockCls.clear(); if (snd) { for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) { ClusterNode node = msg.get1(); log.info("Send blocked message: [node=" + node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) + ", msg=" + msg.get2().message() + ']'); super.sendMessage(msg.get1(), msg.get2()); } } blockedMsgs.clear(); } } } } -- This message was sent by Atlassian Jira (v8.20.10#820010)