This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-20503
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit a5143ed9a6d1dad3d7dba7a1e0d32666cc864594
Author: amashenkov <[email protected]>
AuthorDate: Thu Jun 20 17:15:41 2024 +0300

    wip
---
 .../exec/mapping/ColocationMappingException.java   |   2 +-
 .../AbstractTarget.java                            | 104 ++++----
 .../exec/mapping/largecluster/AllOfTarget.java     |  72 ++++++
 .../mapping/largecluster/LargeClusterFactory.java  | 109 +++++++++
 .../exec/mapping/largecluster/OneOfTarget.java     |  80 +++++++
 .../mapping/largecluster/PartitionedTarget.java    |  99 ++++++++
 .../exec/mapping/largecluster/SomeOfTarget.java    |  72 ++++++
 .../exec/mapping/smallcluster/AbstractTarget.java  |   4 +-
 .../mapping/ExecutionTargetFactorySelfTest.java    | 263 +++++++++++++++++++++
 9 files changed, 756 insertions(+), 49 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationMappingException.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationMappingException.java
index cbd93db0bd..ace2e0eafe 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationMappingException.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationMappingException.java
@@ -26,6 +26,6 @@ package org.apache.ignite.internal.sql.engine.exec.mapping;
 public class ColocationMappingException extends Exception {
     /** Constructor. */
     public ColocationMappingException(String message) {
-        super(message, null, true, false);
+        super(message, null, true, true);
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
similarity index 69%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
index 293d697774..1a51307e65 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster;
+package org.apache.ignite.internal.sql.engine.exec.mapping.largecluster;
 
 import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
 
@@ -23,7 +23,9 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
+import org.apache.calcite.util.BitSets;
 import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
 import 
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
@@ -35,26 +37,19 @@ import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
  * colocation method.
  */
 abstract class AbstractTarget implements ExecutionTarget {
-    final long nodes;
+    final BitSet nodes;
 
-    AbstractTarget(long nodes) {
+    AbstractTarget(BitSet nodes) {
         this.nodes = nodes;
     }
 
     List<String> nodes(List<String> nodeNames) {
-        if (isPow2(nodes)) {
-            int idx = Long.numberOfTrailingZeros(nodes);
-
-            return List.of(nodeNames.get(idx));
-        }
-
-        int count = Long.bitCount(nodes);
+        int count = nodes.cardinality();
         List<String> result = new ArrayList<>(count);
 
-        for (int bit = 1, idx = 0; bit <= nodes; bit <<= 1, idx++) {
-            if ((nodes & bit) != 0) {
-                result.add(nodeNames.get(idx));
-            }
+        int next = 0;
+        while ((next = nodes.nextSetBit(next + 1)) != -1) {
+            result.add(nodeNames.get(next));
         }
 
         return result;
@@ -70,11 +65,11 @@ abstract class AbstractTarget implements ExecutionTarget {
         Int2ObjectMap<NodeWithConsistencyToken> result = new 
Int2ObjectOpenHashMap<>(partitionedTarget.partitionsNodes.length);
 
         for (int partNo = 0; partNo < 
partitionedTarget.partitionsNodes.length; partNo++) {
-            long partitionNodes = partitionedTarget.partitionsNodes[partNo];
+            BitSet partitionNodes = partitionedTarget.partitionsNodes[partNo];
 
-            assert isPow2(partitionNodes);
+            assert partitionNodes.cardinality() == 1;
 
-            int idx = Long.numberOfTrailingZeros(partitionNodes);
+            int idx = partitionNodes.nextSetBit(0);
 
             result.put(partNo, new NodeWithConsistencyToken(
                     nodeNames.get(idx),
@@ -96,7 +91,7 @@ abstract class AbstractTarget implements ExecutionTarget {
     abstract ExecutionTarget colocate(SomeOfTarget other) throws 
ColocationMappingException;
 
     static ExecutionTarget colocate(AllOfTarget allOf, AllOfTarget otherAllOf) 
throws ColocationMappingException {
-        if (allOf.nodes != otherAllOf.nodes) {
+        if (!allOf.nodes.equals(otherAllOf.nodes) || 
otherAllOf.nodes.cardinality() == 0) {
             throw new ColocationMappingException("Targets are not colocated");
         }
 
@@ -104,14 +99,20 @@ abstract class AbstractTarget implements ExecutionTarget {
     }
 
     static ExecutionTarget colocate(AllOfTarget allOf, OneOfTarget oneOf) 
throws ColocationMappingException {
-        if ((allOf.nodes & oneOf.nodes) == 0) {
+        int target = allOf.nodes.nextSetBit(0);
+
+        if (target == -1 || allOf.nodes.nextSetBit(target + 1) != -1) {
             throw new ColocationMappingException("Targets are not colocated");
         }
 
-        if (!isPow2(allOf.nodes)) {
+        if (!oneOf.nodes.get(target)) {
             throw new ColocationMappingException("Targets are not colocated");
         }
 
+        // When colocated, AllOfTarget must contain a single node that 
matches one of OneOfTarget nodes.
+        assert allOf.nodes.cardinality() == 1;
+        assert allOf.nodes.intersects(oneOf.nodes);
+
         return allOf;
     }
 
@@ -120,9 +121,7 @@ abstract class AbstractTarget implements ExecutionTarget {
     }
 
     static ExecutionTarget colocate(AllOfTarget allOf, SomeOfTarget someOf) 
throws ColocationMappingException {
-        long newNodes = allOf.nodes & someOf.nodes;
-
-        if (allOf.nodes != newNodes) {
+        if (!BitSets.contains(someOf.nodes, allOf.nodes) || 
allOf.nodes.cardinality() == 0) {
             throw new ColocationMappingException("Targets are not colocated");
         }
 
@@ -130,34 +129,42 @@ abstract class AbstractTarget implements ExecutionTarget {
     }
 
     static ExecutionTarget colocate(OneOfTarget oneOf, OneOfTarget 
anotherOneOf) throws ColocationMappingException {
-        long newNodes = oneOf.nodes & anotherOneOf.nodes;
-
-        if (newNodes == 0) {
+        if (!oneOf.nodes.intersects(anotherOneOf.nodes)) {
             throw new ColocationMappingException("Targets are not colocated");
         }
 
+        BitSet newNodes = BitSet.valueOf(oneOf.nodes.toLongArray());
+        newNodes.and(anotherOneOf.nodes);
+
         return new OneOfTarget(newNodes);
     }
 
     static ExecutionTarget colocate(OneOfTarget oneOf, PartitionedTarget 
partitioned) throws ColocationMappingException {
-        if ((oneOf.nodes & partitioned.nodes) == 0) {
+        int target = partitioned.nodes.nextSetBit(0);
+
+        if (target == -1 || partitioned.nodes.nextSetBit(target + 1) != -1) {
             throw new ColocationMappingException("Targets are not colocated");
         }
 
-        if (!isPow2((partitioned.nodes))) {
+        if (!oneOf.nodes.get(target)) {
             throw new ColocationMappingException("Targets are not colocated");
         }
 
+        // When colocated, PartitionedTarget must contain a single node that 
matches one of OneOfTarget nodes.
+        assert partitioned.nodes.cardinality() == 1;
+        assert partitioned.nodes.intersects(oneOf.nodes);
+
         return partitioned;
     }
 
     static ExecutionTarget colocate(OneOfTarget oneOf, SomeOfTarget someOf) 
throws ColocationMappingException {
-        long newNodes = oneOf.nodes & someOf.nodes;
-
-        if (newNodes == 0) {
+        if (!oneOf.nodes.intersects(someOf.nodes)) {
             throw new ColocationMappingException("Targets are not colocated");
         }
 
+        BitSet newNodes = BitSet.valueOf(oneOf.nodes.toLongArray());
+        newNodes.and(someOf.nodes);
+
         return new OneOfTarget(newNodes);
     }
 
@@ -167,11 +174,9 @@ abstract class AbstractTarget implements ExecutionTarget {
         }
 
         boolean finalised = true;
-        long[] newPartitionsNodes = new 
long[partitioned.partitionsNodes.length];
+        BitSet[] newPartitionsNodes = new 
BitSet[partitioned.partitionsNodes.length];
         for (int partNo = 0; partNo < partitioned.partitionsNodes.length; 
partNo++) {
-            long newNodes = partitioned.partitionsNodes[partNo] & 
otherPartitioned.partitionsNodes[partNo];
-
-            if (newNodes == 0) {
+            if 
(!partitioned.partitionsNodes[partNo].intersects(otherPartitioned.partitionsNodes[partNo]))
 {
                 throw new ColocationMappingException("Targets are not 
colocated");
             }
 
@@ -179,8 +184,11 @@ abstract class AbstractTarget implements ExecutionTarget {
                 throw new ColocationMappingException("Partitioned targets have 
different terms");
             }
 
+            BitSet newNodes = 
BitSet.valueOf(partitioned.partitionsNodes[partNo].toLongArray());
+            newNodes.and(otherPartitioned.nodes);
+
             newPartitionsNodes[partNo] = newNodes;
-            finalised = finalised && isPow2(newNodes);
+            finalised = finalised && newNodes.cardinality() == 1;
         }
 
         return new PartitionedTarget(finalised, newPartitionsNodes, 
partitioned.enlistmentConsistencyTokens);
@@ -188,32 +196,36 @@ abstract class AbstractTarget implements ExecutionTarget {
 
     static ExecutionTarget colocate(PartitionedTarget partitioned, 
SomeOfTarget someOf) throws ColocationMappingException {
         boolean finalised = true;
-        long[] newPartitionsNodes = new 
long[partitioned.partitionsNodes.length];
+        BitSet[] newPartitionsNodes = new 
BitSet[partitioned.partitionsNodes.length];
         for (int partNo = 0; partNo < partitioned.partitionsNodes.length; 
partNo++) {
-            long newNodes = partitioned.partitionsNodes[partNo] & someOf.nodes;
-
-            if (newNodes == 0) {
+            if (!partitioned.partitionsNodes[partNo].intersects(someOf.nodes)) 
{
                 throw new ColocationMappingException("Targets are not 
colocated");
             }
 
+            BitSet newNodes = 
BitSet.valueOf(partitioned.partitionsNodes[partNo].toLongArray());
+            newNodes.and(someOf.nodes);
+
             newPartitionsNodes[partNo] = newNodes;
-            finalised = finalised && isPow2(newNodes);
+            finalised = finalised && newNodes.cardinality() == 1;
         }
 
         return new PartitionedTarget(finalised, newPartitionsNodes, 
partitioned.enlistmentConsistencyTokens);
     }
 
     static ExecutionTarget colocate(SomeOfTarget someOf, SomeOfTarget 
otherSomeOf) throws ColocationMappingException {
-        long newNodes = someOf.nodes & otherSomeOf.nodes;
-
-        if (newNodes == 0) {
+        if (!someOf.nodes.intersects(otherSomeOf.nodes)) {
             throw new ColocationMappingException("Targets are not colocated");
         }
 
+        BitSet newNodes = BitSet.valueOf(someOf.nodes.toLongArray());
+        newNodes.and(otherSomeOf.nodes);
+
         return new SomeOfTarget(newNodes);
     }
 
-    static long pickOne(long nodes) {
-        return Long.lowestOneBit(nodes);
+    static BitSet pickOne(BitSet nodes) {
+        int node = nodes.nextSetBit(0);
+
+        return node == -1 ? BitSets.of() : BitSets.of(node);
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AllOfTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AllOfTarget.java
new file mode 100644
index 0000000000..81636a9118
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AllOfTarget.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.mapping.largecluster;
+
+import java.util.BitSet;
+import java.util.List;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
+
+/**
+ * Represents a target that should be executed on all of the given nodes.
+ *
+ * <p>See javadoc of {@link ExecutionTargetFactory#allOf(List)} for details.
+ */
+class AllOfTarget extends AbstractTarget {
+    AllOfTarget(BitSet nodes) {
+        super(nodes);
+    }
+
+    @Override
+    boolean finalised() {
+        return true;
+    }
+
+    @Override
+    public ExecutionTarget finalise() {
+        return this;
+    }
+
+    @Override
+    public ExecutionTarget colocateWith(ExecutionTarget other) throws 
ColocationMappingException {
+        assert other instanceof AbstractTarget : other == null ? "<null>" : 
other.getClass().getCanonicalName();
+
+        return ((AbstractTarget) other).colocate(this);
+    }
+
+    @Override
+    ExecutionTarget colocate(AllOfTarget other) throws 
ColocationMappingException {
+        return colocate(this, other);
+    }
+
+    @Override
+    ExecutionTarget colocate(OneOfTarget other) throws 
ColocationMappingException {
+        return colocate(this, other);
+    }
+
+    @Override
+    ExecutionTarget colocate(PartitionedTarget other) throws 
ColocationMappingException {
+        return colocate(this, other);
+    }
+
+    @Override
+    ExecutionTarget colocate(SomeOfTarget other) throws 
ColocationMappingException {
+        return colocate(this, other);
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
new file mode 100644
index 0000000000..6bac22abc8
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.mapping.largecluster;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import java.util.BitSet;
+import java.util.List;
+import org.apache.calcite.util.BitSets;
+import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
+
+/**
+ * A factory that is able to create targets for clusters of any size, including more than 64 nodes.
+ */
+public class LargeClusterFactory implements ExecutionTargetFactory {
+    private final List<String> nodes;
+    private final Object2IntMap<String> nodeNameToId;
+
+    /** Constructor. */
+    public LargeClusterFactory(List<String> nodes) {
+        this.nodes = nodes;
+
+        nodeNameToId = new Object2IntOpenHashMap<>(nodes.size());
+
+        int idx = 0;
+        for (String name : nodes) {
+            nodeNameToId.putIfAbsent(name, idx++);
+        }
+    }
+
+    @Override
+    public ExecutionTarget allOf(List<String> nodes) {
+        return new AllOfTarget(nodeListToMap(nodes));
+    }
+
+    @Override
+    public ExecutionTarget oneOf(List<String> nodes) {
+        return new OneOfTarget(nodeListToMap(nodes));
+    }
+
+    @Override
+    public ExecutionTarget someOf(List<String> nodes) {
+        return new SomeOfTarget(nodeListToMap(nodes));
+    }
+
+    @Override
+    public ExecutionTarget partitioned(List<NodeWithConsistencyToken> nodes) {
+        BitSet[] partitionNodes = new BitSet[nodes.size()];
+        long[] enlistmentConsistencyTokens = new long[nodes.size()];
+
+        int idx = 0;
+        for (NodeWithConsistencyToken e : nodes) {
+            int node = nodeNameToId.getOrDefault(e.name(), -1);
+            partitionNodes[idx] = node == -1 ? BitSets.of() : BitSets.of(node);
+            enlistmentConsistencyTokens[idx++] = 
e.enlistmentConsistencyToken();
+        }
+
+        return new PartitionedTarget(true, partitionNodes, 
enlistmentConsistencyTokens);
+    }
+
+    @Override
+    public List<String> resolveNodes(ExecutionTarget target) {
+        target = target.finalise();
+
+        assert target instanceof AbstractTarget : target == null ? "<null>" : 
target.getClass().getCanonicalName();
+
+        return ((AbstractTarget) target).nodes(nodes);
+    }
+
+    @Override
+    public Int2ObjectMap<NodeWithConsistencyToken> 
resolveAssignments(ExecutionTarget target) {
+        target = target.finalise();
+
+        assert target instanceof AbstractTarget : target == null ? "<null>" : 
target.getClass().getCanonicalName();
+
+        return ((AbstractTarget) target).assignments(nodes);
+    }
+
+    private BitSet nodeListToMap(List<String> nodes) {
+        BitSet nodesMap = new BitSet(nodes.size());
+
+        for (String name : nodes) {
+            int id = nodeNameToId.getOrDefault(name, -1);
+            if (id != -1) {
+                nodesMap.set(id);
+            }
+        }
+
+        return nodesMap;
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/OneOfTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/OneOfTarget.java
new file mode 100644
index 0000000000..993c2f89d9
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/OneOfTarget.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.mapping.largecluster;
+
+import java.util.BitSet;
+import java.util.List;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
+
+/**
+ * Represents a target that should be executed on exactly one node from the given 
list.
+ *
+ * <p>See javadoc of {@link ExecutionTargetFactory#oneOf(List)} for details.
+ */
+class OneOfTarget extends AbstractTarget {
+    private final boolean finalised;
+
+    OneOfTarget(BitSet nodes) {
+        super(nodes);
+
+        finalised = nodes.cardinality() == 1;
+    }
+
+    @Override
+    boolean finalised() {
+        return finalised;
+    }
+
+    @Override
+    public ExecutionTarget finalise() {
+        if (finalised()) {
+            return this;
+        }
+
+        return new OneOfTarget(pickOne(nodes));
+    }
+
+    @Override
+    public ExecutionTarget colocateWith(ExecutionTarget other) throws 
ColocationMappingException {
+        assert other instanceof AbstractTarget : other == null ? "<null>" : 
other.getClass().getCanonicalName();
+
+        return ((AbstractTarget) other).colocate(this);
+    }
+
+    @Override
+    public ExecutionTarget colocate(AllOfTarget other) throws 
ColocationMappingException {
+        return colocate(other, this);
+    }
+
+    @Override
+    public ExecutionTarget colocate(OneOfTarget other) throws 
ColocationMappingException {
+        return colocate(this, other);
+    }
+
+    @Override
+    public ExecutionTarget colocate(PartitionedTarget other) throws 
ColocationMappingException {
+        return colocate(this, other);
+    }
+
+    @Override
+    public ExecutionTarget colocate(SomeOfTarget other) throws 
ColocationMappingException {
+        return colocate(this, other);
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/PartitionedTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/PartitionedTarget.java
new file mode 100644
index 0000000000..44e76aaaed
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/PartitionedTarget.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.mapping.largecluster;
+
+import java.util.BitSet;
+import java.util.List;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
+
+/**
+ * Represents a target that consists of a number of primary partitions.
+ *
+ * <p>See javadoc of {@link ExecutionTargetFactory#partitioned(List)} for 
details.
+ */
+class PartitionedTarget extends AbstractTarget {
+    private final boolean finalised;
+    final BitSet[] partitionsNodes;
+    final long[] enlistmentConsistencyTokens;
+
+    PartitionedTarget(boolean finalised, BitSet[] partitionsNodes, long[] 
enlistmentConsistencyTokens) {
+        super(computeNodes(partitionsNodes));
+
+        this.finalised = finalised;
+        this.partitionsNodes = partitionsNodes;
+        this.enlistmentConsistencyTokens = enlistmentConsistencyTokens;
+    }
+
+    @Override
+    boolean finalised() {
+        return finalised;
+    }
+
+    @Override
+    public ExecutionTarget finalise() {
+        if (finalised) {
+            return this;
+        }
+
+        BitSet[] newPartitionsNodes = new BitSet[partitionsNodes.length];
+
+        for (int partNo = 0; partNo < partitionsNodes.length; partNo++) {
+            newPartitionsNodes[partNo] = pickOne(partitionsNodes[partNo]);
+        }
+
+        return new PartitionedTarget(true, newPartitionsNodes, 
enlistmentConsistencyTokens);
+    }
+
+    @Override
+    public ExecutionTarget colocateWith(ExecutionTarget other) throws 
ColocationMappingException {
+        assert other instanceof AbstractTarget : other == null ? "<null>" : 
other.getClass().getCanonicalName();
+
+        return ((AbstractTarget) other).colocate(this);
+    }
+
+    @Override
+    ExecutionTarget colocate(AllOfTarget other) throws 
ColocationMappingException {
+        return colocate(other, this);
+    }
+
+    @Override
+    ExecutionTarget colocate(OneOfTarget other) throws 
ColocationMappingException {
+        return colocate(other, this);
+    }
+
+    @Override
+    ExecutionTarget colocate(PartitionedTarget other) throws 
ColocationMappingException {
+        return colocate(this, other);
+    }
+
+    @Override
+    ExecutionTarget colocate(SomeOfTarget other) throws 
ColocationMappingException {
+        return colocate(this, other);
+    }
+
+    private static BitSet computeNodes(BitSet[] partitionsNodes) {
+        BitSet nodes = new BitSet();
+        for (BitSet nodesOfPartition : partitionsNodes) {
+            nodes.or(nodesOfPartition);
+        }
+
+        return nodes;
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/SomeOfTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/SomeOfTarget.java
new file mode 100644
index 0000000000..4c4bc7fbbd
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/SomeOfTarget.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.mapping.largecluster;
+
+import java.util.BitSet;
+import java.util.List;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
+
+/**
+ * Represents a target that may be executed on a non-empty subset of the given 
nodes.
+ *
+ * <p>See javadoc of {@link ExecutionTargetFactory#someOf(List)} for details.
+ */
+class SomeOfTarget extends AbstractTarget {
+    SomeOfTarget(BitSet nodes) {
+        super(nodes);
+    }
+
+    @Override
+    boolean finalised() {
+        return true;
+    }
+
+    @Override
+    public ExecutionTarget finalise() {
+        return this;
+    }
+
+    @Override
+    public ExecutionTarget colocateWith(ExecutionTarget other) throws 
ColocationMappingException {
+        assert other instanceof AbstractTarget : other == null ? "<null>" : 
other.getClass().getCanonicalName();
+
+        return ((AbstractTarget) other).colocate(this);
+    }
+
+    @Override
+    ExecutionTarget colocate(AllOfTarget other) throws 
ColocationMappingException {
+        return colocate(other, this);
+    }
+
+    @Override
+    ExecutionTarget colocate(OneOfTarget other) throws 
ColocationMappingException {
+        return colocate(other, this);
+    }
+
+    @Override
+    ExecutionTarget colocate(PartitionedTarget other) throws 
ColocationMappingException {
+        return colocate(other, this);
+    }
+
+    @Override
+    ExecutionTarget colocate(SomeOfTarget other) throws 
ColocationMappingException {
+        return colocate(this, other);
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
index 293d697774..98138fda89 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
@@ -96,7 +96,7 @@ abstract class AbstractTarget implements ExecutionTarget {
     abstract ExecutionTarget colocate(SomeOfTarget other) throws 
ColocationMappingException;
 
     static ExecutionTarget colocate(AllOfTarget allOf, AllOfTarget otherAllOf) 
throws ColocationMappingException {
-        if (allOf.nodes != otherAllOf.nodes) {
+        if (otherAllOf.nodes == 0 || allOf.nodes != otherAllOf.nodes) {
             throw new ColocationMappingException("Targets are not colocated");
         }
 
@@ -122,7 +122,7 @@ abstract class AbstractTarget implements ExecutionTarget {
     static ExecutionTarget colocate(AllOfTarget allOf, SomeOfTarget someOf) 
throws ColocationMappingException {
         long newNodes = allOf.nodes & someOf.nodes;
 
-        if (allOf.nodes != newNodes) {
+        if (newNodes == 0 || allOf.nodes != newNodes) {
             throw new ColocationMappingException("Targets are not colocated");
         }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
new file mode 100644
index 0000000000..550481a9be
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.mapping;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.emptyIterableOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.in;
+import static org.hamcrest.Matchers.iterableWithSize;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.largecluster.LargeClusterFactory;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster.SmallClusterFactory;
+import org.hamcrest.Matcher;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Test class to verify {@link ExecutionTargetFactory} implementations.
+ *
+ * @see SmallClusterFactory
+ * @see LargeClusterFactory
+ */
+public class ExecutionTargetFactorySelfTest {
+    /** Every node name the factories under test are created with. */
+    private static final List<String> ALL_NODES = List.of("node1", "node2", 
"node3", "node4", "node5");
+    /** Three nodes taken from {@link #ALL_NODES}. */
+    private static final List<String> NODE_SET = List.of("node2", "node4", 
"node5");
+    /** Another three-node set: shares node2 and node5 with {@link #NODE_SET} and does not contain node4. */
+    private static final List<String> NODE_SET2 = List.of("node2", "node3", 
"node5");
+    /** The nodes common to {@link #NODE_SET} and {@link #NODE_SET2}. */
+    private static final List<String> NODE_SUBSET = List.of("node2", "node5");
+    /** A single node that belongs to {@link #NODE_SET} but not to {@link #NODE_SET2}. */
+    private static final List<String> SINGLE_NODE_SET = List.of("node4");
+    /** A node name that is absent from {@link #ALL_NODES}. */
+    private static final List<String> INVALID_NODE_SET = List.of("node0");
+
+    /** Supplies the factory implementations under test; referenced by name from {@code @MethodSource}. */
+    private static List<ExecutionTargetFactory> clusterFactory() {
+        ExecutionTargetFactory smallCluster = new SmallClusterFactory(ALL_NODES);
+        ExecutionTargetFactory largeCluster = new LargeClusterFactory(ALL_NODES);
+
+        return List.of(smallCluster, largeCluster);
+    }
+
+    /**
+     * Checks which node names each kind of target resolves to.
+     *
+     * <p>For {@link #NODE_SET}: AllOf and Partitioned targets must resolve to exactly the requested nodes,
+     * SomeOf must resolve to a list containing at least one requested node, and OneOf must resolve to a
+     * single requested node. For {@link #INVALID_NODE_SET} every kind of target must resolve to an empty list.
+     *
+     * @param f Factory implementation under test.
+     */
+    @ParameterizedTest
+    @MethodSource("clusterFactory")
+    void targetsResolution(ExecutionTargetFactory f) {
+        List<String> targetNodes = NODE_SET;
+
+        assertThat(f.resolveNodes(f.allOf(targetNodes)), equalTo(targetNodes));
+        assertThat(f.resolveNodes(f.someOf(targetNodes)), hasItems(in(targetNodes)));
+        assertThat(f.resolveNodes(f.oneOf(targetNodes)), containsSingleFromSubset(targetNodes));
+        assertThat(f.resolveNodes(f.partitioned(withTokens(targetNodes))), equalTo(targetNodes));
+
+        // Targets built from a name absent from ALL_NODES must resolve to nothing.
+        assertThat(f.resolveNodes(f.allOf(INVALID_NODE_SET)), emptyIterableOf(String.class));
+        assertThat(f.resolveNodes(f.someOf(INVALID_NODE_SET)), emptyIterableOf(String.class));
+        assertThat(f.resolveNodes(f.oneOf(INVALID_NODE_SET)), emptyIterableOf(String.class));
+        assertThat(f.resolveNodes(f.partitioned(withTokens(INVALID_NODE_SET))), emptyIterableOf(String.class));
+    }
+
+    /**
+     * Checks colocation of an AllOf target (left-hand side) with every kind of target.
+     *
+     * <p>Expectations encoded by the assertions below:
+     * <ul>
+     *     <li>AllOf with AllOf: accepted only for the same node set; a subset on either side is rejected.</li>
+     *     <li>AllOf with SomeOf: accepted when the SomeOf set contains every AllOf node; the result resolves
+     *         to the AllOf nodes.</li>
+     *     <li>AllOf with OneOf: accepted only for a single-node AllOf whose node is listed in the OneOf set.</li>
+     *     <li>AllOf with Partitioned: always rejected with a dedicated error message.</li>
+     *     <li>Any pairing that involves {@code INVALID_NODE_SET} is rejected.</li>
+     * </ul>
+     *
+     * @param f Factory implementation under test.
+     */
+    @ParameterizedTest
+    @MethodSource("clusterFactory")
+    void allOfTargets(ExecutionTargetFactory f) throws Exception {
+        // Self colocation
+        assertColocated(f, f.allOf(NODE_SET), f.allOf(NODE_SET), 
equalTo(NODE_SET));
+        assertNotColocated(f.allOf(NODE_SET), f.allOf(NODE_SUBSET));
+        assertNotColocated(f.allOf(NODE_SUBSET), f.allOf(NODE_SET));
+        assertNotColocated(f.allOf(NODE_SET), f.allOf(INVALID_NODE_SET));
+        assertNotColocated(f.allOf(INVALID_NODE_SET), 
f.allOf(INVALID_NODE_SET));
+
+        // Colocation with SomeOf
+        assertColocated(f, f.allOf(NODE_SET), f.someOf(NODE_SET), 
equalTo(NODE_SET));
+        assertNotColocated(f.allOf(NODE_SET), f.someOf(NODE_SUBSET));
+        assertColocated(f, f.allOf(NODE_SUBSET), f.someOf(NODE_SET), 
equalTo(NODE_SUBSET));
+        assertNotColocated(f.allOf(NODE_SET), f.someOf(INVALID_NODE_SET));
+        assertNotColocated(f.allOf(INVALID_NODE_SET), 
f.someOf(INVALID_NODE_SET));
+
+        // Colocation with OneOf
+        assertNotColocated(f.allOf(NODE_SET), f.oneOf(NODE_SET));
+        assertNotColocated(f.allOf(NODE_SET), f.oneOf(NODE_SUBSET));
+        assertNotColocated(f.allOf(NODE_SUBSET), f.oneOf(NODE_SET));
+        assertColocated(f, f.allOf(SINGLE_NODE_SET), f.oneOf(NODE_SET), 
equalTo(SINGLE_NODE_SET));
+        assertNotColocated(f.allOf(NODE_SET), f.oneOf(INVALID_NODE_SET));
+        assertNotColocated(f.allOf(INVALID_NODE_SET), 
f.oneOf(INVALID_NODE_SET));
+
+        // Colocation with Partitioned
+        assertNotColocated(f.allOf(NODE_SET), 
f.partitioned(withTokens(NODE_SET)),
+                "AllOf target and Partitioned can't be colocated");
+        assertNotColocated(f.allOf(SINGLE_NODE_SET), 
f.partitioned(withTokens(NODE_SET)),
+                "AllOf target and Partitioned can't be colocated");
+        assertNotColocated(f.allOf(SINGLE_NODE_SET), 
f.partitioned(withTokens(SINGLE_NODE_SET)),
+                "AllOf target and Partitioned can't be colocated");
+        assertNotColocated(f.allOf(SINGLE_NODE_SET), 
f.partitioned(withTokens(INVALID_NODE_SET)),
+                "AllOf target and Partitioned can't be colocated");
+    }
+
+    /**
+     * Checks colocation of a SomeOf target (left-hand side) with every kind of target.
+     *
+     * <p>Expectations encoded by the assertions below:
+     * <ul>
+     *     <li>SomeOf with SomeOf: equal sets resolve to that set, a set paired with its subset resolves to
+     *         the subset, disjoint sets are rejected.</li>
+     *     <li>SomeOf with AllOf: accepted when the SomeOf set contains every AllOf node; the result resolves
+     *         to the AllOf nodes.</li>
+     *     <li>SomeOf with OneOf: overlapping sets resolve to a single node taken from the smaller of the
+     *         tested sets, disjoint sets are rejected.</li>
+     *     <li>SomeOf with Partitioned: accepted when the SomeOf set contains every partition node; the
+     *         result resolves to the partition nodes.</li>
+     *     <li>Any pairing that involves {@code INVALID_NODE_SET} is rejected.</li>
+     * </ul>
+     *
+     * @param f Factory implementation under test.
+     */
+    @ParameterizedTest
+    @MethodSource("clusterFactory")
+    void someOfTargets(ExecutionTargetFactory f) throws Exception {
+        // Self colocation
+        assertColocated(f, f.someOf(NODE_SET), f.someOf(NODE_SET), 
equalTo(NODE_SET));
+        assertColocated(f, f.someOf(NODE_SET), f.someOf(NODE_SUBSET), 
equalTo(NODE_SUBSET));
+        assertColocated(f, f.someOf(NODE_SUBSET), f.someOf(NODE_SET), 
equalTo(NODE_SUBSET));
+        assertNotColocated(f.someOf(SINGLE_NODE_SET), f.someOf(NODE_SET2)); // 
Disjoint sets.
+        assertNotColocated(f.someOf(NODE_SET), f.someOf(INVALID_NODE_SET));
+        assertNotColocated(f.someOf(INVALID_NODE_SET), 
f.someOf(INVALID_NODE_SET));
+
+        // Colocation with AllOf
+        assertColocated(f, f.someOf(NODE_SET), f.allOf(NODE_SET), 
equalTo(NODE_SET));
+        assertColocated(f, f.someOf(NODE_SET), f.allOf(NODE_SUBSET), 
equalTo(NODE_SUBSET));
+        assertNotColocated(f.someOf(NODE_SUBSET), f.allOf(NODE_SET));
+        assertNotColocated(f.someOf(NODE_SET), f.allOf(INVALID_NODE_SET));
+        assertNotColocated(f.someOf(INVALID_NODE_SET), 
f.allOf(INVALID_NODE_SET));
+
+        // Colocation with OneOf
+        assertColocated(f, f.someOf(NODE_SET), f.oneOf(NODE_SET), 
containsSingleFromSubset(NODE_SET));
+        assertColocated(f, f.someOf(NODE_SUBSET), f.oneOf(NODE_SET), 
containsSingleFromSubset(NODE_SUBSET));
+        assertColocated(f, f.someOf(NODE_SET), f.oneOf(NODE_SUBSET), 
containsSingleFromSubset(NODE_SUBSET));
+        assertNotColocated(f.someOf(SINGLE_NODE_SET), f.oneOf(NODE_SET2)); // 
Disjoint sets.
+        assertNotColocated(f.someOf(NODE_SET), f.oneOf(INVALID_NODE_SET));
+        assertNotColocated(f.someOf(INVALID_NODE_SET), 
f.oneOf(INVALID_NODE_SET));
+
+        // Colocation with Partitioned
+        assertColocated(f, f.someOf(NODE_SET), 
f.partitioned(withTokens(NODE_SET)), equalTo(NODE_SET));
+        assertColocated(f, f.someOf(NODE_SET), 
f.partitioned(withTokens(NODE_SUBSET)), equalTo(NODE_SUBSET));
+        assertNotColocated(f.someOf(NODE_SUBSET), 
f.partitioned(withTokens(NODE_SET)));
+        assertNotColocated(f.someOf(NODE_SET), 
f.partitioned(withTokens(INVALID_NODE_SET)));
+        assertNotColocated(f.someOf(INVALID_NODE_SET), 
f.partitioned(withTokens(INVALID_NODE_SET)));
+    }
+
+    /**
+     * Checks colocation of a OneOf target (left-hand side) with every kind of target.
+     *
+     * <p>Expectations encoded by the assertions below:
+     * <ul>
+     *     <li>OneOf with OneOf: overlapping sets resolve to a single node taken from the smaller of the
+     *         tested sets, regardless of the order in which the nodes are listed; disjoint sets are
+     *         rejected.</li>
+     *     <li>OneOf with AllOf: accepted only for a single-node AllOf whose node is listed in the OneOf set.</li>
+     *     <li>OneOf with SomeOf: overlapping sets resolve to a single node, disjoint sets are rejected.</li>
+     *     <li>OneOf with Partitioned: accepted only when the partitions live on a single node that is listed
+     *         in the OneOf set.</li>
+     *     <li>Any pairing that involves {@code INVALID_NODE_SET} is rejected.</li>
+     * </ul>
+     *
+     * @param f Factory implementation under test.
+     */
+    @ParameterizedTest
+    @MethodSource("clusterFactory")
+    void oneOfTargets(ExecutionTargetFactory f) throws Exception {
+        // Self colocation
+        assertColocated(f, f.oneOf(NODE_SET), f.oneOf(NODE_SET), 
containsSingleFromSubset(NODE_SET));
+        assertColocated(f, f.oneOf(NODE_SET), f.oneOf(shuffle(NODE_SET)), 
containsSingleFromSubset(NODE_SET));
+        assertColocated(f, f.oneOf(NODE_SUBSET), f.oneOf(NODE_SET), 
containsSingleFromSubset(NODE_SUBSET));
+        assertColocated(f, f.oneOf(NODE_SET), f.oneOf(NODE_SUBSET), 
containsSingleFromSubset(NODE_SUBSET));
+        assertNotColocated(f.oneOf(NODE_SET2), f.oneOf(SINGLE_NODE_SET)); // 
Disjoint sets.
+        assertNotColocated(f.oneOf(NODE_SET), f.oneOf(INVALID_NODE_SET));
+        assertNotColocated(f.oneOf(INVALID_NODE_SET), 
f.oneOf(INVALID_NODE_SET));
+
+        // Colocation with AllOf
+        assertNotColocated(f.oneOf(NODE_SET), f.allOf(NODE_SET));
+        assertColocated(f, f.oneOf(NODE_SET), f.allOf(SINGLE_NODE_SET), 
equalTo(SINGLE_NODE_SET));
+        assertNotColocated(f.oneOf(NODE_SUBSET), f.allOf(NODE_SET));
+        assertNotColocated(f.oneOf(NODE_SET2), f.allOf(SINGLE_NODE_SET)); // 
Disjoint sets.
+        assertNotColocated(f.oneOf(NODE_SET), f.allOf(INVALID_NODE_SET));
+        assertNotColocated(f.oneOf(INVALID_NODE_SET), 
f.allOf(INVALID_NODE_SET));
+
+        // Colocation with someOf
+        assertColocated(f, f.oneOf(NODE_SET), f.someOf(NODE_SET), 
containsSingleFromSubset(NODE_SET));
+        assertColocated(f, f.oneOf(NODE_SUBSET), f.someOf(NODE_SET), 
containsSingleFromSubset(NODE_SUBSET));
+        assertColocated(f, f.oneOf(NODE_SET), f.someOf(NODE_SUBSET), 
containsSingleFromSubset(NODE_SUBSET));
+        assertNotColocated(f.oneOf(SINGLE_NODE_SET), f.someOf(NODE_SET2)); // 
Disjoint sets.
+        assertNotColocated(f.oneOf(NODE_SET), f.someOf(INVALID_NODE_SET));
+        assertNotColocated(f.oneOf(INVALID_NODE_SET), 
f.someOf(INVALID_NODE_SET));
+
+        // Colocation with Partitioned
+        assertNotColocated(f.oneOf(NODE_SET), 
f.partitioned(withTokens(NODE_SET)));
+        assertColocated(f, f.oneOf(NODE_SET), 
f.partitioned(withTokens(SINGLE_NODE_SET)), equalTo(SINGLE_NODE_SET));
+        assertNotColocated(f.oneOf(NODE_SET2), 
f.partitioned(withTokens(SINGLE_NODE_SET))); // Disjoint sets.
+        assertNotColocated(f.oneOf(NODE_SET), 
f.partitioned(withTokens(INVALID_NODE_SET)));
+        assertNotColocated(f.oneOf(INVALID_NODE_SET), 
f.partitioned(withTokens(INVALID_NODE_SET)));
+    }
+
+    /**
+     * Checks colocation of a Partitioned target (left-hand side) with every kind of target.
+     *
+     * <p>Expectations encoded by the assertions below:
+     * <ul>
+     *     <li>Partitioned with Partitioned: accepted only for the same nodes in the same order with the same
+     *         tokens; a different order, a different number of partitions or a different token is rejected.</li>
+     *     <li>Partitioned with AllOf: always rejected with a dedicated error message.</li>
+     *     <li>Partitioned with SomeOf: accepted when the SomeOf set contains every partition node; the result
+     *         resolves to the partition nodes.</li>
+     *     <li>Partitioned with OneOf: accepted only when the partitions live on a single node that is listed
+     *         in the OneOf set.</li>
+     *     <li>Any pairing that involves {@code INVALID_NODE_SET} is rejected.</li>
+     * </ul>
+     *
+     * <p>The expected message fragments are copied verbatim because they are compared with the text of the
+     * thrown exception, so they must not be "corrected" here.
+     *
+     * @param f Factory implementation under test.
+     */
+    @ParameterizedTest
+    @MethodSource("clusterFactory")
+    void partitionedTargets(ExecutionTargetFactory f) throws Exception {
+        // Self colocation
+        assertColocated(f, f.partitioned(withTokens(NODE_SET)), f.partitioned(withTokens(NODE_SET)), equalTo(NODE_SET));
+        // Same nodes, but listed in reverse order.
+        assertNotColocated(f.partitioned(withTokens(NODE_SET)), f.partitioned(shuffle(withTokens(NODE_SET))));
+        assertNotColocated(f.partitioned(withTokens(NODE_SUBSET)), f.partitioned(withTokens(NODE_SET)),
+                "Partitioned targets with mot matching numbers of partitioned are not colocated");
+        assertNotColocated(f.partitioned(withTokens(NODE_SET)), f.partitioned(withTokens(NODE_SUBSET)),
+                "Partitioned targets with mot matching numbers of partitioned are not colocated");
+        assertNotColocated(f.partitioned(withTokens(SINGLE_NODE_SET)), f.partitioned(withTokens(INVALID_NODE_SET)));
+        assertNotColocated(f.partitioned(withTokens(INVALID_NODE_SET)), f.partitioned(withTokens(INVALID_NODE_SET)));
+
+        // Same node, different consistency tokens.
+        assertNotColocated(f.partitioned(singleWithToken("node1", 1)), f.partitioned(singleWithToken("node1", 2)),
+                "Partitioned targets have different terms");
+
+        // Colocation with AllOf
+        assertNotColocated(f.partitioned(withTokens(NODE_SET)), f.allOf(NODE_SET),
+                "AllOf target and Partitioned can't be colocated");
+        assertNotColocated(f.partitioned(withTokens(NODE_SET)), f.allOf(SINGLE_NODE_SET),
+                "AllOf target and Partitioned can't be colocated");
+        assertNotColocated(f.partitioned(withTokens(NODE_SET)), f.allOf(INVALID_NODE_SET),
+                "AllOf target and Partitioned can't be colocated");
+        assertNotColocated(f.partitioned(withTokens(INVALID_NODE_SET)), f.allOf(INVALID_NODE_SET),
+                "AllOf target and Partitioned can't be colocated");
+
+        // Colocation with someOf
+        assertColocated(f, f.partitioned(withTokens(NODE_SET)), f.someOf(NODE_SET), equalTo(NODE_SET));
+        assertColocated(f, f.partitioned(withTokens(NODE_SUBSET)), f.someOf(NODE_SET), equalTo(NODE_SUBSET));
+        assertNotColocated(f.partitioned(withTokens(NODE_SET)), f.someOf(NODE_SUBSET));
+        assertNotColocated(f.partitioned(withTokens(SINGLE_NODE_SET)), f.someOf(NODE_SET2)); // Disjoint sets.
+        assertNotColocated(f.partitioned(withTokens(NODE_SET)), f.someOf(INVALID_NODE_SET));
+        assertNotColocated(f.partitioned(withTokens(INVALID_NODE_SET)), f.someOf(INVALID_NODE_SET));
+
+        // Colocation with oneOf
+        assertNotColocated(f.partitioned(withTokens(NODE_SET)), f.oneOf(NODE_SET));
+        assertColocated(f, f.partitioned(withTokens(SINGLE_NODE_SET)), f.oneOf(NODE_SET), equalTo(SINGLE_NODE_SET));
+        assertNotColocated(f.partitioned(withTokens(SINGLE_NODE_SET)), f.oneOf(NODE_SET2)); // Disjoint sets.
+        assertNotColocated(f.partitioned(withTokens(SINGLE_NODE_SET)), f.oneOf(INVALID_NODE_SET));
+        assertNotColocated(f.partitioned(withTokens(INVALID_NODE_SET)), f.oneOf(INVALID_NODE_SET));
+    }
+
+    /**
+     * Pairs every node name with a consistency token equal to the position of that name in
+     * {@link #ALL_NODES}; a name that is not listed there gets {@code -1}.
+     *
+     * @param nodes Node names, in the order the partitions should be assigned.
+     * @return Mutable list with one entry per given node name, in the same order.
+     */
+    private static List<NodeWithConsistencyToken> withTokens(List<String> nodes) {
+        List<NodeWithConsistencyToken> nodesWithTokens = new ArrayList<>(nodes.size());
+
+        for (String nodeName : nodes) {
+            nodesWithTokens.add(new NodeWithConsistencyToken(nodeName, ALL_NODES.indexOf(nodeName)));
+        }
+
+        return nodesWithTokens;
+    }
+
+    /**
+     * Builds a one-element partition assignment.
+     *
+     * @param name Node name.
+     * @param token Consistency token to attach to the node.
+     * @return Immutable single-element list.
+     */
+    private static List<NodeWithConsistencyToken> singleWithToken(String name, int token) {
+        NodeWithConsistencyToken node = new NodeWithConsistencyToken(name, token);
+
+        return List.of(node);
+    }
+
+    /**
+     * Returns a copy of the given list with its elements in reverse order; the argument is left untouched.
+     *
+     * <p>Despite the name, the reordering is deterministic, so the tests that rely on a "different order"
+     * stay reproducible.
+     *
+     * @param items Elements to reorder (node names or nodes with tokens).
+     * @return New mutable list holding the same elements in reverse order.
+     */
+    private static <T> List<T> shuffle(List<T> items) {
+        List<T> reversed = new ArrayList<>(items);
+        Collections.reverse(reversed);
+
+        return reversed;
+    }
+
+    /**
+     * Creates a matcher for an iterable that holds exactly one element, that element being one of the given items.
+     *
+     * @param items Acceptable values for the single element.
+     */
+    private static Matcher<Iterable<String>> containsSingleFromSubset(List<String> items) {
+        Matcher<Iterable<String>> exactlyOneElement = iterableWithSize(1);
+        Matcher<Iterable<String>> elementFromItems = hasItems(in(items));
+
+        return allOf(exactlyOneElement, elementFromItems);
+    }
+
+    /**
+     * Colocates {@code target} with {@code other} and verifies the nodes the resulting target resolves to.
+     *
+     * @param factory Factory used to resolve the colocated target into node names.
+     * @param target Left-hand side of the colocation.
+     * @param other Right-hand side of the colocation.
+     * @param matcher Expectation for the resolved node names.
+     * @throws ColocationMappingException If the targets turn out not to be colocated.
+     */
+    private static void assertColocated(
+            ExecutionTargetFactory factory,
+            ExecutionTarget target,
+            ExecutionTarget other,
+            Matcher<Iterable<String>> matcher
+    ) throws ColocationMappingException {
+        ExecutionTarget colocated = target.colocateWith(other);
+
+        assertThat(factory.resolveNodes(colocated), matcher);
+    }
+
+    /** Verifies that colocating the targets fails with the generic "not colocated" error. */
+    private static void assertNotColocated(ExecutionTarget target, ExecutionTarget other) {
+        String genericErrorFragment = "Targets are not colocated";
+
+        assertNotColocated(target, other, genericErrorFragment);
+    }
+
+    /**
+     * Verifies that colocating {@code target} with {@code other} throws {@link ColocationMappingException}.
+     *
+     * @param target Left-hand side of the colocation.
+     * @param other Right-hand side of the colocation.
+     * @param errorMessageFragment Text handed to {@code assertThrows}, presumably matched against the
+     *         exception message (confirm in {@code IgniteTestUtils}).
+     */
+    @SuppressWarnings("ThrowableNotThrown")
+    private static void assertNotColocated(ExecutionTarget target, ExecutionTarget other, String errorMessageFragment) {
+        Class<ColocationMappingException> expectedError = ColocationMappingException.class;
+
+        assertThrows(expectedError, () -> target.colocateWith(other), errorMessageFragment);
+    }
+}


Reply via email to