This is an automated email from the ASF dual-hosted git repository.

mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dcd4978379 IGNITE-20038 [Thin Clients] Fixed broken ClientCachePartitionAwarenessGroup serialization (#10854)
7dcd4978379 is described below

commit 7dcd497837942df61137b07fd2ae0400da01189f
Author: Mikhail Petrov <[email protected]>
AuthorDate: Wed Jul 26 20:26:13 2023 +0300

    IGNITE-20038 [Thin Clients] Fixed broken ClientCachePartitionAwarenessGroup serialization (#10854)
---
 .../client/thin/ClientCacheAffinityContext.java    |  2 +-
 .../cache/ClientCachePartitionAwarenessGroup.java  | 64 +++++++++++++---------
 ...ClientPartitionAwarenessStableTopologyTest.java | 36 ++++++++++++
 3 files changed, 74 insertions(+), 28 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
index 465f07500c4..577a2c5f476 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
@@ -60,7 +60,7 @@ public class ClientCacheAffinityContext {
     private final AtomicReference<TopologyNodes> lastTop = new 
AtomicReference<>();
 
     /** Cache IDs, which should be included to the next affinity mapping 
request. */
-    private final Set<Integer> pendingCacheIds = new GridConcurrentHashSet<>();
+    final Set<Integer> pendingCacheIds = new GridConcurrentHashSet<>();
 
     /** Current affinity mapping. */
     private volatile ClientCacheAffinityMapping affinityMapping;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
index 2cd20c97ea1..a5bc57c8c26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
@@ -61,41 +61,28 @@ class ClientCachePartitionAwarenessGroup {
      * @param cpctx Protocol context.
      */
     public void write(CacheObjectBinaryProcessorImpl proc, BinaryRawWriter 
writer, ClientProtocolContext cpctx) {
-        writer.writeBoolean(mapping != null);
+        boolean isPartitionAwarenessApplicable = mapping != null;
 
-        writer.writeInt(cacheCfgs.size());
-
-        for (Map.Entry<Integer, CacheConfiguration<?, ?>> entry: 
cacheCfgs.entrySet()) {
-            writer.writeInt(entry.getKey());
-
-            if (mapping == null)
-                continue;
-
-            CacheConfiguration<?, ?> ccfg = entry.getValue();
-            CacheKeyConfiguration[] keyCfgs = ccfg.getKeyConfiguration();
-
-            if (keyCfgs == null) {
-                writer.writeInt(0);
+        writer.writeBoolean(isPartitionAwarenessApplicable);
 
-                continue;
-            }
-
-            writer.writeInt(keyCfgs.length);
+        writer.writeInt(cacheCfgs.size());
 
-            for (CacheKeyConfiguration keyCfg : keyCfgs) {
-                int keyTypeId = proc.typeId(keyCfg.getTypeName());
-                int affinityKeyFieldId = 
proc.binaryContext().fieldId(keyTypeId, keyCfg.getAffinityKeyFieldName());
+        if (isPartitionAwarenessApplicable) {
+            for (Map.Entry<Integer, CacheConfiguration<?, ?>> entry: 
cacheCfgs.entrySet()) {
+                writer.writeInt(entry.getKey());
 
-                writer.writeInt(keyTypeId);
-                writer.writeInt(affinityKeyFieldId);
+                writeCacheKeyConfiguration(writer, proc, 
entry.getValue().getKeyConfiguration());
             }
-        }
 
-        if (mapping != null)
             mapping.write(writer);
 
-        if 
(cpctx.isFeatureSupported(ClientBitmaskFeature.ALL_AFFINITY_MAPPINGS))
-            writer.writeBoolean(dfltAffinity);
+            if 
(cpctx.isFeatureSupported(ClientBitmaskFeature.ALL_AFFINITY_MAPPINGS))
+                writer.writeBoolean(dfltAffinity);
+        }
+        else {
+            for (int cacheId : cacheCfgs.keySet())
+                writer.writeInt(cacheId);
+        }
     }
 
     /**
@@ -107,6 +94,29 @@ class ClientCachePartitionAwarenessGroup {
             cacheCfgs.putIfAbsent(desc.cacheId(), desc.cacheConfiguration());
     }
 
+    /** */
+    private static void writeCacheKeyConfiguration(
+        BinaryRawWriter writer,
+        CacheObjectBinaryProcessorImpl binProc,
+        CacheKeyConfiguration[] keyCfgs
+    ) {
+        if (keyCfgs == null) {
+            writer.writeInt(0);
+
+            return;
+        }
+
+        writer.writeInt(keyCfgs.length);
+
+        for (CacheKeyConfiguration keyCfg : keyCfgs) {
+            int keyTypeId = binProc.typeId(keyCfg.getTypeName());
+            int affinityKeyFieldId = 
binProc.binaryContext().fieldId(keyTypeId, keyCfg.getAffinityKeyFieldName());
+
+            writer.writeInt(keyTypeId);
+            writer.writeInt(affinityKeyFieldId);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public boolean equals(Object o) {
         if (this == o)
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
index 087e1c6a3ba..5e66be9f6c5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.internal.client.thin;
 
 import java.util.Arrays;
+import java.util.Map;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
@@ -34,10 +36,17 @@ import org.apache.ignite.client.ClientPartitionAwarenessMapper;
 import org.apache.ignite.client.ClientPartitionAwarenessMapperFactory;
 import org.apache.ignite.configuration.AtomicConfiguration;
 import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import 
org.apache.ignite.internal.processors.datastructures.GridCacheAtomicLongEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
+import static java.util.Arrays.asList;
+
 /**
  * Test partition awareness of thin client on stable topology.
  */
@@ -227,6 +236,33 @@ public class ThinClientPartitionAwarenessStableTopologyTest extends ThinClientAb
         testIgniteSet("testIgniteSet4", "grp-testIgniteSet4", 
CacheAtomicityMode.TRANSACTIONAL);
     }
 
+    /** */
+    @Test
+    public void testMultipleCacheGroupAffinityMappingRequest() throws 
Exception {
+        ClientCacheAffinityContext affCtx = 
((TcpIgniteClient)client).reliableChannel().affinityContext();
+
+        IgniteInternalFuture<Object> replCacheOpFut;
+        IgniteInternalFuture<Object> partCacheOpFut;
+
+        synchronized (affCtx.cacheKeyMapperFactoryMap) {
+            partCacheOpFut = GridTestUtils.runAsync(() -> 
client.cache(PART_CACHE_NAME).get(0));
+            replCacheOpFut = GridTestUtils.runAsync(() -> 
client.cache(REPL_CACHE_NAME).get(0));
+
+            GridTestUtils.waitForCondition(
+                () -> 
affCtx.pendingCacheIds.containsAll(F.transform(asList(REPL_CACHE_NAME, 
PART_CACHE_NAME), CU::cacheId)),
+                getTestTimeout()
+            );
+        }
+
+        partCacheOpFut.get();
+        replCacheOpFut.get();
+
+        Map<ClientOperation, Integer> ops = 
opsQueue.stream().map(T2::get2).collect(Collectors.toMap(v -> v, v -> 1, 
Integer::sum));
+
+        assertEquals(2, (int)ops.get(ClientOperation.CACHE_GET));
+        assertEquals(1, (int)ops.get(ClientOperation.CACHE_PARTITIONS));
+    }
+
     /**
      * Tests {@link ClientIgniteSet} partition awareness.
      */

Reply via email to