This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new ee5acaf IGNITE-13074 Thin client: Fix default compute cluster group -
Fixes #7850.
ee5acaf is described below
commit ee5acaf0a3a4fd363e7da2d04a0ff1860a2254c6
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu May 28 12:42:02 2020 +0300
IGNITE-13074 Thin client: Fix default compute cluster group - Fixes #7850.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../client/thin/ClientClusterGroupImpl.java | 14 +++++++++---
.../internal/client/thin/ClientClusterImpl.java | 12 ++++++++++
.../internal/client/thin/ClientComputeImpl.java | 22 +++++++++---------
.../internal/client/thin/TcpIgniteClient.java | 2 +-
.../platform/client/compute/ClientComputeTask.java | 4 +++-
.../internal/client/thin/ClusterGroupTest.java | 8 +++----
.../internal/client/thin/ComputeTaskTest.java | 26 +++++++++++++++++-----
7 files changed, 63 insertions(+), 25 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterGroupImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterGroupImpl.java
index 3425dc3..fd26025 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterGroupImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterGroupImpl.java
@@ -157,7 +157,8 @@ class ClientClusterGroupImpl implements ClientClusterGroup {
/** {@inheritDoc} */
@Override public ClientClusterGroup forServers() {
- return forProjectionFilters(projectionFilters.forNodeType(true));
+ return forProjectionFilters(projectionFilters ==
ProjectionFilters.FULL_PROJECTION ?
+ ProjectionFilters.DEFAULT_PROJECTION :
projectionFilters.forNodeType(true));
}
/** {@inheritDoc} */
@@ -263,10 +264,10 @@ class ClientClusterGroupImpl implements
ClientClusterGroup {
*
* Note: This method is for internal use only. For optimization purposes
it can return not existing node IDs if
* only filter by node IDs was explicitly set.
- * Method also returns null if no filter was set (full projection needed).
+ * Method also returns null for default projection (for server nodes).
*/
public Collection<UUID> nodeIds() {
- if (projectionFilters == ProjectionFilters.FULL_PROJECTION)
+ if (projectionFilters == ProjectionFilters.DEFAULT_PROJECTION)
return null;
else if (projectionFilters.hasOnlyNodeIdsFilters())
return
Collections.unmodifiableCollection(projectionFilters.nodeIds);
@@ -483,6 +484,13 @@ class ClientClusterGroupImpl implements ClientClusterGroup
{
/** Projection without any filters for full set of nodes. */
public static final ProjectionFilters FULL_PROJECTION = new
ProjectionFilters();
+ /**
+ * Projection for server nodes, used by default for compute and
service operations when cluster group is not
+ * defined explicitly.
+ */
+ public static final ProjectionFilters DEFAULT_PROJECTION =
+ new ProjectionFilters(null, null, Boolean.TRUE, null, null);
+
/** Filter for empty projection. Will be returned if there are
mutually exclusive filters detected. */
public static final ProjectionFilters EMPTY_PROJECTION =
new ProjectionFilters(Collections.emptySet(), null, null, null,
null);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterImpl.java
index a7c9ebc..dc35142ee 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterImpl.java
@@ -28,11 +28,16 @@ import org.apache.ignite.internal.binary.BinaryWriterExImpl;
* Implementation of {@link ClientCluster}.
*/
class ClientClusterImpl extends ClientClusterGroupImpl implements
ClientCluster {
+ /** Default cluster group. */
+ private final ClientClusterGroupImpl dfltClusterGrp;
+
/**
* Constructor.
*/
ClientClusterImpl(ReliableChannel ch, ClientBinaryMarshaller marsh) {
super(ch, marsh);
+
+ dfltClusterGrp = (ClientClusterGroupImpl)forServers();
}
/** {@inheritDoc} */
@@ -135,4 +140,11 @@ class ClientClusterImpl extends ClientClusterGroupImpl
implements ClientCluster
!protocolCtx.isFeatureSupported(ProtocolBitmaskFeature.CLUSTER_STATES))
throw new
ClientFeatureNotSupportedByServerException(ProtocolBitmaskFeature.CLUSTER_STATES);
}
+
+ /**
+ * Default cluster group ("for servers").
+ */
+ ClientClusterGroupImpl defaultClusterGroup() {
+ return dfltClusterGrp;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
index 287aba3..6238e52 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
@@ -64,8 +64,8 @@ class ClientComputeImpl implements ClientCompute,
NotificationListener {
/** Utils for serialization/deserialization. */
private final ClientUtils utils;
- /** ClientCluster instance. */
- private final ClientClusterImpl cluster;
+ /** Default cluster group. */
+ private final ClientClusterGroupImpl dfltGrp;
/** Active tasks. */
private final Map<ClientChannel, Map<Long, ClientComputeTask<Object>>>
activeTasks = new ConcurrentHashMap<>();
@@ -74,10 +74,10 @@ class ClientComputeImpl implements ClientCompute,
NotificationListener {
private final ReadWriteLock guard = new ReentrantReadWriteLock();
/** Constructor. */
- ClientComputeImpl(ReliableChannel ch, ClientBinaryMarshaller marsh,
ClientClusterImpl cluster) {
+ ClientComputeImpl(ReliableChannel ch, ClientBinaryMarshaller marsh,
ClientClusterGroupImpl dfltGrp) {
this.ch = ch;
this.marsh = marsh;
- this.cluster = cluster;
+ this.dfltGrp = dfltGrp;
utils = new ClientUtils(marsh);
@@ -102,32 +102,32 @@ class ClientComputeImpl implements ClientCompute,
NotificationListener {
/** {@inheritDoc} */
@Override public ClientClusterGroup clusterGroup() {
- return cluster;
+ return dfltGrp;
}
/** {@inheritDoc} */
@Override public <T, R> R execute(String taskName, @Nullable T arg) throws
ClientException, InterruptedException {
- return execute0(taskName, arg, cluster, (byte)0, 0L);
+ return execute0(taskName, arg, dfltGrp, (byte)0, 0L);
}
/** {@inheritDoc} */
@Override public <T, R> Future<R> executeAsync(String taskName, @Nullable
T arg) throws ClientException {
- return executeAsync0(taskName, arg, cluster, (byte)0, 0L);
+ return executeAsync0(taskName, arg, dfltGrp, (byte)0, 0L);
}
/** {@inheritDoc} */
@Override public ClientCompute withTimeout(long timeout) {
- return timeout == 0L ? this : new ClientComputeModificator(this,
cluster, (byte)0, timeout);
+ return timeout == 0L ? this : new ClientComputeModificator(this,
dfltGrp, (byte)0, timeout);
}
/** {@inheritDoc} */
@Override public ClientCompute withNoFailover() {
- return new ClientComputeModificator(this, cluster,
NO_FAILOVER_FLAG_MASK, 0L);
+ return new ClientComputeModificator(this, dfltGrp,
NO_FAILOVER_FLAG_MASK, 0L);
}
/** {@inheritDoc} */
@Override public ClientCompute withNoResultCache() {
- return new ClientComputeModificator(this, cluster,
NO_RESULT_CACHE_FLAG_MASK, 0L);
+ return new ClientComputeModificator(this, dfltGrp,
NO_RESULT_CACHE_FLAG_MASK, 0L);
}
/**
@@ -178,7 +178,7 @@ class ClientComputeImpl implements ClientCompute,
NotificationListener {
byte flags,
long timeout
) throws ClientException {
- Collection<UUID> nodeIds = clusterGrp == cluster ? null :
clusterGrp.nodeIds();
+ Collection<UUID> nodeIds = clusterGrp.nodeIds();
if (F.isEmpty(taskName))
throw new ClientException("Task name can't be null or empty.");
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index 955f764..e4c6b9f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -114,7 +114,7 @@ public class TcpIgniteClient implements IgniteClient {
cluster = new ClientClusterImpl(ch, marsh);
- compute = new ClientComputeImpl(ch, marsh, cluster);
+ compute = new ClientComputeImpl(ch, marsh,
cluster.defaultClusterGroup());
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
index a099369..4ce3907 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
@@ -97,7 +97,9 @@ class ClientComputeTask implements ClientCloseableResource {
GridTaskProcessor task = ctx.kernalContext().task();
- IgnitePredicate<ClusterNode> nodePredicate = F.isEmpty(nodeIds) ?
F.alwaysTrue() : F.nodeForNodeIds(nodeIds);
+ IgnitePredicate<ClusterNode> nodePredicate = F.isEmpty(nodeIds) ? node
-> !node.isClient() :
+ F.nodeForNodeIds(nodeIds);
+
UUID subjId = ctx.securityContext() == null ? null :
ctx.securityContext().subject().id();
task.setThreadContext(TC_SUBGRID_PREDICATE, nodePredicate);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ClusterGroupTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ClusterGroupTest.java
index 5023e85..e253fab 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ClusterGroupTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ClusterGroupTest.java
@@ -331,8 +331,8 @@ public class ClusterGroupTest extends
GridCommonAbstractTest {
*/
@Test
public void testNodeIds() {
- // Check full projection.
- assertNull(((ClientClusterGroupImpl)client.cluster()).nodeIds());
+ // Check default projection.
+
assertNull(((ClientClusterImpl)client.cluster()).defaultClusterGroup().nodeIds());
// Check filter only by node id.
Collection<UUID> nodeIds =
((ClientClusterGroupImpl)client.cluster().forNodeId(nodeId(0),
@@ -345,9 +345,9 @@ public class ClusterGroupTest extends
GridCommonAbstractTest {
assertFalse(nodeIds.contains(nodeId(4)));
// Check server-side filter.
- nodeIds =
((ClientClusterGroupImpl)client.cluster().forServers()).nodeIds();
+ nodeIds =
((ClientClusterGroupImpl)client.cluster().forClients()).nodeIds();
- assertEquals(new HashSet<>(F.asList(nodeId(0), nodeId(1), nodeId(2))),
+ assertEquals(new HashSet<>(F.asList(nodeId(3), nodeId(4))),
new HashSet<>(nodeIds));
// Check client-side filters.
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
index 2f16208..d400daf 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
@@ -80,7 +80,8 @@ public class ComputeTaskTest extends GridCommonAbstractTest {
return
super.getConfiguration(igniteInstanceName).setClientConnectorConfiguration(
new ClientConnectorConfiguration().setThinClientConfiguration(
new
ThinClientConfiguration().setMaxActiveComputeTasksPerConnection(
- getTestIgniteInstanceIndex(igniteInstanceName) <= 1 ?
ACTIVE_TASKS_LIMIT : 0)));
+ getTestIgniteInstanceIndex(igniteInstanceName) <= 1 ?
ACTIVE_TASKS_LIMIT : 0)))
+ .setClientMode(getTestIgniteInstanceIndex(igniteInstanceName) ==
3);
}
/**
@@ -118,7 +119,7 @@ public class ComputeTaskTest extends GridCommonAbstractTest
{
T2<UUID, Set<UUID>> val =
client.compute().execute(TestTask.class.getName(), null);
assertEquals(nodeId(0), val.get1());
- assertEquals(new HashSet<>(F.nodeIds(grid(0).cluster().nodes())),
val.get2());
+ assertEquals(new
HashSet<>(F.nodeIds(grid(0).cluster().forServers().nodes())), val.get2());
}
}
@@ -145,7 +146,7 @@ public class ComputeTaskTest extends GridCommonAbstractTest
{
T2<UUID, Set<UUID>> val = client.compute().execute(TEST_TASK_NAME,
null);
assertEquals(nodeId(0), val.get1());
- assertEquals(new HashSet<>(F.nodeIds(grid(0).cluster().nodes())),
val.get2());
+ assertEquals(new
HashSet<>(F.nodeIds(grid(0).cluster().forServers().nodes())), val.get2());
}
}
@@ -174,7 +175,7 @@ public class ComputeTaskTest extends GridCommonAbstractTest
{
assertTrue(fut.isDone());
assertEquals(nodeId(0), val.get1());
- assertEquals(new HashSet<>(F.nodeIds(grid(0).cluster().nodes())),
val.get2());
+ assertEquals(new
HashSet<>(F.nodeIds(grid(0).cluster().forServers().nodes())), val.get2());
}
}
@@ -250,10 +251,25 @@ public class ComputeTaskTest extends
GridCommonAbstractTest {
assertEquals(nodeId(0), val.get1());
assertEquals(nodeIds(1, 2), val.get2());
+
+ // Compute on client node defined explicitly.
+ grp = client.cluster().forNodeIds(nodeIds(3));
+
+ val = client.compute(grp).execute(TestTask.class.getName(), null);
+
+ assertEquals(nodeId(0), val.get1());
+ assertEquals(nodeIds(3), val.get2());
+
+ // Compute on all nodes (clients + servers).
+ grp = client.cluster();
+
+ val = client.compute(grp).execute(TestTask.class.getName(), null);
+
+ assertEquals(nodeId(0), val.get1());
+ assertEquals(new HashSet<>(F.nodeIds(grid(0).cluster().nodes())),
val.get2());
}
}
-
/**
*
*/