This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 41f0eeb4d10 IGNITE-26214 SQL Calcite: Optimization for MultiDC - Fixes
#12512.
41f0eeb4d10 is described below
commit 41f0eeb4d10ba5c709d53e809289b2cb043900b9
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu Nov 13 17:45:47 2025 +0300
IGNITE-26214 SQL Calcite: Optimization for MultiDC - Fixes #12512.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../query/calcite/metadata/ColocationGroup.java | 8 +
.../calcite/schema/CacheTableDescriptorImpl.java | 49 ++++--
.../integration/MultiDcQueryMappingTest.java | 192 +++++++++++++++++++++
.../ignite/testsuites/IntegrationTestSuite.java | 2 +
4 files changed, 239 insertions(+), 12 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
index 0f78c8fe90b..47c15e619b1 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
@@ -70,6 +70,14 @@ public class ColocationGroup implements CalciteMessage {
/** */
public static ColocationGroup forAssignments(List<List<UUID>> assignments)
{
+ return new ColocationGroup(null, null, assignments, false);
+ }
+
+ /**
+ * Creates colocation group with assignments equal to cache assignments
(i.e. cache assignments on remote nodes
+ * can be used for the same topology).
+ */
+ public static ColocationGroup forCacheAssignments(List<List<UUID>>
assignments) {
return new ColocationGroup(null, null, assignments, true);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
index 405c8dc4dac..d60eb448ba6 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -547,7 +548,28 @@ public class CacheTableDescriptorImpl extends
NullInitializerExpressionFactory
assignments0.add(F.isEmpty(partNodes) ? emptyList() :
singletonList(F.first(partNodes).id()));
}
- return ColocationGroup.forAssignments(assignments0);
+ String dcId =
cacheContext().kernalContext().discovery().localNode().dataCenterId();
+ Collection<UUID> sameDcNodeIds = dcId == null ? null : new
HashSet<>(F.viewReadOnly(
+ cctx.kernalContext().discovery().aliveServerNodes(),
+ ClusterNode::id, n -> Objects.equals(n.dataCenterId(), dcId)));
+
+ if (dcId != null) {
+ List<List<UUID>> curDcAssignments = new
ArrayList<>(assignments0.size());
+
+ for (List<UUID> assignment : assignments0) {
+ List<UUID> curDcAssignment = U.arrayList(assignment,
sameDcNodeIds::contains);
+
+ // If any assignment become empty after filtration by DC,
return original assignments.
+ if (F.isEmpty(curDcAssignment) && !F.isEmpty(assignment))
+ return ColocationGroup.forCacheAssignments(assignments0);
+
+ curDcAssignments.add(curDcAssignment);
+ }
+
+ return ColocationGroup.forAssignments(curDcAssignments);
+ }
+
+ return ColocationGroup.forCacheAssignments(assignments0);
}
/** */
@@ -557,29 +579,32 @@ public class CacheTableDescriptorImpl extends
NullInitializerExpressionFactory
GridDhtPartitionTopology top = cctx.topology();
List<ClusterNode> nodes =
cctx.discovery().discoCache(topVer).cacheGroupAffinityNodes(cctx.groupId());
- List<UUID> nodes0;
+ List<UUID> nodeIds;
top.readLock();
try {
- if (!top.rebalanceFinished(topVer)) {
- nodes0 = new ArrayList<>(nodes.size());
+ int parts = top.partitions();
- int parts = top.partitions();
+ List<ClusterNode> nodes0 = top.rebalanceFinished(topVer) ? nodes :
+ U.arrayList(nodes, node -> isOwner(node.id(), top, parts));
- for (ClusterNode node : nodes) {
- if (isOwner(node.id(), top, parts))
- nodes0.add(node.id());
- }
+ String dcId =
cacheContext().kernalContext().discovery().localNode().dataCenterId();
+
+ if (dcId != null) {
+ List<ClusterNode> curDcNodes = U.arrayList(nodes0, node ->
dcId.equals(node.dataCenterId()));
+
+ if (!F.isEmpty(curDcNodes))
+ nodes0 = curDcNodes;
}
- else
- nodes0 = Commons.transform(nodes, ClusterNode::id);
+
+ nodeIds = Commons.transform(nodes0, ClusterNode::id);
}
finally {
top.readUnlock();
}
- return ColocationGroup.forNodes(nodes0);
+ return ColocationGroup.forNodes(nodeIds);
}
/** */
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/MultiDcQueryMappingTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/MultiDcQueryMappingTest.java
new file mode 100644
index 00000000000..dd6169df11b
--- /dev/null
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/MultiDcQueryMappingTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.QueryEntity;
+import
org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeAffinityBackupFilter;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import
org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.junit.Test;
+
+/** */
+public class MultiDcQueryMappingTest extends AbstractBasicIntegrationTest {
+ /** */
+ private static final String DC1 = "DC1";
+
+ /** */
+ private static final String DC2 = "DC2";
+
+ /** */
+ private static final int ROWS_CNT = 100;
+
+ /** */
+ private String dcId;
+
+ /** */
+ AtomicBoolean crossDcRequest = new AtomicBoolean();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.getSqlConfiguration().setQueryEnginesConfiguration(new
CalciteQueryEngineConfiguration());
+
+ cfg.setCommunicationSpi(new TcpCommunicationSpi() {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg,
IgniteInClosure<IgniteException> ackC) {
+ assert msg != null;
+
+ if (msg instanceof GridIoMessage) {
+ GridIoMessage msg0 = (GridIoMessage)msg;
+
+ if (msg0.message() instanceof QueryStartRequest) {
+ if
(!ignite().cluster().localNode().dataCenterId().equals(node.dataCenterId()))
+ crossDcRequest.set(true);
+ }
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+ });
+
+
cfg.setUserAttributes(F.asMap(IgniteSystemProperties.IGNITE_DATA_CENTER_ID,
dcId));
+
+ // Partitioned cache, in both data centers.
+ CacheConfiguration<Integer, Employer> ccfgPart2dc =
cacheConfiguration("part2dc")
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setBackups(1)
+ .setAffinity(new
RendezvousAffinityFunction().setAffinityBackupFilter(
+ new
ClusterNodeAttributeAffinityBackupFilter(IgniteSystemProperties.IGNITE_DATA_CENTER_ID)));
+
+ // Replicated cache, in both data centers.
+ CacheConfiguration<Integer, Employer> ccfgRepl2dc =
cacheConfiguration("repl2dc")
+ .setCacheMode(CacheMode.REPLICATED);
+
+ // Partitioned cache, in one data center.
+ CacheConfiguration<Integer, Employer> ccfgPart1dc =
cacheConfiguration("part1dc")
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setBackups(1)
+ .setNodeFilter(n -> n.dataCenterId().equals(DC1));
+
+ // Replicated cache, in one data center.
+ CacheConfiguration<Integer, Employer> ccfgRepl1dc =
cacheConfiguration("repl1dc")
+ .setCacheMode(CacheMode.REPLICATED)
+ .setNodeFilter(n -> n.dataCenterId().equals(DC2));
+
+ cfg.setCacheConfiguration(ccfgPart2dc, ccfgRepl2dc, ccfgPart1dc,
ccfgRepl1dc);
+
+ return cfg;
+ }
+
+ /** */
+ private CacheConfiguration<Integer, Employer> cacheConfiguration(String
name) {
+ return new CacheConfiguration<Integer, Employer>()
+ .setName(name)
+
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+ .setSqlSchema(QueryUtils.DFLT_SCHEMA)
+ .setQueryEntities(Collections.singletonList(new
QueryEntity(Integer.class, Employer.class)
+ .setTableName(name)
+ .addQueryField("ID", Integer.class.getName(), "ID")
+ .setKeyFieldName("ID")
+ ));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ // No-op.
+ }
+
+ /** */
+ @Test
+ public void testQueryMapping() throws Exception {
+ dcId = DC1;
+
+ IgniteEx srv1dc1 = startGrid(0);
+ IgniteEx srv2dc1 = startGrid(1);
+ IgniteEx clientDc1 = startClientGrid(2);
+
+ dcId = DC2;
+
+ IgniteEx srv1dc2 = startGrid(3);
+ IgniteEx srv2dc2 = startGrid(4);
+ IgniteEx clientDc2 = startClientGrid(5);
+
+ fillTable(clientDc1, "part2dc");
+ fillTable(clientDc1, "repl2dc");
+ fillTable(clientDc1, "part1dc");
+ fillTable(clientDc1, "repl1dc");
+
+ checkQueries(clientDc1);
+ checkQueries(clientDc2);
+ checkQueries(srv1dc1);
+ checkQueries(srv1dc2);
+ checkQueries(srv2dc1);
+ checkQueries(srv2dc2);
+ }
+
+ /** */
+ private void fillTable(IgniteEx ignite, String name) {
+ for (int i = 0; i < ROWS_CNT; i++)
+ sql(ignite, "INSERT INTO " + name + "(ID, NAME, SALARY) VALUES (?,
?, ?)", i, "name" + i, i);
+ }
+
+ /** */
+ private void checkQueries(IgniteEx ignite) {
+ boolean dc1 =
DC1.equals(ignite.context().discovery().localNode().dataCenterId());
+ checkQuery(ignite, "SELECT * FROM part2dc", false);
+ checkQuery(ignite, "SELECT * FROM repl2dc", false);
+ checkQuery(ignite, "SELECT * FROM part1dc", !dc1);
+ checkQuery(ignite, "SELECT * FROM repl1dc", dc1);
+ checkQuery(ignite, "SELECT * FROM part2dc JOIN repl2dc USING (name)",
false);
+ checkQuery(ignite, "SELECT * FROM part1dc JOIN repl1dc USING (name)",
true);
+ }
+
+ /** */
+ private void checkQuery(IgniteEx ignite, String sql, boolean expCrossDc) {
+ crossDcRequest.set(false);
+
+ List<?> res = sql(ignite, sql);
+
+ assertEquals(ROWS_CNT, res.size());
+
+ assertEquals(expCrossDc, crossDcRequest.get());
+ }
+}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index ce775611114..2754c4bc417 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -55,6 +55,7 @@ import
org.apache.ignite.internal.processors.query.calcite.integration.LocalDate
import
org.apache.ignite.internal.processors.query.calcite.integration.LocalQueryIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.MemoryQuotasIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.MetadataIntegrationTest;
+import
org.apache.ignite.internal.processors.query.calcite.integration.MultiDcQueryMappingTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.OperatorsExtensionIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.PartitionPruneTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.PartitionsReservationIntegrationTest;
@@ -173,6 +174,7 @@ import org.junit.runners.Suite;
KeyClassChangeIntegrationTest.class,
QueryEntityValueColumnAliasTest.class,
CacheStoreTest.class,
+ MultiDcQueryMappingTest.class,
})
public class IntegrationTestSuite {
}