This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new ee5acaf  IGNITE-13074 Thin client: Fix default compute cluster group - Fixes #7850.
ee5acaf is described below

commit ee5acaf0a3a4fd363e7da2d04a0ff1860a2254c6
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu May 28 12:42:02 2020 +0300

    IGNITE-13074 Thin client: Fix default compute cluster group - Fixes #7850.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../client/thin/ClientClusterGroupImpl.java        | 14 +++++++++---
 .../internal/client/thin/ClientClusterImpl.java    | 12 ++++++++++
 .../internal/client/thin/ClientComputeImpl.java    | 22 +++++++++---------
 .../internal/client/thin/TcpIgniteClient.java      |  2 +-
 .../platform/client/compute/ClientComputeTask.java |  4 +++-
 .../internal/client/thin/ClusterGroupTest.java     |  8 +++----
 .../internal/client/thin/ComputeTaskTest.java      | 26 +++++++++++++++++-----
 7 files changed, 63 insertions(+), 25 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterGroupImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterGroupImpl.java
index 3425dc3..fd26025 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterGroupImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterGroupImpl.java
@@ -157,7 +157,8 @@ class ClientClusterGroupImpl implements ClientClusterGroup {
 
     /** {@inheritDoc} */
     @Override public ClientClusterGroup forServers() {
-        return forProjectionFilters(projectionFilters.forNodeType(true));
+        return forProjectionFilters(projectionFilters == 
ProjectionFilters.FULL_PROJECTION ?
+            ProjectionFilters.DEFAULT_PROJECTION : 
projectionFilters.forNodeType(true));
     }
 
     /** {@inheritDoc} */
@@ -263,10 +264,10 @@ class ClientClusterGroupImpl implements 
ClientClusterGroup {
      *
      * Note: This method is for internal use only. For optimization purposes 
it can return not existing node IDs if
      * only filter by node IDs was explicitly set.
-     * Method also returns null if no filter was set (full projection needed).
+     * Method also returns null for default projection (for server nodes).
      */
     public Collection<UUID> nodeIds() {
-        if (projectionFilters == ProjectionFilters.FULL_PROJECTION)
+        if (projectionFilters == ProjectionFilters.DEFAULT_PROJECTION)
             return null;
         else if (projectionFilters.hasOnlyNodeIdsFilters())
             return 
Collections.unmodifiableCollection(projectionFilters.nodeIds);
@@ -483,6 +484,13 @@ class ClientClusterGroupImpl implements ClientClusterGroup 
{
         /** Projection without any filters for full set of nodes. */
         public static final ProjectionFilters FULL_PROJECTION = new 
ProjectionFilters();
 
+        /**
+         * Projection for server nodes, used by default for compute and 
service operations when cluster group is not
+         * defined explicitly.
+         */
+        public static final ProjectionFilters DEFAULT_PROJECTION =
+            new ProjectionFilters(null, null, Boolean.TRUE, null, null);
+
         /** Filter for empty projection. Will be returned if there are 
mutually exclusive filters detected. */
         public static final ProjectionFilters EMPTY_PROJECTION =
             new ProjectionFilters(Collections.emptySet(), null, null, null, 
null);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterImpl.java
index a7c9ebc..dc35142ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterImpl.java
@@ -28,11 +28,16 @@ import org.apache.ignite.internal.binary.BinaryWriterExImpl;
  * Implementation of {@link ClientCluster}.
  */
 class ClientClusterImpl extends ClientClusterGroupImpl implements 
ClientCluster {
+    /** Default cluster group. */
+    private final ClientClusterGroupImpl dfltClusterGrp;
+
     /**
      * Constructor.
      */
     ClientClusterImpl(ReliableChannel ch, ClientBinaryMarshaller marsh) {
         super(ch, marsh);
+
+        dfltClusterGrp = (ClientClusterGroupImpl)forServers();
     }
 
     /** {@inheritDoc} */
@@ -135,4 +140,11 @@ class ClientClusterImpl extends ClientClusterGroupImpl 
implements ClientCluster
             
!protocolCtx.isFeatureSupported(ProtocolBitmaskFeature.CLUSTER_STATES))
             throw new 
ClientFeatureNotSupportedByServerException(ProtocolBitmaskFeature.CLUSTER_STATES);
     }
+
+    /**
+     * Default cluster group ("for servers").
+     */
+    ClientClusterGroupImpl defaultClusterGroup() {
+        return dfltClusterGrp;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
index 287aba3..6238e52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
@@ -64,8 +64,8 @@ class ClientComputeImpl implements ClientCompute, 
NotificationListener {
     /** Utils for serialization/deserialization. */
     private final ClientUtils utils;
 
-    /** ClientCluster instance. */
-    private final ClientClusterImpl cluster;
+    /** Default cluster group. */
+    private final ClientClusterGroupImpl dfltGrp;
 
     /** Active tasks. */
     private final Map<ClientChannel, Map<Long, ClientComputeTask<Object>>> 
activeTasks = new ConcurrentHashMap<>();
@@ -74,10 +74,10 @@ class ClientComputeImpl implements ClientCompute, 
NotificationListener {
     private final ReadWriteLock guard = new ReentrantReadWriteLock();
 
     /** Constructor. */
-    ClientComputeImpl(ReliableChannel ch, ClientBinaryMarshaller marsh, 
ClientClusterImpl cluster) {
+    ClientComputeImpl(ReliableChannel ch, ClientBinaryMarshaller marsh, 
ClientClusterGroupImpl dfltGrp) {
         this.ch = ch;
         this.marsh = marsh;
-        this.cluster = cluster;
+        this.dfltGrp = dfltGrp;
 
         utils = new ClientUtils(marsh);
 
@@ -102,32 +102,32 @@ class ClientComputeImpl implements ClientCompute, 
NotificationListener {
 
     /** {@inheritDoc} */
     @Override public ClientClusterGroup clusterGroup() {
-        return cluster;
+        return dfltGrp;
     }
 
     /** {@inheritDoc} */
     @Override public <T, R> R execute(String taskName, @Nullable T arg) throws 
ClientException, InterruptedException {
-        return execute0(taskName, arg, cluster, (byte)0, 0L);
+        return execute0(taskName, arg, dfltGrp, (byte)0, 0L);
     }
 
     /** {@inheritDoc} */
     @Override public <T, R> Future<R> executeAsync(String taskName, @Nullable 
T arg) throws ClientException {
-        return executeAsync0(taskName, arg, cluster, (byte)0, 0L);
+        return executeAsync0(taskName, arg, dfltGrp, (byte)0, 0L);
     }
 
     /** {@inheritDoc} */
     @Override public ClientCompute withTimeout(long timeout) {
-        return timeout == 0L ? this : new ClientComputeModificator(this, 
cluster, (byte)0, timeout);
+        return timeout == 0L ? this : new ClientComputeModificator(this, 
dfltGrp, (byte)0, timeout);
     }
 
     /** {@inheritDoc} */
     @Override public ClientCompute withNoFailover() {
-        return new ClientComputeModificator(this, cluster, 
NO_FAILOVER_FLAG_MASK, 0L);
+        return new ClientComputeModificator(this, dfltGrp, 
NO_FAILOVER_FLAG_MASK, 0L);
     }
 
     /** {@inheritDoc} */
     @Override public ClientCompute withNoResultCache() {
-        return new ClientComputeModificator(this, cluster, 
NO_RESULT_CACHE_FLAG_MASK, 0L);
+        return new ClientComputeModificator(this, dfltGrp, 
NO_RESULT_CACHE_FLAG_MASK, 0L);
     }
 
     /**
@@ -178,7 +178,7 @@ class ClientComputeImpl implements ClientCompute, 
NotificationListener {
         byte flags,
         long timeout
     ) throws ClientException {
-        Collection<UUID> nodeIds = clusterGrp == cluster ? null : 
clusterGrp.nodeIds();
+        Collection<UUID> nodeIds = clusterGrp.nodeIds();
 
         if (F.isEmpty(taskName))
             throw new ClientException("Task name can't be null or empty.");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index 955f764..e4c6b9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -114,7 +114,7 @@ public class TcpIgniteClient implements IgniteClient {
 
         cluster = new ClientClusterImpl(ch, marsh);
 
-        compute = new ClientComputeImpl(ch, marsh, cluster);
+        compute = new ClientComputeImpl(ch, marsh, 
cluster.defaultClusterGroup());
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
index a099369..4ce3907 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
@@ -97,7 +97,9 @@ class ClientComputeTask implements ClientCloseableResource {
 
         GridTaskProcessor task = ctx.kernalContext().task();
 
-        IgnitePredicate<ClusterNode> nodePredicate = F.isEmpty(nodeIds) ? 
F.alwaysTrue() : F.nodeForNodeIds(nodeIds);
+        IgnitePredicate<ClusterNode> nodePredicate = F.isEmpty(nodeIds) ? node 
-> !node.isClient() :
+            F.nodeForNodeIds(nodeIds);
+
         UUID subjId = ctx.securityContext() == null ? null : 
ctx.securityContext().subject().id();
 
         task.setThreadContext(TC_SUBGRID_PREDICATE, nodePredicate);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ClusterGroupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ClusterGroupTest.java
index 5023e85..e253fab 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ClusterGroupTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ClusterGroupTest.java
@@ -331,8 +331,8 @@ public class ClusterGroupTest extends 
GridCommonAbstractTest {
      */
     @Test
     public void testNodeIds() {
-        // Check full projection.
-        assertNull(((ClientClusterGroupImpl)client.cluster()).nodeIds());
+        // Check default projection.
+        
assertNull(((ClientClusterImpl)client.cluster()).defaultClusterGroup().nodeIds());
 
         // Check filter only by node id.
         Collection<UUID> nodeIds = 
((ClientClusterGroupImpl)client.cluster().forNodeId(nodeId(0),
@@ -345,9 +345,9 @@ public class ClusterGroupTest extends 
GridCommonAbstractTest {
         assertFalse(nodeIds.contains(nodeId(4)));
 
         // Check server-side filter.
-        nodeIds = 
((ClientClusterGroupImpl)client.cluster().forServers()).nodeIds();
+        nodeIds = 
((ClientClusterGroupImpl)client.cluster().forClients()).nodeIds();
 
-        assertEquals(new HashSet<>(F.asList(nodeId(0), nodeId(1), nodeId(2))),
+        assertEquals(new HashSet<>(F.asList(nodeId(3), nodeId(4))),
             new HashSet<>(nodeIds));
 
         // Check client-side filters.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
index 2f16208..d400daf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
@@ -80,7 +80,8 @@ public class ComputeTaskTest extends GridCommonAbstractTest {
         return 
super.getConfiguration(igniteInstanceName).setClientConnectorConfiguration(
             new ClientConnectorConfiguration().setThinClientConfiguration(
                 new 
ThinClientConfiguration().setMaxActiveComputeTasksPerConnection(
-                    getTestIgniteInstanceIndex(igniteInstanceName) <= 1 ? 
ACTIVE_TASKS_LIMIT : 0)));
+                    getTestIgniteInstanceIndex(igniteInstanceName) <= 1 ? 
ACTIVE_TASKS_LIMIT : 0)))
+            .setClientMode(getTestIgniteInstanceIndex(igniteInstanceName) == 
3);
     }
 
     /**
@@ -118,7 +119,7 @@ public class ComputeTaskTest extends GridCommonAbstractTest 
{
             T2<UUID, Set<UUID>> val = 
client.compute().execute(TestTask.class.getName(), null);
 
             assertEquals(nodeId(0), val.get1());
-            assertEquals(new HashSet<>(F.nodeIds(grid(0).cluster().nodes())), 
val.get2());
+            assertEquals(new 
HashSet<>(F.nodeIds(grid(0).cluster().forServers().nodes())), val.get2());
         }
     }
 
@@ -145,7 +146,7 @@ public class ComputeTaskTest extends GridCommonAbstractTest 
{
             T2<UUID, Set<UUID>> val = client.compute().execute(TEST_TASK_NAME, 
null);
 
             assertEquals(nodeId(0), val.get1());
-            assertEquals(new HashSet<>(F.nodeIds(grid(0).cluster().nodes())), 
val.get2());
+            assertEquals(new 
HashSet<>(F.nodeIds(grid(0).cluster().forServers().nodes())), val.get2());
         }
     }
 
@@ -174,7 +175,7 @@ public class ComputeTaskTest extends GridCommonAbstractTest 
{
 
             assertTrue(fut.isDone());
             assertEquals(nodeId(0), val.get1());
-            assertEquals(new HashSet<>(F.nodeIds(grid(0).cluster().nodes())), 
val.get2());
+            assertEquals(new 
HashSet<>(F.nodeIds(grid(0).cluster().forServers().nodes())), val.get2());
         }
     }
 
@@ -250,10 +251,25 @@ public class ComputeTaskTest extends 
GridCommonAbstractTest {
 
             assertEquals(nodeId(0), val.get1());
             assertEquals(nodeIds(1, 2), val.get2());
+
+            // Compute on client node defined explicitly.
+            grp = client.cluster().forNodeIds(nodeIds(3));
+
+            val = client.compute(grp).execute(TestTask.class.getName(), null);
+
+            assertEquals(nodeId(0), val.get1());
+            assertEquals(nodeIds(3), val.get2());
+
+            // Compute on all nodes (clients + servers).
+            grp = client.cluster();
+
+            val = client.compute(grp).execute(TestTask.class.getName(), null);
+
+            assertEquals(nodeId(0), val.get1());
+            assertEquals(new HashSet<>(F.nodeIds(grid(0).cluster().nodes())), 
val.get2());
         }
     }
 
-
     /**
      *
      */

Reply via email to