This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d5d8dde793 IGNITE-20317 Return meta storage invoke futures from zone change
handlers; immediately recalculate data nodes when scale up/down is
immediate. (#2685)
d5d8dde793 is described below
commit d5d8dde7939c66637be66e6b62966fc9429de068
Author: Mirza Aliev <[email protected]>
AuthorDate: Mon Oct 16 11:11:01 2023 +0400
IGNITE-20317 Return meta storage invoke futures from zone change handlers;
immediately recalculate data nodes when scale up/down is immediate. (#2685)
---
.../distributionzones/DistributionZoneManager.java | 144 ++++++++++++---------
.../rebalance/DistributionZoneRebalanceEngine.java | 128 +++++++++---------
.../DistributionZoneCausalityDataNodesTest.java | 2 -
3 files changed, 149 insertions(+), 125 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index a65a13eb16..ae1e59ef3b 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -24,6 +24,7 @@ import static
java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_ALTER;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_CREATE;
@@ -150,17 +151,17 @@ public class DistributionZoneManager implements
IgniteComponent {
private final LogicalTopologyEventListener topologyEventListener = new
LogicalTopologyEventListener() {
@Override
public void onNodeJoined(LogicalNode joinedNode,
LogicalTopologySnapshot newTopology) {
- updateLogicalTopologyInMetaStorage(newTopology, false);
+ updateLogicalTopologyInMetaStorage(newTopology);
}
@Override
public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot
newTopology) {
- updateLogicalTopologyInMetaStorage(newTopology, false);
+ updateLogicalTopologyInMetaStorage(newTopology);
}
@Override
public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
- updateLogicalTopologyInMetaStorage(newTopology, true);
+ updateLogicalTopologyInMetaStorage(newTopology);
}
};
@@ -295,6 +296,14 @@ public class DistributionZoneManager implements
IgniteComponent {
long causalityToken = parameters.causalityToken();
+ if (newScaleUp == IMMEDIATE_TIMER_VALUE) {
+ return saveDataNodesToMetaStorageOnScaleUp(zoneId,
causalityToken).thenRun(() -> {
+ // TODO: causalityOnUpdateScaleUp will be removed
https://issues.apache.org/jira/browse/IGNITE-20604,
+ // catalog must be used instead
+
causalityDataNodesEngine.causalityOnUpdateScaleUp(causalityToken, zoneId,
IMMEDIATE_TIMER_VALUE);
+ });
+ }
+
// It is safe to zonesTimers.get(zoneId) in term of NPE because meta
storage notifications are one-threaded
// and this map will be initialized on a manager start or with catalog
notification
ZoneState zoneState = zonesState.get(zoneId);
@@ -329,6 +338,14 @@ public class DistributionZoneManager implements
IgniteComponent {
long causalityToken = parameters.causalityToken();
+ if (newScaleDown == IMMEDIATE_TIMER_VALUE) {
+ return saveDataNodesToMetaStorageOnScaleDown(zoneId,
causalityToken).thenRun(() -> {
+ // TODO: causalityOnUpdateScaleDown will be removed
https://issues.apache.org/jira/browse/IGNITE-20604,
+ // catalog must be used instead
+
causalityDataNodesEngine.causalityOnUpdateScaleDown(causalityToken, zoneId,
IMMEDIATE_TIMER_VALUE);
+ });
+ }
+
// It is safe to zonesTimers.get(zoneId) in term of NPE because meta
storage notifications are one-threaded
// and this map will be initialized on a manager start or with catalog
notification
ZoneState zoneState = zonesState.get(zoneId);
@@ -387,8 +404,9 @@ public class DistributionZoneManager implements
IgniteComponent {
*
* @param zone Zone descriptor.
* @param causalityToken Causality token.
+ * @return Future reflecting the completion of creation or restoring a
zone.
*/
- private void createOrRestoreZoneStateBusy(CatalogZoneDescriptor zone, long
causalityToken) {
+ private CompletableFuture<Void>
createOrRestoreZoneStateBusy(CatalogZoneDescriptor zone, long causalityToken) {
int zoneId = zone.id();
VaultEntry topologyAugmentationMapFromVault =
vaultMgr.get(zoneTopologyAugmentationVault(zoneId)).join();
@@ -403,7 +421,8 @@ public class DistributionZoneManager implements
IgniteComponent {
Set<Node> dataNodes =
logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet());
- initDataNodesAndTriggerKeysInMetaStorage(zoneId, causalityToken,
dataNodes);
+ return initDataNodesAndTriggerKeysInMetaStorage(zoneId,
causalityToken, dataNodes)
+ .thenRun(() ->
causalityDataNodesEngine.onCreateOrRestoreZoneState(causalityToken, zone));
} else {
// Restart case, when topologyAugmentationMap has already been
saved during a cluster work.
ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap
= fromBytes(topologyAugmentationMapFromVault.value());
@@ -424,6 +443,8 @@ public class DistributionZoneManager implements
IgniteComponent {
}
causalityDataNodesEngine.onCreateOrRestoreZoneState(causalityToken,
zone);
+
+ return completedFuture(null);
}
/**
@@ -493,8 +514,9 @@ public class DistributionZoneManager implements
IgniteComponent {
* @param zoneId Unique id of a zone
* @param revision Revision of an event that has triggered this method.
* @param dataNodes Data nodes.
+ * @return Future reflecting the completion of initialisation of zone's
keys in meta storage.
*/
- private void initDataNodesAndTriggerKeysInMetaStorage(
+ private CompletableFuture<Void> initDataNodesAndTriggerKeysInMetaStorage(
int zoneId,
long revision,
Set<Node> dataNodes
@@ -512,27 +534,32 @@ public class DistributionZoneManager implements
IgniteComponent {
Iif iif = iif(triggerKeyCondition, dataNodesAndTriggerKeyUpd,
ops().yield(false));
- metaStorageManager.invoke(iif).whenComplete((res, e) -> {
- if (e != null) {
- LOG.error(
- "Failed to update zones' dataNodes value [zoneId =
{}, dataNodes = {}, revision = {}]",
- e,
- zoneId,
- dataNodes,
- revision
- );
- } else if (res.getAsBoolean()) {
- LOG.info("Update zones' dataNodes value [zoneId = {},
dataNodes = {}, revision = {}]",
- zoneId, dataNodes, revision);
- } else {
- LOG.debug(
- "Failed to update zones' dataNodes value [zoneId =
{}, dataNodes = {}, revision = {}]",
- zoneId,
- dataNodes,
- revision
- );
- }
- });
+ return metaStorageManager.invoke(iif)
+ .thenApply(StatementResult::getAsBoolean)
+ .whenComplete((invokeResult, e) -> {
+ if (e != null) {
+ LOG.error(
+ "Failed to update zones' dataNodes value
[zoneId = {}, dataNodes = {}, revision = {}]",
+ e,
+ zoneId,
+ dataNodes,
+ revision
+ );
+ } else if (invokeResult) {
+ LOG.info("Update zones' dataNodes value [zoneId =
{}, dataNodes = {}, revision = {}]",
+ zoneId,
+ dataNodes,
+ revision
+ );
+ } else {
+ LOG.debug(
+ "Failed to update zones' dataNodes value
[zoneId = {}, dataNodes = {}, revision = {}]",
+ zoneId,
+ dataNodes,
+ revision
+ );
+ }
+ }).thenCompose((ignored) -> completedFuture(null));
} finally {
busyLock.leaveBusy();
}
@@ -544,7 +571,7 @@ public class DistributionZoneManager implements
IgniteComponent {
* @param zoneId Unique id of a zone
* @param revision Revision of an event that has triggered this method.
*/
- private void removeTriggerKeysAndDataNodes(int zoneId, long revision) {
+ private CompletableFuture<Void> removeTriggerKeysAndDataNodes(int zoneId,
long revision) {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
}
@@ -556,20 +583,23 @@ public class DistributionZoneManager implements
IgniteComponent {
Iif iif = iif(triggerKeyCondition, removeKeysUpd,
ops().yield(false));
- metaStorageManager.invoke(iif).whenComplete((res, e) -> {
- if (e != null) {
- LOG.error(
- "Failed to delete zone's dataNodes keys [zoneId =
{}, revision = {}]",
- e,
- zoneId,
- revision
- );
- } else if (res.getAsBoolean()) {
- LOG.info("Delete zone's dataNodes keys [zoneId = {},
revision = {}]", zoneId, revision);
- } else {
- LOG.debug("Failed to delete zone's dataNodes keys [zoneId
= {}, revision = {}]", zoneId, revision);
- }
- });
+ return metaStorageManager.invoke(iif)
+ .thenApply(StatementResult::getAsBoolean)
+ .whenComplete((invokeResult, e) -> {
+ if (e != null) {
+ LOG.error(
+ "Failed to delete zone's dataNodes keys
[zoneId = {}, revision = {}]",
+ e,
+ zoneId,
+ revision
+ );
+ } else if (invokeResult) {
+ LOG.info("Delete zone's dataNodes keys [zoneId =
{}, revision = {}]", zoneId, revision);
+ } else {
+ LOG.debug("Failed to delete zone's dataNodes keys
[zoneId = {}, revision = {}]", zoneId, revision);
+ }
+ })
+ .thenCompose(ignored -> completedFuture(null));
} finally {
busyLock.leaveBusy();
}
@@ -580,10 +610,8 @@ public class DistributionZoneManager implements
IgniteComponent {
* in meta storage.
*
* @param newTopology Logical topology snapshot.
- * @param topologyLeap Flag that indicates whether this updates was
trigger by
- * {@link
LogicalTopologyEventListener#onTopologyLeap(LogicalTopologySnapshot)} or not.
*/
- private void updateLogicalTopologyInMetaStorage(LogicalTopologySnapshot
newTopology, boolean topologyLeap) {
+ private void updateLogicalTopologyInMetaStorage(LogicalTopologySnapshot
newTopology) {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
}
@@ -597,12 +625,7 @@ public class DistributionZoneManager implements
IgniteComponent {
// Very first start of the cluster, so we just initialize
zonesLogicalTopologyVersionKey
updateCondition = notExists(zonesLogicalTopologyVersionKey());
} else {
- if (topologyLeap) {
- updateCondition =
value(zonesLogicalTopologyVersionKey()).lt(longToBytes(newTopology.version()));
- } else {
- // This condition may be stronger, as far as we receive
topology events one by one.
- updateCondition =
value(zonesLogicalTopologyVersionKey()).eq(longToBytes(newTopology.version() -
1));
- }
+ updateCondition =
value(zonesLogicalTopologyVersionKey()).lt(longToBytes(newTopology.version()));
}
Iif iff = iif(
@@ -1358,17 +1381,14 @@ public class DistributionZoneManager implements
IgniteComponent {
CreateZoneEventParameters params = (CreateZoneEventParameters)
parameters;
- createOrRestoreZoneStateBusy(params.zoneDescriptor(),
params.causalityToken());
-
- return completedFuture(false);
+ return createOrRestoreZoneStateBusy(params.zoneDescriptor(),
params.causalityToken())
+ .thenCompose((ignored) -> completedFuture(false));
}));
catalogManager.listen(ZONE_DROP, (parameters, exception) ->
inBusyLock(busyLock, () -> {
assert exception == null : parameters;
- onDropZoneBusy((DropZoneEventParameters) parameters);
-
- return completedFuture(false);
+ return onDropZoneBusy((DropZoneEventParameters)
parameters).thenCompose((ignored) -> completedFuture(false));
}));
catalogManager.listen(ZONE_ALTER, new
ManagerCatalogAlterZoneEventListener());
@@ -1409,7 +1429,7 @@ public class DistributionZoneManager implements
IgniteComponent {
}
}
- private void onDropZoneBusy(DropZoneEventParameters parameters) {
+ private CompletableFuture<Void> onDropZoneBusy(DropZoneEventParameters
parameters) {
int zoneId = parameters.zoneId();
long causalityToken = parameters.causalityToken();
@@ -1418,10 +1438,10 @@ public class DistributionZoneManager implements
IgniteComponent {
zoneState.stopTimers();
- removeTriggerKeysAndDataNodes(zoneId, causalityToken);
+ return removeTriggerKeysAndDataNodes(zoneId,
causalityToken).thenRun(() -> {
+ causalityDataNodesEngine.onDelete(causalityToken, zoneId);
- causalityDataNodesEngine.onDelete(causalityToken, zoneId);
-
- zonesState.remove(zoneId);
+ zonesState.remove(zoneId);
+ });
}
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index b0f6533c0f..bbff27b34c 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -104,7 +104,7 @@ public class DistributionZoneRebalanceEngine {
catalogService.listen(ZONE_ALTER, new
CatalogAlterZoneEventListener(catalogService) {
@Override
protected CompletableFuture<Void>
onReplicasUpdate(AlterZoneEventParameters parameters, int oldReplicas) {
- return onUpdateReplicas(parameters, oldReplicas);
+ return onUpdateReplicas(parameters);
}
});
@@ -158,48 +158,14 @@ public class DistributionZoneRebalanceEngine {
return completedFuture(null);
}
- for (CatalogTableDescriptor tableDescriptor :
findTablesByZoneId(zoneId, catalogVersion)) {
- CompletableFuture<?>[] partitionFutures =
RebalanceUtil.triggerAllTablePartitionsRebalance(
- tableDescriptor,
- zoneDescriptor,
- filteredDataNodes,
- evt.entryEvent().newEntry().revision(),
- metaStorageManager
- );
+ List<CatalogTableDescriptor> tableDescriptors =
findTablesByZoneId(zoneId, catalogVersion);
- // This set is used to deduplicate exceptions (if
there is an exception from upstream, for instance,
- // when reading from MetaStorage, it will be
encountered by every partition future) to avoid noise
- // in the logs.
- Set<Throwable> unwrappedCauses =
ConcurrentHashMap.newKeySet();
-
- for (int partId = 0; partId < partitionFutures.length;
partId++) {
- int finalPartId = partId;
-
- partitionFutures[partId].exceptionally(e -> {
- Throwable cause =
ExceptionUtils.unwrapCause(e);
-
- if (unwrappedCauses.add(cause)) {
- // The exception is specific to this
partition.
- LOG.error(
- "Exception on updating assignments
for [table={}, partition={}]",
- e,
- tableInfo(tableDescriptor),
finalPartId
- );
- } else {
- // The exception is from upstream and not
specific for this partition, so don't log the partition index.
- LOG.error(
- "Exception on updating assignments
for [table={}]",
- e,
- tableInfo(tableDescriptor)
- );
- }
-
- return null;
- });
- }
- }
-
- return completedFuture(null);
+ return triggerPartitionsRebalanceForAllTables(
+ evt.entryEvent().newEntry().revision(),
+ zoneDescriptor,
+ filteredDataNodes,
+ tableDescriptors
+ );
});
}
@@ -210,7 +176,7 @@ public class DistributionZoneRebalanceEngine {
};
}
- private CompletableFuture<Void> onUpdateReplicas(AlterZoneEventParameters
parameters, int oldReplicas) {
+ private CompletableFuture<Void> onUpdateReplicas(AlterZoneEventParameters
parameters) {
return IgniteUtils.inBusyLockAsync(busyLock, () -> {
int zoneId = parameters.zoneDescriptor().id();
long causalityToken = parameters.causalityToken();
@@ -223,28 +189,68 @@ public class DistributionZoneRebalanceEngine {
List<CatalogTableDescriptor> tableDescriptors =
findTablesByZoneId(zoneId, parameters.catalogVersion());
- List<CompletableFuture<?>> tableFutures = new
ArrayList<>(tableDescriptors.size());
+ return triggerPartitionsRebalanceForAllTables(
+ causalityToken,
+ parameters.zoneDescriptor(),
+ dataNodes,
+ tableDescriptors
+ );
+ });
+ });
+ }
- for (CatalogTableDescriptor tableDescriptor :
tableDescriptors) {
- LOG.info(
- "Received update for replicas number
[table={}, oldNumber={}, newNumber={}]",
- tableInfo(tableDescriptor), oldReplicas,
parameters.zoneDescriptor().replicas()
- );
+ private CompletableFuture<Void> triggerPartitionsRebalanceForAllTables(
+ long revision,
+ CatalogZoneDescriptor zoneDescriptor,
+ Set<String> dataNodes,
+ List<CatalogTableDescriptor> tableDescriptors
+ ) {
+ List<CompletableFuture<?>> tableFutures = new
ArrayList<>(tableDescriptors.size());
+
+ for (CatalogTableDescriptor tableDescriptor : tableDescriptors) {
+ CompletableFuture<?>[] partitionFutures =
RebalanceUtil.triggerAllTablePartitionsRebalance(
+ tableDescriptor,
+ zoneDescriptor,
+ dataNodes,
+ revision,
+ metaStorageManager
+ );
+
+ // This set is used to deduplicate exceptions (if there is an
exception from upstream, for instance,
+ // when reading from MetaStorage, it will be encountered by every
partition future) to avoid noise
+ // in the logs.
+ Set<Throwable> unwrappedCauses = ConcurrentHashMap.newKeySet();
+
+ for (int partId = 0; partId < partitionFutures.length; partId++) {
+ int finalPartId = partId;
+
+ partitionFutures[partId].exceptionally(e -> {
+ Throwable cause = ExceptionUtils.unwrapCause(e);
+
+ if (unwrappedCauses.add(cause)) {
+ // The exception is specific to this partition.
+ LOG.error(
+ "Exception on updating assignments for
[table={}, partition={}]",
+ e,
+ tableInfo(tableDescriptor), finalPartId
+ );
+ } else {
+ // The exception is from upstream and not specific for
this partition, so don't log the partition index.
+ LOG.error(
+ "Exception on updating assignments for
[table={}]",
+ e,
+ tableInfo(tableDescriptor)
+ );
+ }
- CompletableFuture<?>[] partitionFutures =
RebalanceUtil.triggerAllTablePartitionsRebalance(
- tableDescriptor,
- parameters.zoneDescriptor(),
- dataNodes,
- causalityToken,
- metaStorageManager
- );
+ return null;
+ });
+ }
- tableFutures.add(allOf(partitionFutures));
- }
+ tableFutures.add(allOf(partitionFutures));
+ }
- return
allOf(tableFutures.toArray(CompletableFuture[]::new));
- });
- });
+ return allOf(tableFutures.toArray(CompletableFuture[]::new));
}
private List<CatalogTableDescriptor> findTablesByZoneId(int zoneId, int
catalogVersion) {
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
index ac1ea861fb..26fcaedb4c 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
@@ -576,8 +576,6 @@ public class DistributionZoneCausalityDataNodesTest extends
BaseDistributionZone
// Create logical topology with NODE_0.
topology.putNode(NODE_0);
- putNodeInLogicalTopologyAndGetRevision(NODE_0, ONE_NODE);
-
// Test steps.
// Create a zone.