This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-20503 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit a5143ed9a6d1dad3d7dba7a1e0d32666cc864594 Author: amashenkov <[email protected]> AuthorDate: Thu Jun 20 17:15:41 2024 +0300 wip --- .../exec/mapping/ColocationMappingException.java | 2 +- .../AbstractTarget.java | 104 ++++---- .../exec/mapping/largecluster/AllOfTarget.java | 72 ++++++ .../mapping/largecluster/LargeClusterFactory.java | 109 +++++++++ .../exec/mapping/largecluster/OneOfTarget.java | 80 +++++++ .../mapping/largecluster/PartitionedTarget.java | 99 ++++++++ .../exec/mapping/largecluster/SomeOfTarget.java | 72 ++++++ .../exec/mapping/smallcluster/AbstractTarget.java | 4 +- .../mapping/ExecutionTargetFactorySelfTest.java | 263 +++++++++++++++++++++ 9 files changed, 756 insertions(+), 49 deletions(-) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationMappingException.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationMappingException.java index cbd93db0bd..ace2e0eafe 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationMappingException.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationMappingException.java @@ -26,6 +26,6 @@ package org.apache.ignite.internal.sql.engine.exec.mapping; public class ColocationMappingException extends Exception { /** Constructor. */ public ColocationMappingException(String message) { - super(message, null, true, false); + super(message, null, true, true); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java similarity index 69% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java index 293d697774..1a51307e65 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster; +package org.apache.ignite.internal.sql.engine.exec.mapping.largecluster; import static org.apache.ignite.internal.util.IgniteUtils.isPow2; @@ -23,7 +23,9 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.util.ArrayList; +import java.util.BitSet; import java.util.List; +import org.apache.calcite.util.BitSets; import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken; import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException; import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget; @@ -35,26 +37,19 @@ import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget; * colocation method. */ abstract class AbstractTarget implements ExecutionTarget { - final long nodes; + final BitSet nodes; - AbstractTarget(long nodes) { + AbstractTarget(BitSet nodes) { this.nodes = nodes; } List<String> nodes(List<String> nodeNames) { - if (isPow2(nodes)) { - int idx = Long.numberOfTrailingZeros(nodes); - - return List.of(nodeNames.get(idx)); - } - - int count = Long.bitCount(nodes); + int count = nodes.cardinality(); List<String> result = new ArrayList<>(count); - for (int bit = 1, idx = 0; bit <= nodes; bit <<= 1, idx++) { - if ((nodes & bit) != 0) { - result.add(nodeNames.get(idx)); - } + int next = 0; + while ((next = nodes.nextSetBit(next + 1)) != -1) { + result.add(nodeNames.get(next)); } return result; @@ -70,11 +65,11 @@ abstract class AbstractTarget implements ExecutionTarget { Int2ObjectMap<NodeWithConsistencyToken> result = new Int2ObjectOpenHashMap<>(partitionedTarget.partitionsNodes.length); for (int partNo = 0; partNo < partitionedTarget.partitionsNodes.length; partNo++) { - long partitionNodes = partitionedTarget.partitionsNodes[partNo]; + BitSet partitionNodes = partitionedTarget.partitionsNodes[partNo]; - assert isPow2(partitionNodes); + assert partitionNodes.cardinality() == 1; - int idx = Long.numberOfTrailingZeros(partitionNodes); + int idx = partitionNodes.nextSetBit(0); result.put(partNo, new NodeWithConsistencyToken( nodeNames.get(idx), @@ -96,7 +91,7 @@ abstract class AbstractTarget implements ExecutionTarget { abstract ExecutionTarget colocate(SomeOfTarget other) throws ColocationMappingException; static ExecutionTarget colocate(AllOfTarget allOf, AllOfTarget otherAllOf) throws ColocationMappingException { - if (allOf.nodes != otherAllOf.nodes) { + if (!allOf.nodes.equals(otherAllOf.nodes) || otherAllOf.nodes.cardinality() == 0) { throw new ColocationMappingException("Targets are not colocated"); } @@ -104,14 +99,20 @@ abstract class AbstractTarget implements ExecutionTarget { } static ExecutionTarget colocate(AllOfTarget allOf, OneOfTarget oneOf) throws ColocationMappingException { - if ((allOf.nodes & oneOf.nodes) == 0) { + int target = allOf.nodes.nextSetBit(0); + + if (target == -1 || allOf.nodes.nextSetBit(target + 1) != -1) { throw new ColocationMappingException("Targets are not colocated"); } - if (!isPow2(allOf.nodes)) { + if (!oneOf.nodes.get(target)) { throw new ColocationMappingException("Targets are not colocated"); } + // When colocated, AllOfTarget must contains a single node that matches one of OneOfTarget nodes. + assert allOf.nodes.cardinality() == 1; + assert allOf.nodes.intersects(oneOf.nodes); + return allOf; } @@ -120,9 +121,7 @@ abstract class AbstractTarget implements ExecutionTarget { } static ExecutionTarget colocate(AllOfTarget allOf, SomeOfTarget someOf) throws ColocationMappingException { - long newNodes = allOf.nodes & someOf.nodes; - - if (allOf.nodes != newNodes) { + if (!BitSets.contains(someOf.nodes, allOf.nodes) || allOf.nodes.cardinality() == 0) { throw new ColocationMappingException("Targets are not colocated"); } @@ -130,34 +129,42 @@ abstract class AbstractTarget implements ExecutionTarget { } static ExecutionTarget colocate(OneOfTarget oneOf, OneOfTarget anotherOneOf) throws ColocationMappingException { - long newNodes = oneOf.nodes & anotherOneOf.nodes; - - if (newNodes == 0) { + if (!oneOf.nodes.intersects(anotherOneOf.nodes)) { throw new ColocationMappingException("Targets are not colocated"); } + BitSet newNodes = BitSet.valueOf(oneOf.nodes.toLongArray()); + newNodes.and(anotherOneOf.nodes); + return new OneOfTarget(newNodes); } static ExecutionTarget colocate(OneOfTarget oneOf, PartitionedTarget partitioned) throws ColocationMappingException { - if ((oneOf.nodes & partitioned.nodes) == 0) { + int target = partitioned.nodes.nextSetBit(0); + + if (target == -1 || partitioned.nodes.nextSetBit(target + 1) != -1) { throw new ColocationMappingException("Targets are not colocated"); } - if (!isPow2((partitioned.nodes))) { + if (!oneOf.nodes.get(target)) { throw new ColocationMappingException("Targets are not colocated"); } + // When colocated, PartitionedTarget must contains a single node that matches one of OneOfTarget nodes. + assert partitioned.nodes.cardinality() == 1; + assert partitioned.nodes.intersects(oneOf.nodes); + return partitioned; } static ExecutionTarget colocate(OneOfTarget oneOf, SomeOfTarget someOf) throws ColocationMappingException { - long newNodes = oneOf.nodes & someOf.nodes; - - if (newNodes == 0) { + if (!oneOf.nodes.intersects(someOf.nodes)) { throw new ColocationMappingException("Targets are not colocated"); } + BitSet newNodes = BitSet.valueOf(oneOf.nodes.toLongArray()); + newNodes.and(someOf.nodes); + return new OneOfTarget(newNodes); } @@ -167,11 +174,9 @@ abstract class AbstractTarget implements ExecutionTarget { } boolean finalised = true; - long[] newPartitionsNodes = new long[partitioned.partitionsNodes.length]; + BitSet[] newPartitionsNodes = new BitSet[partitioned.partitionsNodes.length]; for (int partNo = 0; partNo < partitioned.partitionsNodes.length; partNo++) { - long newNodes = partitioned.partitionsNodes[partNo] & otherPartitioned.partitionsNodes[partNo]; - - if (newNodes == 0) { + if (!partitioned.partitionsNodes[partNo].intersects(otherPartitioned.partitionsNodes[partNo])) { throw new ColocationMappingException("Targets are not colocated"); } @@ -179,8 +184,11 @@ abstract class AbstractTarget implements ExecutionTarget { throw new ColocationMappingException("Partitioned targets have different terms"); } + BitSet newNodes = BitSet.valueOf(partitioned.partitionsNodes[partNo].toLongArray()); + newNodes.and(otherPartitioned.nodes); + newPartitionsNodes[partNo] = newNodes; - finalised = finalised && isPow2(newNodes); + finalised = finalised && newNodes.cardinality() == 1; } return new PartitionedTarget(finalised, newPartitionsNodes, partitioned.enlistmentConsistencyTokens); @@ -188,32 +196,36 @@ abstract class AbstractTarget implements ExecutionTarget { static ExecutionTarget colocate(PartitionedTarget partitioned, SomeOfTarget someOf) throws ColocationMappingException { boolean finalised = true; - long[] newPartitionsNodes = new long[partitioned.partitionsNodes.length]; + BitSet[] newPartitionsNodes = new BitSet[partitioned.partitionsNodes.length]; for (int partNo = 0; partNo < partitioned.partitionsNodes.length; partNo++) { - long newNodes = partitioned.partitionsNodes[partNo] & someOf.nodes; - - if (newNodes == 0) { + if (!partitioned.partitionsNodes[partNo].intersects(someOf.nodes)) { throw new ColocationMappingException("Targets are not colocated"); } + BitSet newNodes = BitSet.valueOf(partitioned.partitionsNodes[partNo].toLongArray()); + newNodes.and(someOf.nodes); + newPartitionsNodes[partNo] = newNodes; - finalised = finalised && isPow2(newNodes); + finalised = finalised && newNodes.cardinality() == 1; } return new PartitionedTarget(finalised, newPartitionsNodes, partitioned.enlistmentConsistencyTokens); } static ExecutionTarget colocate(SomeOfTarget someOf, SomeOfTarget otherSomeOf) throws ColocationMappingException { - long newNodes = someOf.nodes & otherSomeOf.nodes; - - if (newNodes == 0) { + if (!someOf.nodes.intersects(otherSomeOf.nodes)) { throw new ColocationMappingException("Targets are not colocated"); } + BitSet newNodes = BitSet.valueOf(someOf.nodes.toLongArray()); + newNodes.and(otherSomeOf.nodes); + return new SomeOfTarget(newNodes); } - static long pickOne(long nodes) { - return Long.lowestOneBit(nodes); + static BitSet pickOne(BitSet nodes) { + int node = nodes.nextSetBit(0); + + return node == -1 ? BitSets.of() : BitSets.of(node); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AllOfTarget.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AllOfTarget.java new file mode 100644 index 0000000000..81636a9118 --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AllOfTarget.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec.mapping.largecluster; + +import java.util.BitSet; +import java.util.List; +import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException; +import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget; +import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory; + +/** + * Represents a target that should be executed on all of the given nodes. + * + * <p>See javadoc of {@link ExecutionTargetFactory#allOf(List)} for details. + */ +class AllOfTarget extends AbstractTarget { + AllOfTarget(BitSet nodes) { + super(nodes); + } + + @Override + boolean finalised() { + return true; + } + + @Override + public ExecutionTarget finalise() { + return this; + } + + @Override + public ExecutionTarget colocateWith(ExecutionTarget other) throws ColocationMappingException { + assert other instanceof AbstractTarget : other == null ? "<null>" : other.getClass().getCanonicalName(); + + return ((AbstractTarget) other).colocate(this); + } + + @Override + ExecutionTarget colocate(AllOfTarget other) throws ColocationMappingException { + return colocate(this, other); + } + + @Override + ExecutionTarget colocate(OneOfTarget other) throws ColocationMappingException { + return colocate(this, other); + } + + @Override + ExecutionTarget colocate(PartitionedTarget other) throws ColocationMappingException { + return colocate(this, other); + } + + @Override + ExecutionTarget colocate(SomeOfTarget other) throws ColocationMappingException { + return colocate(this, other); + } +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java new file mode 100644 index 0000000000..6bac22abc8 --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec.mapping.largecluster; + +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import java.util.BitSet; +import java.util.List; +import org.apache.calcite.util.BitSets; +import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken; +import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget; +import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory; + +/** + * A factory that able to create targets for cluster with up to 64 nodes. + */ +public class LargeClusterFactory implements ExecutionTargetFactory { + private final List<String> nodes; + private final Object2IntMap<String> nodeNameToId; + + /** Constructor. */ + public LargeClusterFactory(List<String> nodes) { + this.nodes = nodes; + + nodeNameToId = new Object2IntOpenHashMap<>(nodes.size()); + + int idx = 0; + for (String name : nodes) { + nodeNameToId.putIfAbsent(name, idx++); + } + } + + @Override + public ExecutionTarget allOf(List<String> nodes) { + return new AllOfTarget(nodeListToMap(nodes)); + } + + @Override + public ExecutionTarget oneOf(List<String> nodes) { + return new OneOfTarget(nodeListToMap(nodes)); + } + + @Override + public ExecutionTarget someOf(List<String> nodes) { + return new SomeOfTarget(nodeListToMap(nodes)); + } + + @Override + public ExecutionTarget partitioned(List<NodeWithConsistencyToken> nodes) { + BitSet[] partitionNodes = new BitSet[nodes.size()]; + long[] enlistmentConsistencyTokens = new long[nodes.size()]; + + int idx = 0; + for (NodeWithConsistencyToken e : nodes) { + int node = nodeNameToId.getOrDefault(e.name(), -1); + partitionNodes[idx] = node == -1 ? BitSets.of() : BitSets.of(node); + enlistmentConsistencyTokens[idx++] = e.enlistmentConsistencyToken(); + } + + return new PartitionedTarget(true, partitionNodes, enlistmentConsistencyTokens); + } + + @Override + public List<String> resolveNodes(ExecutionTarget target) { + target = target.finalise(); + + assert target instanceof AbstractTarget : target == null ? "<null>" : target.getClass().getCanonicalName(); + + return ((AbstractTarget) target).nodes(nodes); + } + + @Override + public Int2ObjectMap<NodeWithConsistencyToken> resolveAssignments(ExecutionTarget target) { + target = target.finalise(); + + assert target instanceof AbstractTarget : target == null ? "<null>" : target.getClass().getCanonicalName(); + + return ((AbstractTarget) target).assignments(nodes); + } + + private BitSet nodeListToMap(List<String> nodes) { + BitSet nodesMap = new BitSet(nodes.size()); + + for (String name : nodes) { + int id = nodeNameToId.getOrDefault(name, -1); + if (id != -1) { + nodesMap.set(id); + } + } + + return nodesMap; + } +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/OneOfTarget.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/OneOfTarget.java new file mode 100644 index 0000000000..993c2f89d9 --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/OneOfTarget.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec.mapping.largecluster; + +import java.util.BitSet; +import java.util.List; +import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException; +import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget; +import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory; + +/** + * Represents a target that should be executed on exactly one node from given list. + * + * <p>See javadoc of {@link ExecutionTargetFactory#oneOf(List)} for details. + */ +class OneOfTarget extends AbstractTarget { + private final boolean finalised; + + OneOfTarget(BitSet nodes) { + super(nodes); + + finalised = nodes.cardinality() == 1; + } + + @Override + boolean finalised() { + return finalised; + } + + @Override + public ExecutionTarget finalise() { + if (finalised()) { + return this; + } + + return new OneOfTarget(pickOne(nodes)); + } + + @Override + public ExecutionTarget colocateWith(ExecutionTarget other) throws ColocationMappingException { + assert other instanceof AbstractTarget : other == null ? "<null>" : other.getClass().getCanonicalName(); + + return ((AbstractTarget) other).colocate(this); + } + + @Override + public ExecutionTarget colocate(AllOfTarget other) throws ColocationMappingException { + return colocate(other, this); + } + + @Override + public ExecutionTarget colocate(OneOfTarget other) throws ColocationMappingException { + return colocate(this, other); + } + + @Override + public ExecutionTarget colocate(PartitionedTarget other) throws ColocationMappingException { + return colocate(this, other); + } + + @Override + public ExecutionTarget colocate(SomeOfTarget other) throws ColocationMappingException { + return colocate(this, other); + } +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/PartitionedTarget.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/PartitionedTarget.java new file mode 100644 index 0000000000..44e76aaaed --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/PartitionedTarget.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec.mapping.largecluster; + +import java.util.BitSet; +import java.util.List; +import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException; +import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget; +import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory; + +/** + * Represents a target that consists of a number of primary partitions. + * + * <p>See javadoc of {@link ExecutionTargetFactory#partitioned(List)} for details. + */ +class PartitionedTarget extends AbstractTarget { + private final boolean finalised; + final BitSet[] partitionsNodes; + final long[] enlistmentConsistencyTokens; + + PartitionedTarget(boolean finalised, BitSet[] partitionsNodes, long[] enlistmentConsistencyTokens) { + super(computeNodes(partitionsNodes)); + + this.finalised = finalised; + this.partitionsNodes = partitionsNodes; + this.enlistmentConsistencyTokens = enlistmentConsistencyTokens; + } + + @Override + boolean finalised() { + return finalised; + } + + @Override + public ExecutionTarget finalise() { + if (finalised) { + return this; + } + + BitSet[] newPartitionsNodes = new BitSet[partitionsNodes.length]; + + for (int partNo = 0; partNo < partitionsNodes.length; partNo++) { + newPartitionsNodes[partNo] = pickOne(partitionsNodes[partNo]); + } + + return new PartitionedTarget(true, newPartitionsNodes, enlistmentConsistencyTokens); + } + + @Override + public ExecutionTarget colocateWith(ExecutionTarget other) throws ColocationMappingException { + assert other instanceof AbstractTarget : other == null ? "<null>" : other.getClass().getCanonicalName(); + + return ((AbstractTarget) other).colocate(this); + } + + @Override + ExecutionTarget colocate(AllOfTarget other) throws ColocationMappingException { + return colocate(other, this); + } + + @Override + ExecutionTarget colocate(OneOfTarget other) throws ColocationMappingException { + return colocate(other, this); + } + + @Override + ExecutionTarget colocate(PartitionedTarget other) throws ColocationMappingException { + return colocate(this, other); + } + + @Override + ExecutionTarget colocate(SomeOfTarget other) throws ColocationMappingException { + return colocate(this, other); + } + + private static BitSet computeNodes(BitSet[] partitionsNodes) { + BitSet nodes = new BitSet(); + for (BitSet nodesOfPartition : partitionsNodes) { + nodes.or(nodesOfPartition); + } + + return nodes; + } +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/SomeOfTarget.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/SomeOfTarget.java new file mode 100644 index 0000000000..4c4bc7fbbd --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/SomeOfTarget.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec.mapping.largecluster; + +import java.util.BitSet; +import java.util.List; +import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException; +import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget; +import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory; + +/** + * Represents a target that may be executed on a non empty subset of the given nodes. + * + * <p>See javadoc of {@link ExecutionTargetFactory#someOf(List)} for details. + */ +class SomeOfTarget extends AbstractTarget { + SomeOfTarget(BitSet nodes) { + super(nodes); + } + + @Override + boolean finalised() { + return true; + } + + @Override + public ExecutionTarget finalise() { + return this; + } + + @Override + public ExecutionTarget colocateWith(ExecutionTarget other) throws ColocationMappingException { + assert other instanceof AbstractTarget : other == null ? "<null>" : other.getClass().getCanonicalName(); + + return ((AbstractTarget) other).colocate(this); + } + + @Override + ExecutionTarget colocate(AllOfTarget other) throws ColocationMappingException { + return colocate(other, this); + } + + @Override + ExecutionTarget colocate(OneOfTarget other) throws ColocationMappingException { + return colocate(other, this); + } + + @Override + ExecutionTarget colocate(PartitionedTarget other) throws ColocationMappingException { + return colocate(other, this); + } + + @Override + ExecutionTarget colocate(SomeOfTarget other) throws ColocationMappingException { + return colocate(this, other); + } +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java index 293d697774..98138fda89 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java @@ -96,7 +96,7 @@ abstract class AbstractTarget implements ExecutionTarget { abstract ExecutionTarget colocate(SomeOfTarget other) throws ColocationMappingException; static ExecutionTarget colocate(AllOfTarget allOf, AllOfTarget otherAllOf) throws ColocationMappingException { - if (allOf.nodes != otherAllOf.nodes) { + if (otherAllOf.nodes == 0 || allOf.nodes != otherAllOf.nodes) { throw new ColocationMappingException("Targets are not colocated"); } @@ -122,7 +122,7 @@ abstract class AbstractTarget implements ExecutionTarget { static ExecutionTarget colocate(AllOfTarget allOf, SomeOfTarget someOf) throws ColocationMappingException { long newNodes = allOf.nodes & someOf.nodes; - if (allOf.nodes != newNodes) { + if (newNodes == 0 || allOf.nodes != newNodes) { throw new ColocationMappingException("Targets are not colocated"); } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java new file mode 100644 index 0000000000..550481a9be --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec.mapping; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.emptyIterableOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.in; +import static org.hamcrest.Matchers.iterableWithSize; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken; +import org.apache.ignite.internal.sql.engine.exec.mapping.largecluster.LargeClusterFactory; +import org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster.SmallClusterFactory; +import org.hamcrest.Matcher; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Test class to verify {@link ExecutionTargetFactory} implementations. + * + * @see SmallClusterFactory + * @see LargeClusterFactory + */ +public class ExecutionTargetFactorySelfTest { + private static final List<String> ALL_NODES = List.of("node1", "node2", "node3", "node4", "node5"); + private static final List<String> NODE_SET = List.of("node2", "node4", "node5"); + private static final List<String> NODE_SET2 = List.of("node2", "node3", "node5"); + private static final List<String> NODE_SUBSET = List.of("node2", "node5"); + private static final List<String> SINGLE_NODE_SET = List.of("node4"); + private static final List<String> INVALID_NODE_SET = List.of("node0"); + + private static List<ExecutionTargetFactory> clusterFactory() { + return List.of( + new SmallClusterFactory(ALL_NODES), + new LargeClusterFactory(ALL_NODES) + ); + } + + @ParameterizedTest + @MethodSource("clusterFactory") + void targetsResolution(ExecutionTargetFactory f) { + List<String> targetNodes = NODE_SET; + + assertThat(f.resolveNodes(f.allOf(targetNodes)), equalTo(targetNodes)); + assertThat(f.resolveNodes(f.allOf(targetNodes)), equalTo(targetNodes)); + assertThat(f.resolveNodes(f.someOf(targetNodes)), hasItems(in(targetNodes))); + assertThat(f.resolveNodes(f.oneOf(targetNodes)), allOf(iterableWithSize(1), hasItems(in(targetNodes)))); + assertThat(f.resolveNodes(f.partitioned(withTokens(targetNodes))), equalTo(targetNodes)); + + assertThat(f.resolveNodes(f.allOf(INVALID_NODE_SET)), emptyIterableOf(String.class)); + assertThat(f.resolveNodes(f.allOf(INVALID_NODE_SET)), emptyIterableOf(String.class)); + assertThat(f.resolveNodes(f.someOf(INVALID_NODE_SET)), emptyIterableOf(String.class)); + assertThat(f.resolveNodes(f.oneOf(INVALID_NODE_SET)), emptyIterableOf(String.class)); + assertThat(f.resolveNodes(f.partitioned(withTokens(INVALID_NODE_SET))), emptyIterableOf(String.class)); + } + + @ParameterizedTest + @MethodSource("clusterFactory") + void allOfTargets(ExecutionTargetFactory f) throws Exception { + // Self colocation + assertColocated(f, f.allOf(NODE_SET), f.allOf(NODE_SET), equalTo(NODE_SET)); + assertNotColocated(f.allOf(NODE_SET), f.allOf(NODE_SUBSET)); + assertNotColocated(f.allOf(NODE_SUBSET), f.allOf(NODE_SET)); + assertNotColocated(f.allOf(NODE_SET), f.allOf(INVALID_NODE_SET)); + assertNotColocated(f.allOf(INVALID_NODE_SET), f.allOf(INVALID_NODE_SET)); + + // Colocation with SomeOf + assertColocated(f, f.allOf(NODE_SET), f.someOf(NODE_SET), equalTo(NODE_SET)); + assertNotColocated(f.allOf(NODE_SET), f.someOf(NODE_SUBSET)); + assertColocated(f, f.allOf(NODE_SUBSET), f.someOf(NODE_SET), equalTo(NODE_SUBSET)); + assertNotColocated(f.allOf(NODE_SET), f.someOf(INVALID_NODE_SET)); + assertNotColocated(f.allOf(INVALID_NODE_SET), f.someOf(INVALID_NODE_SET)); + + // Colocation with OneOf + assertNotColocated(f.allOf(NODE_SET), f.oneOf(NODE_SET)); + assertNotColocated(f.allOf(NODE_SET), f.oneOf(NODE_SUBSET)); + assertNotColocated(f.allOf(NODE_SUBSET), f.oneOf(NODE_SET)); + assertColocated(f, f.allOf(SINGLE_NODE_SET), f.oneOf(NODE_SET), equalTo(SINGLE_NODE_SET)); + assertNotColocated(f.allOf(NODE_SET), f.oneOf(INVALID_NODE_SET)); + assertNotColocated(f.allOf(INVALID_NODE_SET), f.oneOf(INVALID_NODE_SET)); + + // Colocation with Partitioned + assertNotColocated(f.allOf(NODE_SET), f.partitioned(withTokens(NODE_SET)), + "AllOf target and Partitioned can't be colocated"); + assertNotColocated(f.allOf(SINGLE_NODE_SET), f.partitioned(withTokens(NODE_SET)), + "AllOf target and Partitioned can't be colocated"); + assertNotColocated(f.allOf(SINGLE_NODE_SET), f.partitioned(withTokens(SINGLE_NODE_SET)), + "AllOf target and Partitioned can't be colocated"); + assertNotColocated(f.allOf(SINGLE_NODE_SET), f.partitioned(withTokens(INVALID_NODE_SET)), + "AllOf target and Partitioned can't be colocated"); + } + + @ParameterizedTest + @MethodSource("clusterFactory") + void someOfTargets(ExecutionTargetFactory f) throws Exception { + // Self colocation + assertColocated(f, f.someOf(NODE_SET), f.someOf(NODE_SET), equalTo(NODE_SET)); + assertColocated(f, f.someOf(NODE_SET), f.someOf(NODE_SUBSET), equalTo(NODE_SUBSET)); + assertColocated(f, f.someOf(NODE_SUBSET), f.someOf(NODE_SET), equalTo(NODE_SUBSET)); + assertNotColocated(f.someOf(SINGLE_NODE_SET), f.someOf(NODE_SET2)); // Disjoint sets. + assertNotColocated(f.someOf(NODE_SET), f.someOf(INVALID_NODE_SET)); + assertNotColocated(f.someOf(INVALID_NODE_SET), f.someOf(INVALID_NODE_SET)); + + // Colocation with AllOf + assertColocated(f, f.someOf(NODE_SET), f.allOf(NODE_SET), equalTo(NODE_SET)); + assertColocated(f, f.someOf(NODE_SET), f.allOf(NODE_SUBSET), equalTo(NODE_SUBSET)); + assertNotColocated(f.someOf(NODE_SUBSET), f.allOf(NODE_SET)); + assertNotColocated(f.someOf(NODE_SET), f.allOf(INVALID_NODE_SET)); + assertNotColocated(f.someOf(INVALID_NODE_SET), f.allOf(INVALID_NODE_SET)); + + // Colocation with OneOf + assertColocated(f, f.someOf(NODE_SET), f.oneOf(NODE_SET), containsSingleFromSubset(NODE_SET)); + assertColocated(f, f.someOf(NODE_SUBSET), f.oneOf(NODE_SET), containsSingleFromSubset(NODE_SUBSET)); + assertColocated(f, f.someOf(NODE_SET), f.oneOf(NODE_SUBSET), containsSingleFromSubset(NODE_SUBSET)); + assertNotColocated(f.someOf(SINGLE_NODE_SET), f.oneOf(NODE_SET2)); // Disjoint sets. + assertNotColocated(f.someOf(NODE_SET), f.oneOf(INVALID_NODE_SET)); + assertNotColocated(f.someOf(INVALID_NODE_SET), f.oneOf(INVALID_NODE_SET)); + + // Colocation with Partitioned + assertColocated(f, f.someOf(NODE_SET), f.partitioned(withTokens(NODE_SET)), equalTo(NODE_SET)); + assertColocated(f, f.someOf(NODE_SET), f.partitioned(withTokens(NODE_SUBSET)), equalTo(NODE_SUBSET)); + assertNotColocated(f.someOf(NODE_SUBSET), f.partitioned(withTokens(NODE_SET))); + assertNotColocated(f.someOf(NODE_SET), f.partitioned(withTokens(INVALID_NODE_SET))); + assertNotColocated(f.someOf(INVALID_NODE_SET), f.partitioned(withTokens(INVALID_NODE_SET))); + } + + @ParameterizedTest + @MethodSource("clusterFactory") + void oneOfTargets(ExecutionTargetFactory f) throws Exception { + // Self colocation + assertColocated(f, f.oneOf(NODE_SET), f.oneOf(NODE_SET), containsSingleFromSubset(NODE_SET)); + assertColocated(f, f.oneOf(NODE_SET), f.oneOf(shuffle(NODE_SET)), containsSingleFromSubset(NODE_SET)); + assertColocated(f, f.oneOf(NODE_SUBSET), f.oneOf(NODE_SET), containsSingleFromSubset(NODE_SUBSET)); + assertColocated(f, f.oneOf(NODE_SET), f.oneOf(NODE_SUBSET), containsSingleFromSubset(NODE_SUBSET)); + assertNotColocated(f.oneOf(NODE_SET2), f.oneOf(SINGLE_NODE_SET)); // Disjoint sets. + assertNotColocated(f.oneOf(NODE_SET), f.oneOf(INVALID_NODE_SET)); + assertNotColocated(f.oneOf(INVALID_NODE_SET), f.oneOf(INVALID_NODE_SET)); + + // Colocation with AllOf + assertNotColocated(f.oneOf(NODE_SET), f.allOf(NODE_SET)); + assertColocated(f, f.oneOf(NODE_SET), f.allOf(SINGLE_NODE_SET), equalTo(SINGLE_NODE_SET)); + assertNotColocated(f.oneOf(NODE_SUBSET), f.allOf(NODE_SET)); + assertNotColocated(f.oneOf(NODE_SET2), f.allOf(SINGLE_NODE_SET)); // Disjoint sets. + assertNotColocated(f.oneOf(NODE_SET), f.allOf(INVALID_NODE_SET)); + assertNotColocated(f.oneOf(INVALID_NODE_SET), f.allOf(INVALID_NODE_SET)); + + // Colocation with someOf + assertColocated(f, f.oneOf(NODE_SET), f.someOf(NODE_SET), containsSingleFromSubset(NODE_SET)); + assertColocated(f, f.oneOf(NODE_SUBSET), f.someOf(NODE_SET), containsSingleFromSubset(NODE_SUBSET)); + assertColocated(f, f.oneOf(NODE_SET), f.someOf(NODE_SUBSET), containsSingleFromSubset(NODE_SUBSET)); + assertNotColocated(f.oneOf(SINGLE_NODE_SET), f.someOf(NODE_SET2)); // Disjoint sets. + assertNotColocated(f.oneOf(NODE_SET), f.someOf(INVALID_NODE_SET)); + assertNotColocated(f.oneOf(INVALID_NODE_SET), f.someOf(INVALID_NODE_SET)); + + // Colocation with Partitioned + assertNotColocated(f.oneOf(NODE_SET), f.partitioned(withTokens(NODE_SET))); + assertColocated(f, f.oneOf(NODE_SET), f.partitioned(withTokens(SINGLE_NODE_SET)), equalTo(SINGLE_NODE_SET)); + assertNotColocated(f.oneOf(NODE_SET2), f.partitioned(withTokens(SINGLE_NODE_SET))); // Disjoint sets. + assertNotColocated(f.oneOf(NODE_SET), f.partitioned(withTokens(INVALID_NODE_SET))); + assertNotColocated(f.oneOf(INVALID_NODE_SET), f.partitioned(withTokens(INVALID_NODE_SET))); + } + + @ParameterizedTest + @MethodSource("clusterFactory") + void partitionedTargets(ExecutionTargetFactory f) throws Exception { + // Self colocation + assertColocated(f, f.partitioned(withTokens(NODE_SET)), f.partitioned(withTokens(NODE_SET)), equalTo(NODE_SET)); + assertNotColocated(f.partitioned(withTokens(NODE_SET)), f.partitioned(shuffle(withTokens(NODE_SET)))); + assertNotColocated(f.partitioned(withTokens(NODE_SUBSET)), f.partitioned(withTokens(NODE_SET)), + "Partitioned targets with mot matching numbers of partitioned are not colocated"); + assertNotColocated(f.partitioned(withTokens(NODE_SET)), f.partitioned(withTokens(NODE_SUBSET)), + "Partitioned targets with mot matching numbers of partitioned are not colocated"); + assertNotColocated(f.partitioned(withTokens(SINGLE_NODE_SET)), f.partitioned(withTokens(INVALID_NODE_SET))); + assertNotColocated(f.partitioned(withTokens(INVALID_NODE_SET)), f.partitioned(withTokens(INVALID_NODE_SET))); + + assertNotColocated(f.partitioned(singleWithToken("node1", 1)) + , f.partitioned(singleWithToken("node1", 2)), + "Partitioned targets have different terms"); + + // Colocation with AllOf + assertNotColocated(f.partitioned(withTokens(NODE_SET)), f.allOf(NODE_SET), + "AllOf target and Partitioned can't be colocated"); + assertNotColocated(f.partitioned(withTokens(NODE_SET)), f.allOf(SINGLE_NODE_SET), + "AllOf target and Partitioned can't be colocated"); + assertNotColocated(f.partitioned(withTokens(NODE_SET)), f.allOf(INVALID_NODE_SET), + "AllOf target and Partitioned can't be colocated"); + assertNotColocated(f.partitioned(withTokens(INVALID_NODE_SET)), f.allOf(INVALID_NODE_SET), + "AllOf target and Partitioned can't be colocated"); + + // Colocation with someOf + assertColocated(f, f.partitioned(withTokens(NODE_SET)), f.someOf(NODE_SET), equalTo(NODE_SET)); + assertColocated(f, f.partitioned(withTokens(NODE_SUBSET)), f.someOf(NODE_SET), equalTo(NODE_SUBSET)); + assertNotColocated(f.partitioned(withTokens(NODE_SET)), f.someOf(NODE_SUBSET)); + assertNotColocated(f.partitioned(withTokens(SINGLE_NODE_SET)), f.someOf(NODE_SET2)); // Disjoint sets. + assertNotColocated(f.partitioned(withTokens(NODE_SET)), f.someOf(INVALID_NODE_SET)); + assertNotColocated(f.partitioned(withTokens(INVALID_NODE_SET)), f.someOf(INVALID_NODE_SET)); + + // Colocation with oneOf + assertNotColocated(f.partitioned(withTokens(NODE_SET)), f.oneOf(NODE_SET)); + assertColocated(f, f.partitioned(withTokens(SINGLE_NODE_SET)), f.oneOf(NODE_SET), equalTo(SINGLE_NODE_SET)); + assertNotColocated(f.partitioned(withTokens(SINGLE_NODE_SET)), f.oneOf(NODE_SET2)); // Disjoint + assertNotColocated(f.partitioned(withTokens(SINGLE_NODE_SET)), f.oneOf(INVALID_NODE_SET)); + assertNotColocated(f.partitioned(withTokens(INVALID_NODE_SET)), f.oneOf(INVALID_NODE_SET)); + } + + private static List<NodeWithConsistencyToken> withTokens(List<String> nodes) { + return nodes.stream().map(n -> new NodeWithConsistencyToken(n, ALL_NODES.indexOf(n))).collect(Collectors.toList()); + } + + private static List<NodeWithConsistencyToken> singleWithToken(String name, int token) { + return List.of(new NodeWithConsistencyToken(name, token)); + } + + private static <T> ArrayList<T> shuffle(List<T> nodeSetWithTokens) { + ArrayList<T> shuffled = new ArrayList<>(nodeSetWithTokens); + Collections.reverse(shuffled); + return shuffled; + } + + private static Matcher<Iterable<String>> containsSingleFromSubset(List<String> items) { + return allOf(iterableWithSize(1), hasItems(in(items))); + } + + private static void assertColocated( + ExecutionTargetFactory factory, + ExecutionTarget target, + ExecutionTarget other, + Matcher<Iterable<String>> matcher + ) throws ColocationMappingException { + assertThat(factory.resolveNodes(target.colocateWith(other)), matcher); + } + + private static void assertNotColocated(ExecutionTarget target, ExecutionTarget other) { + assertNotColocated(target, other, "Targets are not colocated"); + } + + @SuppressWarnings("ThrowableNotThrown") + private static void assertNotColocated(ExecutionTarget target, ExecutionTarget other, String errorMessageFragment) { + assertThrows(ColocationMappingException.class, () -> target.colocateWith(other), errorMessageFragment); + } +}
