This is an automated email from the ASF dual-hosted git repository.
mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 7dcd4978379 IGNITE-20038 [Thin Clients] Fixed broken
ClientCachePartitionAwarenessGroup serialization (#10854)
7dcd4978379 is described below
commit 7dcd497837942df61137b07fd2ae0400da01189f
Author: Mikhail Petrov <[email protected]>
AuthorDate: Wed Jul 26 20:26:13 2023 +0300
IGNITE-20038 [Thin Clients] Fixed broken ClientCachePartitionAwarenessGroup
serialization (#10854)
---
.../client/thin/ClientCacheAffinityContext.java | 2 +-
.../cache/ClientCachePartitionAwarenessGroup.java | 64 +++++++++++++---------
...ClientPartitionAwarenessStableTopologyTest.java | 36 ++++++++++++
3 files changed, 74 insertions(+), 28 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
index 465f07500c4..577a2c5f476 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
@@ -60,7 +60,7 @@ public class ClientCacheAffinityContext {
private final AtomicReference<TopologyNodes> lastTop = new
AtomicReference<>();
/** Cache IDs, which should be included to the next affinity mapping
request. */
- private final Set<Integer> pendingCacheIds = new GridConcurrentHashSet<>();
+ final Set<Integer> pendingCacheIds = new GridConcurrentHashSet<>();
/** Current affinity mapping. */
private volatile ClientCacheAffinityMapping affinityMapping;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
index 2cd20c97ea1..a5bc57c8c26 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
@@ -61,41 +61,28 @@ class ClientCachePartitionAwarenessGroup {
* @param cpctx Protocol context.
*/
public void write(CacheObjectBinaryProcessorImpl proc, BinaryRawWriter
writer, ClientProtocolContext cpctx) {
- writer.writeBoolean(mapping != null);
+ boolean isPartitionAwarenessApplicable = mapping != null;
- writer.writeInt(cacheCfgs.size());
-
- for (Map.Entry<Integer, CacheConfiguration<?, ?>> entry:
cacheCfgs.entrySet()) {
- writer.writeInt(entry.getKey());
-
- if (mapping == null)
- continue;
-
- CacheConfiguration<?, ?> ccfg = entry.getValue();
- CacheKeyConfiguration[] keyCfgs = ccfg.getKeyConfiguration();
-
- if (keyCfgs == null) {
- writer.writeInt(0);
+ writer.writeBoolean(isPartitionAwarenessApplicable);
- continue;
- }
-
- writer.writeInt(keyCfgs.length);
+ writer.writeInt(cacheCfgs.size());
- for (CacheKeyConfiguration keyCfg : keyCfgs) {
- int keyTypeId = proc.typeId(keyCfg.getTypeName());
- int affinityKeyFieldId =
proc.binaryContext().fieldId(keyTypeId, keyCfg.getAffinityKeyFieldName());
+ if (isPartitionAwarenessApplicable) {
+ for (Map.Entry<Integer, CacheConfiguration<?, ?>> entry:
cacheCfgs.entrySet()) {
+ writer.writeInt(entry.getKey());
- writer.writeInt(keyTypeId);
- writer.writeInt(affinityKeyFieldId);
+ writeCacheKeyConfiguration(writer, proc,
entry.getValue().getKeyConfiguration());
}
- }
- if (mapping != null)
mapping.write(writer);
- if
(cpctx.isFeatureSupported(ClientBitmaskFeature.ALL_AFFINITY_MAPPINGS))
- writer.writeBoolean(dfltAffinity);
+ if
(cpctx.isFeatureSupported(ClientBitmaskFeature.ALL_AFFINITY_MAPPINGS))
+ writer.writeBoolean(dfltAffinity);
+ }
+ else {
+ for (int cacheId : cacheCfgs.keySet())
+ writer.writeInt(cacheId);
+ }
}
/**
@@ -107,6 +94,29 @@ class ClientCachePartitionAwarenessGroup {
cacheCfgs.putIfAbsent(desc.cacheId(), desc.cacheConfiguration());
}
+ /** */
+ private static void writeCacheKeyConfiguration(
+ BinaryRawWriter writer,
+ CacheObjectBinaryProcessorImpl binProc,
+ CacheKeyConfiguration[] keyCfgs
+ ) {
+ if (keyCfgs == null) {
+ writer.writeInt(0);
+
+ return;
+ }
+
+ writer.writeInt(keyCfgs.length);
+
+ for (CacheKeyConfiguration keyCfg : keyCfgs) {
+ int keyTypeId = binProc.typeId(keyCfg.getTypeName());
+ int affinityKeyFieldId =
binProc.binaryContext().fieldId(keyTypeId, keyCfg.getAffinityKeyFieldName());
+
+ writer.writeInt(keyTypeId);
+ writer.writeInt(affinityKeyFieldId);
+ }
+ }
+
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
index 087e1c6a3ba..5e66be9f6c5 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal.client.thin;
import java.util.Arrays;
+import java.util.Map;
import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
@@ -34,10 +36,17 @@ import
org.apache.ignite.client.ClientPartitionAwarenessMapper;
import org.apache.ignite.client.ClientPartitionAwarenessMapperFactory;
import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import
org.apache.ignite.internal.processors.datastructures.GridCacheAtomicLongEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
+import static java.util.Arrays.asList;
+
/**
* Test partition awareness of thin client on stable topology.
*/
@@ -227,6 +236,33 @@ public class
ThinClientPartitionAwarenessStableTopologyTest extends ThinClientAb
testIgniteSet("testIgniteSet4", "grp-testIgniteSet4",
CacheAtomicityMode.TRANSACTIONAL);
}
+ /** */
+ @Test
+ public void testMultipleCacheGroupAffinityMappingRequest() throws
Exception {
+ ClientCacheAffinityContext affCtx =
((TcpIgniteClient)client).reliableChannel().affinityContext();
+
+ IgniteInternalFuture<Object> replCacheOpFut;
+ IgniteInternalFuture<Object> partCacheOpFut;
+
+ synchronized (affCtx.cacheKeyMapperFactoryMap) {
+ partCacheOpFut = GridTestUtils.runAsync(() ->
client.cache(PART_CACHE_NAME).get(0));
+ replCacheOpFut = GridTestUtils.runAsync(() ->
client.cache(REPL_CACHE_NAME).get(0));
+
+ GridTestUtils.waitForCondition(
+ () ->
affCtx.pendingCacheIds.containsAll(F.transform(asList(REPL_CACHE_NAME,
PART_CACHE_NAME), CU::cacheId)),
+ getTestTimeout()
+ );
+ }
+
+ partCacheOpFut.get();
+ replCacheOpFut.get();
+
+ Map<ClientOperation, Integer> ops =
opsQueue.stream().map(T2::get2).collect(Collectors.toMap(v -> v, v -> 1,
Integer::sum));
+
+ assertEquals(2, (int)ops.get(ClientOperation.CACHE_GET));
+ assertEquals(1, (int)ops.get(ClientOperation.CACHE_PARTITIONS));
+ }
+
/**
* Tests {@link ClientIgniteSet} partition awareness.
*/