This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 92844b66f1a IGNITE-26395 Correct processing of initial empty data
nodes (#6604)
92844b66f1a is described below
commit 92844b66f1a5c0d1b7dea9016ac619ed6297e91c
Author: Denis Chudov <[email protected]>
AuthorDate: Mon Sep 29 17:44:14 2025 +0300
IGNITE-26395 Correct processing of initial empty data nodes (#6604)
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 6 +
.../distributionzones/ItEmptyDataNodesTest.java | 127 +++++++++++++++++++++
.../distributionzones/DistributionZoneManager.java | 33 +++++-
.../exception/EmptyDataNodesException.java | 39 +++++++
.../distributionzones/rebalance/RebalanceUtil.java | 8 +-
.../rebalance/ZoneRebalanceUtil.java | 8 +-
.../partition/replicator/fixtures/Node.java | 4 +-
.../PartitionReplicaLifecycleManager.java | 5 +-
.../placementdriver/EmptyAssignmentsException.java | 40 +++++++
.../MultiActorPlacementDriverTest.java | 4 +-
.../PlacementDriverManagerTest.java | 4 +-
.../placementdriver/AssignmentsTracker.java | 7 +-
.../placementdriver/PlacementDriverManager.java | 16 ++-
.../ignite/internal/placementdriver/Utils.java | 50 ++++++++
.../placementdriver/leases/LeaseTracker.java | 77 +++++++++++--
.../internal/placementdriver/ActiveActorTest.java | 4 +-
.../placementdriver/LeaseNegotiationTest.java | 11 +-
.../internal/placementdriver/LeaseTrackerTest.java | 8 +-
.../internal/placementdriver/LeaseUpdaterTest.java | 6 +-
.../placementdriver/PlacementDriverTest.java | 20 +++-
modules/platforms/cpp/ignite/common/error_codes.h | 2 +
modules/platforms/cpp/ignite/odbc/common_types.cpp | 2 +
.../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs | 6 +
.../runner/app/ItIgniteNodeRestartTest.java | 4 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 28 +++--
25 files changed, 477 insertions(+), 42 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 35fdd683a56..a963fc0e9fe 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -504,6 +504,9 @@ public class ErrorGroups {
/** Distribution zone was not found. */
public static final int ZONE_NOT_FOUND_ERR =
DISTRIBUTION_ZONES_ERR_GROUP.registerErrorCode((short) 1);
+
+ /** Error that occurs if there are no data nodes for a zone. */
+ public static final int EMPTY_DATA_NODES_ERR =
DISTRIBUTION_ZONES_ERR_GROUP.registerErrorCode((short) 2);
}
/** Network error group. */
@@ -685,6 +688,9 @@ public class ErrorGroups {
/** Primary replica await error. */
public static final int PRIMARY_REPLICA_AWAIT_ERR =
PLACEMENT_DRIVER_ERR_GROUP.registerErrorCode((short) 2);
+
+ /** Error that occurs if there are no assignments for a group. */
+ public static final int EMPTY_ASSIGNMENTS_ERR =
PLACEMENT_DRIVER_ERR_GROUP.registerErrorCode((short) 3);
}
/** Critical workers error group. */
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItEmptyDataNodesTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItEmptyDataNodesTest.java
new file mode 100644
index 00000000000..5d05a250f88
--- /dev/null
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItEmptyDataNodesTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ByteUtils.bytesToIntKeepingOrder;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.TestWrappers;
+import org.apache.ignite.internal.app.IgniteImpl;
+import
org.apache.ignite.internal.distributionzones.exception.EmptyDataNodesException;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.server.WatchListenerInhibitor;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for distribution zones that are created while their set of data nodes is empty,
+ * and for zone creation interrupted by a restart of the nodes.
+ */
+class ItEmptyDataNodesTest extends ClusterPerTestIntegrationTest {
+    private static final String ZONE_NAME = "zone0";
+    private static final String TABLE_NAME = "table0";
+
+    /** Timeout for awaiting asynchronous cluster state changes, in milliseconds. */
+    private static final long AWAIT_TIMEOUT_MILLIS = 5_000;
+
+    /**
+     * Creates a zone with empty data nodes, changes the topology by stopping a node and checks that the table
+     * becomes usable once the data nodes of the zone match the set of alive nodes.
+     */
+    @Test
+    public void testInitialEmptyDataNodes() throws InterruptedException {
+        createZoneAndTableWithEmptyDataNodes();
+
+        IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+        int zoneId = zoneId(node);
+
+        assertTrue(currentDataNodes(node, zoneId).isEmpty());
+
+        // Drop the test-only filter and then change the topology by stopping one node.
+        setAdditionalNodeFilter(null);
+
+        stopNode(2);
+        int aliveNodesCount = initialNodes() - 1;
+
+        // The result of the wait must be asserted: otherwise a timeout is silently ignored and the test
+        // continues against a zone that still has unexpected data nodes.
+        assertTrue(
+                waitForCondition(() -> currentDataNodes(node, zoneId).size() == aliveNodesCount, AWAIT_TIMEOUT_MILLIS),
+                "Data nodes did not reach the expected size: " + aliveNodesCount
+        );
+
+        sql("INSERT INTO " + TABLE_NAME + " VALUES (1, 1)");
+        sql("SELECT * FROM " + TABLE_NAME + " WHERE id = 1");
+    }
+
+    /**
+     * Checks that an insert into a table of a zone with empty data nodes fails with {@link EmptyDataNodesException}
+     * in the cause chain.
+     */
+    @Test
+    public void testEmptyDataNodesException() {
+        createZoneAndTableWithEmptyDataNodes();
+
+        IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+        int zoneId = zoneId(node);
+
+        assertTrue(currentDataNodes(node, zoneId).isEmpty());
+
+        assertThrowsWithCause(() -> sql("INSERT INTO " + TABLE_NAME + " VALUES (1, 1)"), EmptyDataNodesException.class);
+    }
+
+    /**
+     * Starts a zone creation while metastorage events are inhibited on every node, restarts all nodes after the new
+     * catalog version becomes visible in the local metastorage, and checks that a table in this zone is usable.
+     */
+    @Test
+    public void testInterruptZoneCreation() throws InterruptedException {
+        runningNodes().forEach(n -> WatchListenerInhibitor.metastorageEventsInhibitor(n).startInhibit());
+
+        IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+        int activeCatalogVersion = node.catalogManager().activeCatalog(node.clock().now().longValue()).version();
+
+        // The returned future is deliberately not awaited.
+        // NOTE(review): presumably it cannot complete while the watch listeners are inhibited - confirm.
+        node.sql().executeAsync(null, createZoneSql());
+
+        ByteArray catalogVersionKey = new ByteArray("catalog.version".getBytes(StandardCharsets.UTF_8));
+
+        // Fail fast if the new catalog version never shows up instead of restarting nodes in an unknown state.
+        assertTrue(
+                waitForCondition(() -> {
+                    int ver = bytesToIntKeepingOrder(node.metaStorageManager().getLocally(catalogVersionKey).value());
+                    return ver > activeCatalogVersion;
+                }, AWAIT_TIMEOUT_MILLIS),
+                "Catalog version was not updated in the local metastorage"
+        );
+
+        // Every node is started right after it is stopped, so the number of running nodes is the same
+        // at each loop check and can be computed once.
+        long nodesCount = runningNodes().count();
+
+        for (int i = 0; i < nodesCount; i++) {
+            stopNode(i);
+            startNode(i);
+            // No need to stop inhibiting after the node stop.
+        }
+
+        sql(format("CREATE TABLE {} (id INT PRIMARY KEY, val INT) ZONE {}", TABLE_NAME, ZONE_NAME));
+        sql("INSERT INTO " + TABLE_NAME + " VALUES (1, 1)");
+        sql("SELECT * FROM " + TABLE_NAME + " WHERE id = 1");
+    }
+
+    /** Creates the test zone with a filter that rejects every node, and a table in this zone. */
+    private void createZoneAndTableWithEmptyDataNodes() {
+        setAdditionalNodeFilter(n -> false);
+
+        sql(createZoneSql());
+
+        sql(format("CREATE TABLE {} (id INT PRIMARY KEY, val INT) ZONE {}", TABLE_NAME, ZONE_NAME));
+    }
+
+    /** Returns the DDL statement that creates the test zone. */
+    private static String createZoneSql() {
+        return format("CREATE ZONE {} (PARTITIONS 1, AUTO SCALE DOWN 0) STORAGE PROFILES ['default']", ZONE_NAME);
+    }
+
+    /** Returns the ID of the test zone taken from the catalog that is active at the current time of the node clock. */
+    private static int zoneId(IgniteImpl node) {
+        return node.catalogManager().activeCatalog(node.clock().now().longValue()).zone(ZONE_NAME.toUpperCase()).id();
+    }
+
+    /** Returns the current data nodes of the zone, asserting that the request completes successfully. */
+    private Set<String> currentDataNodes(IgniteImpl node, int zoneId) {
+        CompletableFuture<Set<String>> nodeFut = node.distributionZoneManager().currentDataNodes(zoneId);
+        assertThat(nodeFut, willCompleteSuccessfully());
+        return nodeFut.join();
+    }
+
+    /** Executes the SQL statement on an alive node of the cluster. */
+    private void sql(String sql) {
+        cluster.aliveNode().sql().execute(null, sql);
+    }
+
+    /** Sets the test-only additional node filter on every running node; {@code null} removes the filter. */
+    private void setAdditionalNodeFilter(@Nullable Predicate<NodeWithAttributes> filter) {
+        cluster.runningNodes()
+                .map(TestWrappers::unwrapIgniteImpl)
+                .forEach(node -> node.distributionZoneManager().setAdditionalNodeFilter(filter));
+    }
+}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 0c434629f2b..baa9fb61976 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -22,6 +22,7 @@ import static java.util.Collections.emptySet;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.catalog.CatalogManager.INITIAL_TIMESTAMP;
import static
org.apache.ignite.internal.catalog.descriptors.ConsistencyMode.HIGH_AVAILABILITY;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_ALTER;
@@ -67,6 +68,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.AlterZoneEventParameters;
@@ -112,6 +114,7 @@ import org.apache.ignite.internal.metastorage.dsl.Update;
import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
@@ -170,11 +173,17 @@ public class DistributionZoneManager extends
private final MetricManager metricManager;
+ private final ClockService clockService;
+
/** Mapping from a zone identifier to the corresponding metric source. */
private final Map<Integer, ZoneMetricSource> zoneMetricSources = new
ConcurrentHashMap<>();
private final String localNodeName;
+ @TestOnly
+ @Nullable
+ private Predicate<NodeWithAttributes> additionalNodeFilter = null;
+
/**
* Constructor.
*/
@@ -232,6 +241,7 @@ public class DistributionZoneManager extends
this.failureProcessor = failureProcessor;
this.catalogManager = catalogManager;
this.localNodeName = nodeName;
+ this.clockService = clockService;
this.topologyWatchListener = createMetastorageTopologyListener();
@@ -324,6 +334,18 @@ public class DistributionZoneManager extends
return nullCompletedFuture();
}
+ /**
+ * Returns data nodes of the given zone for the current time.
+ *
+ * <p>The timestamp is taken from the clock service, and the catalog version that is active at this timestamp is used,
+ * so the timestamp and the catalog version passed further are consistent with each other.
+ *
+ * @param zoneId Zone id.
+ * @return Future with the data nodes for the current time.
+ */
+ public CompletableFuture<Set<String>> currentDataNodes(int zoneId) {
+ HybridTimestamp current = clockService.current();
+ int catalogVersion =
catalogManager.activeCatalogVersion(current.longValue());
+ return dataNodes(current, catalogVersion, zoneId);
+ }
+ }
+
/**
* Gets data nodes of the zone using causality token and catalog version.
{@code timestamp} must be agreed
* with the {@code catalogVersion}, meaning that for the provided {@code
timestamp} actual {@code catalogVersion} must be provided.
@@ -418,8 +440,12 @@ public class DistributionZoneManager extends
private CompletableFuture<Void> onCreateZone(CatalogZoneDescriptor zone,
long causalityToken) {
HybridTimestamp timestamp =
metaStorageManager.timestampByRevisionLocally(causalityToken);
+ Set<NodeWithAttributes> filteredDataNodes =
filterDataNodes(logicalTopology(causalityToken), zone).stream()
+ .filter(n -> additionalNodeFilter == null ||
additionalNodeFilter.test(n))
+ .collect(toSet());
+
return dataNodesManager
- .onZoneCreate(zone.id(), timestamp,
filterDataNodes(logicalTopology(causalityToken), zone))
+ .onZoneCreate(zone.id(), timestamp, filteredDataNodes)
.thenRun(() -> {
try {
registerMetricSource(zone);
@@ -717,6 +743,11 @@ public class DistributionZoneManager extends
return dataNodesManager;
}
+    /**
+     * Sets an additional filter that is applied to the filtered data nodes when a zone is created. For tests only.
+     *
+     * @param filter Predicate a node must satisfy to be included into the data nodes of a newly created zone,
+     *         or {@code null} to disable the additional filtering.
+     */
+    @TestOnly
+    public void setAdditionalNodeFilter(@Nullable Predicate<NodeWithAttributes> filter) {
+        additionalNodeFilter = filter;
+    }
+
/**
* Returns local mapping of {@code nodeId} -> node's attributes, where
{@code nodeId} is a node id, that changes between restarts.
* This map is updated every time we receive a topology event in a {@code
topologyWatchListener}.
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/EmptyDataNodesException.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/EmptyDataNodesException.java
new file mode 100644
index 00000000000..38073084e4f
--- /dev/null
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/EmptyDataNodesException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones.exception;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static
org.apache.ignite.lang.ErrorGroups.DistributionZones.EMPTY_DATA_NODES_ERR;
+
+import org.apache.ignite.lang.IgniteException;
+
+/**
+ * Exception thrown when the given zone has no data nodes.
+ */
+public class EmptyDataNodesException extends IgniteException {
+    private static final long serialVersionUID = 5691362165660196984L;
+
+    /**
+     * Creates the exception for the zone with the given ID.
+     *
+     * @param zoneId ID of the zone that has no data nodes; it is included into the error message.
+     */
+    public EmptyDataNodesException(int zoneId) {
+        super(EMPTY_DATA_NODES_ERR, errorMessage(zoneId));
+    }
+
+    /** Builds the error message that mentions the zone ID. */
+    private static String errorMessage(int zoneId) {
+        return format("Empty data nodes for zone [zoneId={}].", zoneId);
+    }
+}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index 84faef21361..04c109ae71c 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -390,6 +390,12 @@ public class RebalanceUtil {
return tableStableAssignments(metaStorageManager,
tableDescriptor.id(), partitionIds)
.thenCompose(stableAssignments -> {
+ // In case of empty assignments due to initially empty
data nodes, assignments will be recalculated
+ // after the transition to non-empty data nodes.
+ // In case of empty assignments due to interrupted table
creation, assignments will be written
+ // during the node recovery and then replicas will be
started.
+ // In case when data nodes become empty, assignments are
not recalculated
+ // (see
DistributionZoneRebalanceEngine.createDistributionZonesDataNodesListener).
if (stableAssignments.isEmpty()) {
return nullCompletedFuture();
}
@@ -425,8 +431,6 @@ public class RebalanceUtil {
for (int partId = 0; partId < zoneDescriptor.partitions(); partId++) {
TablePartitionId replicaGrpId = new
TablePartitionId(tableDescriptor.id(), partId);
- // TODO https://issues.apache.org/jira/browse/IGNITE-26395 We
should distinguish empty stable assignments on
- // TODO node recovery in case of interrupted table creation, and
moving from empty assignments to non-empty.
futures[partId] = updatePendingAssignmentsKeys(
tableDescriptor,
replicaGrpId,
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
index ebf05a02aa9..9fac9106c88 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
@@ -398,8 +398,12 @@ public class ZoneRebalanceUtil {
int finalPartId = partId;
partitionFutures[partId] =
zoneAssignmentsFut.thenCompose(zoneAssignments -> inBusyLockAsync(busyLock, ()
-> {
- // TODO https://issues.apache.org/jira/browse/IGNITE-26395 We
should distinguish empty stable assignments on
- // TODO node recovery in case of interrupted table creation,
and moving from empty assignments to non-empty.
+ // In case of empty assignments due to initially empty data
nodes, assignments will be recalculated
+ // after the transition to non-empty data nodes.
+ // In case of empty assignments due to interrupted zone
creation, assignments will be written
+ // during the node recovery and then replicas will be started.
+ // In case when data nodes become empty, assignments are not
recalculated
+ // (see
DistributionZoneRebalanceEngineV2.createDistributionZonesDataNodesListener).
return zoneAssignments.isEmpty() ? nullCompletedFuture() :
updatePendingAssignmentsKeys(
zoneDescriptor,
replicaGrpId,
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 297ef1261a0..33a19374ebc 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -557,7 +557,9 @@ public class Node {
nodeProperties,
replicationConfiguration,
Runnable::run,
- metricManager
+ metricManager,
+ zoneId -> completedFuture(Set.of()),
+ id -> null
);
var transactionInflights = new
TransactionInflights(placementDriverManager.placementDriver(), clockService);
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 13f5fe549a5..1e6adc07034 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -1238,7 +1238,6 @@ public class PartitionReplicaLifecycleManager extends
Assignments stableAssignments = stableAssignments(zonePartitionId,
revision);
AssignmentsQueue pendingAssignmentsQueue =
AssignmentsQueue.fromBytes(pendingAssignmentsEntry.value());
- Assignments pendingAssignments = pendingAssignmentsQueue == null ?
Assignments.EMPTY : pendingAssignmentsQueue.poll();
if (!busyLock.enterBusy()) {
return failedFuture(new NodeStoppingException());
@@ -1255,11 +1254,13 @@ public class PartitionReplicaLifecycleManager extends
zonePartitionId.partitionId(),
zonePartitionId.zoneId(),
localNode().address(),
- pendingAssignments,
+ pendingAssignmentsQueue,
revision
);
}
+ Assignments pendingAssignments = pendingAssignmentsQueue == null ?
Assignments.EMPTY : pendingAssignmentsQueue.poll();
+
return handleChangePendingAssignmentEvent(
zonePartitionId,
stableAssignments,
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/EmptyAssignmentsException.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/EmptyAssignmentsException.java
new file mode 100644
index 00000000000..00cd17e43a0
--- /dev/null
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/EmptyAssignmentsException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static
org.apache.ignite.lang.ErrorGroups.PlacementDriver.EMPTY_ASSIGNMENTS_ERR;
+
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.lang.IgniteException;
+
+/**
+ * Exception thrown when a replication group has no assignments.
+ */
+public class EmptyAssignmentsException extends IgniteException {
+    private static final long serialVersionUID = 1698246028174494488L;
+
+    /**
+     * Creates the exception for the given replication group.
+     *
+     * @param groupId ID of the replication group that has no assignments; it is included into the error message.
+     */
+    public EmptyAssignmentsException(ReplicationGroupId groupId) {
+        super(EMPTY_ASSIGNMENTS_ERR, errorMessage(groupId));
+    }
+
+    /** Builds the error message that mentions the replication group ID. */
+    private static String errorMessage(ReplicationGroupId groupId) {
+        return format("Empty assignments for group [groupId={}].", groupId);
+    }
+}
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
index aac35464f24..a7aa437454e 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
@@ -328,7 +328,9 @@ public class MultiActorPlacementDriverTest extends
BasePlacementDriverTest {
new SystemPropertiesNodeProperties(),
replicationConfiguration,
Runnable::run,
- mock(MetricManager.class)
+ mock(MetricManager.class),
+ zoneId -> completedFuture(Set.of()),
+ id -> null
);
res.add(new Node(
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
index acd8dc4d926..0f01b1831f9 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
@@ -267,7 +267,9 @@ public class PlacementDriverManagerTest extends
BasePlacementDriverTest {
new SystemPropertiesNodeProperties(),
replicationConfiguration,
Runnable::run,
- mock(MetricManager.class)
+ mock(MetricManager.class),
+ zoneId -> completedFuture(Set.of()),
+ id -> null
);
ComponentContext componentContext = new ComponentContext();
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index 4b5673b9dae..8be2854c545 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -92,8 +92,13 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
*
* @param msManager Meta storage manager.
* @param failureProcessor Failure processor.
+ * @param nodeProperties Node properties.
*/
- public AssignmentsTracker(MetaStorageManager msManager, FailureProcessor
failureProcessor, NodeProperties nodeProperties) {
+ public AssignmentsTracker(
+ MetaStorageManager msManager,
+ FailureProcessor failureProcessor,
+ NodeProperties nodeProperties
+ ) {
this.msManager = msManager;
this.failureProcessor = failureProcessor;
this.nodeProperties = nodeProperties;
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index 01d47aeb1d9..e398817c642 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Supplier;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.components.NodeProperties;
@@ -125,6 +126,8 @@ public class PlacementDriverManager implements
IgniteComponent {
* @param failureProcessor Failure processor.
* @param throttledLogExecutor Executor to clean up the throttled logger
cache.
* @param metricManager Metric manager.
+ * @param currentDataNodesProvider Provider of the current data nodes in
the cluster.
+ * @param zoneIdByTableIdResolver Resolver of zone id by table id (result
may be {@code null}).
*/
public PlacementDriverManager(
String nodeName,
@@ -140,7 +143,9 @@ public class PlacementDriverManager implements
IgniteComponent {
NodeProperties nodeProperties,
ReplicationConfiguration replicationConfiguration,
Executor throttledLogExecutor,
- MetricManager metricManager
+ MetricManager metricManager,
+ Function<Integer, CompletableFuture<Set<String>>>
currentDataNodesProvider,
+ Function<Integer, Integer> zoneIdByTableIdResolver
) {
this.replicationGroupId = replicationGroupId;
this.clusterService = clusterService;
@@ -152,7 +157,14 @@ public class PlacementDriverManager implements
IgniteComponent {
this.raftClientFuture = new CompletableFuture<>();
- this.leaseTracker = new LeaseTracker(metastore,
clusterService.topologyService(), clockService);
+ this.leaseTracker = new LeaseTracker(
+ metastore,
+ clusterService.topologyService(),
+ clockService,
+ currentDataNodesProvider,
+ zoneIdByTableIdResolver,
+ nodeProperties
+ );
this.assignmentsTracker = new AssignmentsTracker(metastore,
failureProcessor, nodeProperties);
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/Utils.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/Utils.java
new file mode 100644
index 00000000000..6ccc31f278b
--- /dev/null
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/Utils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import java.util.function.Function;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Utils for placement driver.
+ */
+public final class Utils {
+    /** Prevents instantiation: the class contains static helpers only. */
+    private Utils() {
+        // No-op.
+    }
+
+    /**
+     * Extracts zone ID from the given group id.
+     *
+     * <p>If colocation is enabled and the group is a {@link ZonePartitionId}, the zone ID is taken from the group ID itself.
+     * In every other case the group ID is cast to {@link TablePartitionId} and the zone ID is resolved by its table ID.
+     *
+     * @param groupId Replication group ID.
+     * @param colocationEnabled Whether colocation is enabled.
+     * @param zoneIdByTableIdResolver Function to resolve zone ID by table ID, used when the zone ID is not taken
+     *         from the group ID directly.
+     * @return Zone ID, or {@code null} if the resolver returned {@code null}.
+     * @throws ClassCastException If the zone ID has to be resolved by table ID, but the group ID is not
+     *         a {@link TablePartitionId}.
+     */
+    @Nullable
+    public static Integer extractZoneIdFromGroupId(
+            ReplicationGroupId groupId,
+            boolean colocationEnabled,
+            Function<Integer, Integer> zoneIdByTableIdResolver
+    ) {
+        if (colocationEnabled && groupId instanceof ZonePartitionId) {
+            return ((ZonePartitionId) groupId).zoneId();
+        }
+
+        // Everything else is treated as a table partition group.
+        return zoneIdByTableIdResolver.apply(((TablePartitionId) groupId).tableId());
+    }
+}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index bb90592d647..1170f2fc222 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -21,8 +21,10 @@ import static java.util.Collections.emptyMap;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.function.Function.identity;
import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
import static
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
+import static
org.apache.ignite.internal.placementdriver.Utils.extractZoneIdFromGroupId;
import static
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED;
import static
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED;
import static
org.apache.ignite.internal.placementdriver.leases.Lease.emptyLease;
@@ -36,6 +38,7 @@ import static
org.apache.ignite.internal.util.IgniteUtils.newHashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -43,7 +46,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
import java.util.function.Supplier;
+import org.apache.ignite.internal.components.NodeProperties;
+import
org.apache.ignite.internal.distributionzones.exception.EmptyDataNodesException;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -103,16 +109,33 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
private final ClockService clockService;
+ private final Function<Integer, CompletableFuture<Set<String>>>
currentDataNodesProvider;
+
+ /** Resolver of zone id by table id (result may be {@code null}). */
+ private final Function<Integer, Integer> zoneIdByTableIdResolver;
+
+ private final NodeProperties nodeProperties;
+
/**
* Constructor.
*
* @param msManager Meta storage manager.
* @param clockService Clock service.
*/
- public LeaseTracker(MetaStorageManager msManager, ClusterNodeResolver
clusterNodeResolver, ClockService clockService) {
+ public LeaseTracker(
+ MetaStorageManager msManager,
+ ClusterNodeResolver clusterNodeResolver,
+ ClockService clockService,
+ Function<Integer, CompletableFuture<Set<String>>>
currentDataNodesProvider,
+ Function<Integer, Integer> zoneIdByTableIdResolver,
+ NodeProperties nodeProperties
+ ) {
this.msManager = msManager;
this.clusterNodeResolver = clusterNodeResolver;
this.clockService = clockService;
+ this.currentDataNodesProvider = currentDataNodesProvider;
+ this.zoneIdByTableIdResolver = zoneIdByTableIdResolver;
+ this.nodeProperties = nodeProperties;
}
/**
@@ -281,16 +304,37 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
TimeUnit unit
) {
return awaitPrimaryReplicaImpl(groupId, timestamp, System.nanoTime(),
unit.toNanos(timeout))
- .exceptionally(e -> {
- if (hasCause(e, TimeoutException.class)) {
- throw new PrimaryReplicaAwaitTimeoutException(groupId,
timestamp, leases.leaseByGroupId().get(groupId), e);
- } else if (hasCause(e, TrackerClosedException.class)) {
- // TrackerClosedException is thrown when trackers are
closed on node stop.
- throw new CompletionException(new
NodeStoppingException(e));
+ .handle((replicaMeta, e) -> {
+ if (e == null) {
+ return completedFuture(replicaMeta);
} else {
- throw new PrimaryReplicaAwaitException(groupId,
timestamp, e);
+ CompletableFuture<ReplicaMeta> failed = new
CompletableFuture<>();
+
+ if (hasCause(e, TimeoutException.class)) {
+ checkDataNodes(groupId)
+ .thenRun(() -> {
+ throw new
PrimaryReplicaAwaitTimeoutException(
+ groupId,
+ timestamp,
+
leases.leaseByGroupId().get(groupId),
+ e
+ );
+ })
+ .exceptionally(ex -> {
+ failed.completeExceptionally(ex);
+ return null;
+ });
+ } else if (hasCause(e, TrackerClosedException.class)) {
+ // TrackerClosedException is thrown when trackers
are closed on node stop.
+ failed.completeExceptionally(new
CompletionException(new NodeStoppingException(e)));
+ } else {
+ failed.completeExceptionally(new
PrimaryReplicaAwaitException(groupId, timestamp, e));
+ }
+
+ return failed;
}
- });
+ })
+ .thenCompose(identity());
}
private CompletableFuture<ReplicaMeta> awaitPrimaryReplicaImpl(
@@ -321,6 +365,21 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
});
}
+ private CompletableFuture<Void> checkDataNodes(ReplicationGroupId groupId)
{
+ Integer zoneId = extractZoneIdFromGroupId(groupId,
nodeProperties.colocationEnabled(), zoneIdByTableIdResolver);
+
+ if (zoneId != null) { // Zone resolved: ask the provider for the zone's current data nodes and verify the set is not empty.
+ return currentDataNodesProvider.apply(zoneId)
+ .thenAccept(dataNodes -> {
+ if (dataNodes.isEmpty()) {
+ throw new EmptyDataNodesException(zoneId); // Thrown inside the stage, so the returned future completes exceptionally.
+ }
+ });
+ } else { // Zone ID could not be resolved: there is nothing to check.
+ return nullCompletedFuture();
+ }
+ }
+
private boolean isValidReplicaMeta(@Nullable ReplicaMeta replicaMeta) {
UUID leaseholderId = replicaMeta == null ? null :
replicaMeta.getLeaseholderId();
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
index 2a4441d4053..ef151987c1a 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
@@ -142,7 +142,9 @@ public class ActiveActorTest extends
AbstractTopologyAwareGroupServiceTest {
new SystemPropertiesNodeProperties(),
replicationConfiguration,
Runnable::run,
- mock(MetricManager.class)
+ mock(MetricManager.class),
+ zoneId -> completedFuture(Set.of()),
+ id -> null
);
assertThat(placementDriverManager.startAsync(new ComponentContext()),
willCompleteSuccessfully());
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
index 345d307b291..9594cf62364 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
@@ -199,12 +199,19 @@ public class LeaseNegotiationTest extends
BaseIgniteAbstractTest {
LeaseTracker leaseTracker = new LeaseTracker(
metaStorageManager,
pdClusterService.topologyService(),
- clockService
+ clockService,
+ zoneId -> completedFuture(Set.of()),
+ id -> null,
+ new SystemPropertiesNodeProperties()
);
leaseTracker.startTrack(0L);
- assignmentsTracker = new AssignmentsTracker(metaStorageManager,
mock(FailureProcessor.class), new SystemPropertiesNodeProperties());
+ assignmentsTracker = new AssignmentsTracker(
+ metaStorageManager,
+ mock(FailureProcessor.class),
+ new SystemPropertiesNodeProperties()
+ );
assignmentsTracker.startTrack();
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
index dacd273e2d0..5c7d4cb3969 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.placementdriver;
import static java.util.Collections.synchronizedList;
import static java.util.UUID.randomUUID;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
import static
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED;
import static
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED;
@@ -36,9 +37,11 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.components.SystemPropertiesNodeProperties;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.TestClockService;
@@ -82,7 +85,10 @@ public class LeaseTrackerTest extends BaseIgniteAbstractTest
{
leaseTracker = new LeaseTracker(
msManager,
clusterNodeResolver,
- new TestClockService(clock)
+ new TestClockService(clock),
+ zoneId -> completedFuture(Set.of()),
+ id -> null,
+ new SystemPropertiesNodeProperties()
);
assertThat(msManager.startAsync(new ComponentContext()),
willCompleteSuccessfully());
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
index 8eb8c0436bb..d71484b6d05 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
@@ -182,7 +182,11 @@ public class LeaseUpdaterTest extends
BaseIgniteAbstractTest {
return trueCompletedFuture();
});
- assignmentsTracker = new AssignmentsTracker(metaStorageManager,
mock(FailureProcessor.class), new SystemPropertiesNodeProperties());
+ assignmentsTracker = new AssignmentsTracker(
+ metaStorageManager,
+ mock(FailureProcessor.class),
+ new SystemPropertiesNodeProperties()
+ );
assignmentsTracker.startTrack();
leaseUpdater = new LeaseUpdater(
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
index 19474a33698..a06de408aaf 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.placementdriver;
import static java.util.UUID.randomUUID;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
@@ -929,7 +930,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
}
private LeaseTracker createPlacementDriver() {
- return new LeaseTracker(metastore, new ClusterNodeResolver() {
+ var clusterNodeResolver = new ClusterNodeResolver() {
@Override
public @Nullable InternalClusterNode getByConsistentId(String
consistentId) {
return leaseholder;
@@ -939,10 +940,23 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
public @Nullable InternalClusterNode getById(UUID id) {
return leaseholder;
}
- }, clockService);
+ };
+
+ return new LeaseTracker(
+ metastore,
+ clusterNodeResolver,
+ clockService,
+ zoneId -> completedFuture(Set.of("A")),
+ id -> null,
+ new SystemPropertiesNodeProperties()
+ );
}
private AssignmentsTracker createAssignmentsPlacementDriver() {
- return new AssignmentsTracker(metastore, mock(FailureProcessor.class),
new SystemPropertiesNodeProperties());
+ return new AssignmentsTracker(
+ metastore,
+ mock(FailureProcessor.class),
+ new SystemPropertiesNodeProperties()
+ );
}
}
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h
b/modules/platforms/cpp/ignite/common/error_codes.h
index fa8433aa5b6..fb19e4d7638 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -153,6 +153,7 @@ enum class code : underlying_t {
// DistributionZones group. Group code: 10
ZONE_NOT_FOUND = 0xa0001,
+ EMPTY_DATA_NODES = 0xa0002,
// Network group. Group code: 11
UNRESOLVABLE_CONSISTENT_ID = 0xb0001,
@@ -207,6 +208,7 @@ enum class code : underlying_t {
// PlacementDriver group. Group code: 18
PRIMARY_REPLICA_AWAIT_TIMEOUT = 0x120001,
PRIMARY_REPLICA_AWAIT = 0x120002,
+ EMPTY_ASSIGNMENTS = 0x120003,
// CriticalWorkers group. Group code: 19
SYSTEM_WORKER_BLOCKED = 0x130001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index 213e67f16d6..5ab079a3c9f 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -228,6 +228,7 @@ sql_state error_code_to_sql_state(error::code code) {
// DistributionZones group. Group code: 10
case error::code::ZONE_NOT_FOUND:
+ case error::code::EMPTY_DATA_NODES:
return sql_state::SHY000_GENERAL_ERROR;
// Network group. Group code: 11
@@ -290,6 +291,7 @@ sql_state error_code_to_sql_state(error::code code) {
case error::code::PRIMARY_REPLICA_AWAIT_TIMEOUT:
return sql_state::SHYT00_TIMEOUT_EXPIRED;
case error::code::PRIMARY_REPLICA_AWAIT:
+ case error::code::EMPTY_ASSIGNMENTS:
return sql_state::SHY000_GENERAL_ERROR;
// CriticalWorkers group. Group code: 19
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index cca00845de9..01442e162ed 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -433,6 +433,9 @@ namespace Apache.Ignite
/// <summary> ZoneNotFound error. </summary>
public const int ZoneNotFound = (GroupCode << 16) | (1 & 0xFFFF);
+
+ /// <summary> EmptyDataNodes error. </summary>
+ public const int EmptyDataNodes = (GroupCode << 16) | (2 & 0xFFFF);
}
/// <summary> Network errors. </summary>
@@ -652,6 +655,9 @@ namespace Apache.Ignite
/// <summary> PrimaryReplicaAwait error. </summary>
public const int PrimaryReplicaAwait = (GroupCode << 16) | (2 &
0xFFFF);
+
+ /// <summary> EmptyAssignments error. </summary>
+ public const int EmptyAssignments = (GroupCode << 16) | (3 &
0xFFFF);
}
/// <summary> CriticalWorkers errors. </summary>
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 1e70034f487..4760067c5ab 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -595,7 +595,9 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
nodeProperties,
clusterConfigRegistry.getConfiguration(ReplicationExtensionConfiguration.KEY).replication(),
threadPoolsManager.commonScheduler(),
- metricManager
+ metricManager,
+ zoneId -> completedFuture(Set.of()),
+ id -> null
);
ScheduledExecutorService rebalanceScheduler = new
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index cf511826f16..ad0c23571e0 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.compaction.CatalogCompactionRunner;
import
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration;
import
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationExtensionConfiguration;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.sql.IgniteCatalogSqlImpl;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
@@ -860,6 +861,16 @@ public class IgniteImpl implements Ignite {
var registry = new MetaStorageRevisionListenerRegistry(metaStorageMgr);
+ LongSupplier delayDurationMsSupplier =
delayDurationMsSupplier(schemaSyncConfig);
+
+ CatalogManagerImpl catalogManager = new CatalogManagerImpl(
+ new UpdateLogImpl(metaStorageMgr, failureManager),
+ clockService,
+ failureManager,
+ nodeProperties,
+ delayDurationMsSupplier
+ );
+
ReplicationConfiguration replicationConfig = clusterConfigRegistry
.getConfiguration(ReplicationExtensionConfiguration.KEY).replication();
@@ -877,7 +888,12 @@ public class IgniteImpl implements Ignite {
nodeProperties,
replicationConfig,
threadPoolsManager.commonScheduler(),
- metricManager
+ metricManager,
+ zoneId -> distributionZoneManager().currentDataNodes(zoneId),
+ tableId -> {
+ CatalogTableDescriptor table =
catalogManager.activeCatalog(clock.now().longValue()).table(tableId);
+ return table == null ? null : table.zoneId();
+ }
);
TransactionConfiguration txConfig =
clusterConfigRegistry.getConfiguration(TransactionExtensionConfiguration.KEY).transaction();
@@ -953,16 +969,6 @@ public class IgniteImpl implements Ignite {
outgoingSnapshotsManager = new OutgoingSnapshotsManager(name,
clusterSvc.messagingService(), failureManager);
- LongSupplier delayDurationMsSupplier =
delayDurationMsSupplier(schemaSyncConfig);
-
- CatalogManagerImpl catalogManager = new CatalogManagerImpl(
- new UpdateLogImpl(metaStorageMgr, failureManager),
- clockService,
- failureManager,
- nodeProperties,
- delayDurationMsSupplier
- );
-
systemViewManager = new SystemViewManagerImpl(name, catalogManager,
failureManager);
nodeAttributesCollector.register(systemViewManager);
logicalTopology.addEventListener(systemViewManager);