This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new fe43b03ed4 IGNITE-23770 Introduce a method to extract
ReplicationGroupId from Meta Storage keys (#4787)
fe43b03ed4 is described below
commit fe43b03ed4d7573a8ce761f7aa54e5b218d93dfc
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Tue Nov 26 11:04:40 2024 +0200
IGNITE-23770 Introduce a method to extract ReplicationGroupId from Meta
Storage keys (#4787)
---
check-rules/spotbugs-excludes.xml | 6 +++
.../internal/replicator/TablePartitionId.java | 5 ++-
.../internal/replicator/ZonePartitionId.java | 5 ++-
.../apache/ignite/internal/util/StringUtils.java | 15 +++++++
.../distributionzones/DistributionZonesUtil.java | 6 ++-
.../rebalance/DistributionZoneRebalanceEngine.java | 4 +-
.../distributionzones/rebalance/RebalanceUtil.java | 44 +++++++-----------
.../rebalance/ZoneRebalanceUtil.java | 44 +++++++-----------
.../DistributionZoneCausalityDataNodesTest.java | 4 +-
.../PartitionReplicaLifecycleManager.java | 49 ++++++++++----------
.../placementdriver/AssignmentsTracker.java | 42 +++++++----------
.../storage/DistributedConfigurationStorage.java | 7 +--
.../rebalance/ItRebalanceDistributedTest.java | 10 ++---
.../internal/table/distributed/TableManager.java | 52 +++++++++++-----------
14 files changed, 142 insertions(+), 151 deletions(-)
diff --git a/check-rules/spotbugs-excludes.xml
b/check-rules/spotbugs-excludes.xml
index 9b92992ec2..018d2c904a 100644
--- a/check-rules/spotbugs-excludes.xml
+++ b/check-rules/spotbugs-excludes.xml
@@ -238,6 +238,12 @@
<Class
name="org.apache.ignite.internal.storage.pagememory.index.sorted.PageMemorySortedIndexStorage$ReadOnlyScanCursor"/>
<Method name="next"/>
</Match>
+ <Match>
+ <!-- Public byte array constants, not expected to be modified. -->
+ <Bug pattern="MS_MUTABLE_ARRAY"/>
+ <Class
name="org.apache.ignite.internal.distributionzones.DistributionZonesUtil"/>
+ <Field name="DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX_BYTES"/>
+ </Match>
<!-- end of false-positive exclusions -->
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java
index 26a2518cc3..f9bdfa19ae 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java
@@ -19,10 +19,13 @@ package org.apache.ignite.internal.replicator;
// TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Should be
refactored to ZonePartitionId.
+import java.util.regex.Pattern;
+
/**
* The class is used to identify a table replication group.
*/
public class TablePartitionId implements PartitionGroupId {
+ private static final Pattern DELIMITER_PATTERN = Pattern.compile("_part_");
/** Table id. */
private final int tableId;
@@ -48,7 +51,7 @@ public class TablePartitionId implements PartitionGroupId {
* @return A table partition id.
*/
public static TablePartitionId fromString(String str) {
- String[] parts = str.split("_part_");
+ String[] parts = DELIMITER_PATTERN.split(str);
return new TablePartitionId(Integer.parseInt(parts[0]),
Integer.parseInt(parts[1]));
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionId.java
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionId.java
index c794191bd2..7856e6d5df 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionId.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionId.java
@@ -18,11 +18,14 @@
package org.apache.ignite.internal.replicator;
import java.util.Objects;
+import java.util.regex.Pattern;
/**
* The class is used to identify a zone replication group id for a given
partition.
*/
public class ZonePartitionId implements ReplicationGroupId {
+ private static final Pattern DELIMITER_PATTERN = Pattern.compile("_part_");
+
private final int zoneId;
private final int tableId;
@@ -90,7 +93,7 @@ public class ZonePartitionId implements ReplicationGroupId {
* @return A zone partition id.
*/
public static ZonePartitionId fromString(String str) {
- String[] parts = str.split("_part_");
+ String[] parts = DELIMITER_PATTERN.split(str);
return new ZonePartitionId(Integer.parseInt(parts[0]),
Integer.parseInt(parts[1]));
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/StringUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/StringUtils.java
index 8067a241b3..f0aebc6e4c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/StringUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/StringUtils.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.util;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import java.nio.ByteBuffer;
import java.util.Objects;
import org.apache.ignite.internal.lang.IgniteStringBuilder;
@@ -189,4 +191,17 @@ public final class StringUtils {
public static String hexInt(int val) {
return new IgniteStringBuilder(8).appendHex(val).toString();
}
+
+ /**
+ * Converts the given byte array into a String, without the first {@code
prefixLength} bytes.
+ *
+ * @param bytes Byte array to convert.
+ * @param prefixLength The number of bytes to skip from the beginning of
the array.
+ * @return String representation of the byte array without the first
{@code prefixLength} bytes.
+ */
+ public static String toStringWithoutPrefix(byte[] bytes, int prefixLength)
{
+ assert prefixLength > 0 : "prefixLength must be greater than zero: " +
prefixLength;
+
+ return new String(bytes, prefixLength, bytes.length - prefixLength,
UTF_8);
+ }
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index e8fed7ed5b..166b441bdb 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -38,6 +38,7 @@ import static
org.apache.ignite.internal.util.ByteUtils.uuidToBytes;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -78,6 +79,9 @@ public class DistributionZonesUtil {
/** Key prefix for zone's data nodes. */
public static final String DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX =
DISTRIBUTION_ZONE_DATA_NODES_PREFIX + "value.";
+ public static final byte[] DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX_BYTES
=
+
DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX.getBytes(StandardCharsets.UTF_8);
+
/** Key prefix for zone's scale up change trigger key. */
private static final String
DISTRIBUTION_ZONE_SCALE_UP_CHANGE_TRIGGER_PREFIX =
DISTRIBUTION_ZONE_DATA_NODES_PREFIX + "scaleUpChangeTrigger.";
@@ -164,7 +168,7 @@ public class DistributionZonesUtil {
* @return ByteArray representation.
*/
public static ByteArray zoneDataNodesKey() {
- return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX);
+ return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX_BYTES);
}
/**
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index ae252da62d..964901fca2 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -20,7 +20,7 @@ package
org.apache.ignite.internal.distributionzones.rebalance;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_ALTER;
-import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX_BYTES;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.findTablesByZoneId;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.parseDataNodes;
@@ -211,7 +211,7 @@ public class DistributionZoneRebalanceEngine {
return nullCompletedFuture();
}
- int zoneId =
extractZoneId(evt.entryEvent().newEntry().key(),
DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX);
+ int zoneId =
extractZoneId(evt.entryEvent().newEntry().key(),
DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX_BYTES);
// It is safe to get the latest version of the catalog as
we are in the metastore thread.
// TODO: IGNITE-22723 Potentially unsafe to use the latest
catalog version, as the tables might not already present
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index a03f2e3b71..1991202b62 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.distributionzones.rebalance;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
@@ -40,8 +41,8 @@ import static
org.apache.ignite.internal.metastorage.dsl.Statements.iif;
import static
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignmentForPartition;
import static
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static
org.apache.ignite.internal.util.StringUtils.toStringWithoutPrefix;
-import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@@ -52,7 +53,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -77,9 +77,7 @@ public class RebalanceUtil {
private static final IgniteLogger LOG =
Loggers.forClass(RebalanceUtil.class);
/**
- * Status values for methods like
- * {@link #updatePendingAssignmentsKeys(CatalogTableDescriptor,
TablePartitionId, Collection, int, long, MetaStorageManager, int, Set,
- * HybridTimestamp)}.
+ * Status values for methods like {@link #updatePendingAssignmentsKeys}.
*/
public enum UpdateStatus {
/**
@@ -378,12 +376,18 @@ public class RebalanceUtil {
/** Key prefix for pending assignments. */
public static final String PENDING_ASSIGNMENTS_PREFIX =
"assignments.pending.";
+ public static final byte[] PENDING_ASSIGNMENTS_PREFIX_BYTES =
"assignments.pending.".getBytes(UTF_8);
+
/** Key prefix for stable assignments. */
public static final String STABLE_ASSIGNMENTS_PREFIX =
"assignments.stable.";
+ public static final byte[] STABLE_ASSIGNMENTS_PREFIX_BYTES =
STABLE_ASSIGNMENTS_PREFIX.getBytes(UTF_8);
+
/** Key prefix for switch reduce assignments. */
public static final String ASSIGNMENTS_SWITCH_REDUCE_PREFIX =
"assignments.switch.reduce.";
+ public static final byte[] ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES =
ASSIGNMENTS_SWITCH_REDUCE_PREFIX.getBytes(UTF_8);
+
/** Key prefix for switch append assignments. */
public static final String ASSIGNMENTS_SWITCH_APPEND_PREFIX =
"assignments.switch.append.";
@@ -454,16 +458,16 @@ public class RebalanceUtil {
}
/**
- * Extract table id from a metastorage key of partition.
+ * Converts the given {@code key}, stripping it of the given {@code
prefix}, into a {@link TablePartitionId}.
*
- * @param key Key.
+ * @param key Metastorage key.
* @param prefix Key prefix.
- * @return Table id.
+ * @return {@link TablePartitionId} that was encoded in the key.
*/
- public static int extractTableId(byte[] key, String prefix) {
- String strKey = new String(key, StandardCharsets.UTF_8);
+ public static TablePartitionId extractTablePartitionId(byte[] key, byte[]
prefix) {
+ var tablePartitionIdString = toStringWithoutPrefix(key, prefix.length);
- return Integer.parseInt(strKey.substring(prefix.length(),
strKey.indexOf("_part_")));
+ return TablePartitionId.fromString(tablePartitionIdString);
}
/**
@@ -473,22 +477,8 @@ public class RebalanceUtil {
* @param prefix Key prefix.
* @return Zone id.
*/
- public static int extractZoneId(byte[] key, String prefix) {
- String strKey = new String(key, StandardCharsets.UTF_8);
-
- return Integer.parseInt(strKey.substring(prefix.length()));
- }
-
- /**
- * Extract partition number from the rebalance key of partition.
- *
- * @param key Key.
- * @return Partition number.
- */
- public static int extractPartitionNumber(byte[] key) {
- var strKey = new String(key, StandardCharsets.UTF_8);
-
- return Integer.parseInt(strKey.substring(strKey.indexOf("_part_") +
"_part_".length()));
+ public static int extractZoneId(byte[] key, byte[] prefix) {
+ return Integer.parseInt(toStringWithoutPrefix(key, prefix.length));
}
/**
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
index 67ad4a9fcf..7e19b35f25 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
@@ -17,10 +17,11 @@
package org.apache.ignite.internal.distributionzones.rebalance;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
-import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX_BYTES;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.ASSIGNMENT_NOT_UPDATED;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.OUTDATED_UPDATE_RECEIVED;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.PENDING_KEY_UPDATED;
@@ -40,8 +41,8 @@ import static
org.apache.ignite.internal.partitiondistribution.PartitionDistribu
import static
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+import static
org.apache.ignite.internal.util.StringUtils.toStringWithoutPrefix;
-import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -53,7 +54,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.distributionzones.DistributionZonesUtil;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -76,9 +76,7 @@ public class ZoneRebalanceUtil {
private static final IgniteLogger LOG =
Loggers.forClass(ZoneRebalanceUtil.class);
/**
- * Status values for methods like
- * {@link #updatePendingAssignmentsKeys(CatalogZoneDescriptor,
ZonePartitionId, Collection, int, long, MetaStorageManager, int, Set,
- * HybridTimestamp)}.
+ * Status values for methods like {@link #updatePendingAssignmentsKeys}.
*/
public enum UpdateStatus {
/**
@@ -358,15 +356,21 @@ public class ZoneRebalanceUtil {
/** Key prefix for pending assignments. */
public static final String PENDING_ASSIGNMENTS_PREFIX =
"zone.assignments.pending.";
+ public static final byte[] PENDING_ASSIGNMENTS_PREFIX_BYTES =
PENDING_ASSIGNMENTS_PREFIX.getBytes(UTF_8);
+
/** Key prefix for stable assignments. */
public static final String STABLE_ASSIGNMENTS_PREFIX =
"zone.assignments.stable.";
+ public static final byte[] STABLE_ASSIGNMENTS_PREFIX_BYTES =
STABLE_ASSIGNMENTS_PREFIX.getBytes(UTF_8);
+
/** Key prefix for planned assignments. */
public static final String PLANNED_ASSIGNMENTS_PREFIX =
"zone.assignments.planned.";
/** Key prefix for switch reduce assignments. */
public static final String ASSIGNMENTS_SWITCH_REDUCE_PREFIX =
"zone.assignments.switch.reduce.";
+ public static final byte[] ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES =
ASSIGNMENTS_SWITCH_REDUCE_PREFIX.getBytes(UTF_8);
+
/** Key prefix for switch append assignments. */
public static final String ASSIGNMENTS_SWITCH_APPEND_PREFIX =
"zone.assignments.switch.append.";
@@ -437,16 +441,16 @@ public class ZoneRebalanceUtil {
}
/**
- * Extract zone id from a metastorage key of partition.
+ * Converts the given {@code key}, stripping it of the given {@code
prefix}, into a {@link ZonePartitionId}.
*
- * @param key Key.
+ * @param key Metastorage key.
* @param prefix Key prefix.
- * @return Table id.
+ * @return {@link ZonePartitionId} that was encoded in the key.
*/
- public static int extractZoneId(byte[] key, String prefix) {
- String strKey = new String(key, StandardCharsets.UTF_8);
+ public static ZonePartitionId extractZonePartitionId(byte[] key, byte[]
prefix) {
+ var zonePartitionIdString = toStringWithoutPrefix(key, prefix.length);
- return Integer.parseInt(strKey.substring(prefix.length(),
strKey.indexOf("_part_")));
+ return ZonePartitionId.fromString(zonePartitionIdString);
}
/**
@@ -456,21 +460,7 @@ public class ZoneRebalanceUtil {
* @return Zone id.
*/
public static int extractZoneIdDataNodes(byte[] key) {
- String strKey = new String(key, StandardCharsets.UTF_8);
-
- return
Integer.parseInt(strKey.substring(DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX.length()));
- }
-
- /**
- * Extract partition number from the rebalance key of partition.
- *
- * @param key Key.
- * @return Partition number.
- */
- public static int extractPartitionNumber(byte[] key) {
- var strKey = new String(key, StandardCharsets.UTF_8);
-
- return Integer.parseInt(strKey.substring(strKey.indexOf("_part_") +
"_part_".length()));
+ return Integer.parseInt(toStringWithoutPrefix(key,
DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX_BYTES.length));
}
/**
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
index ce55d8ebf6..612a00c142 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
@@ -30,7 +30,7 @@ import static
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_DROP;
import static
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl.LOGICAL_TOPOLOGY_KEY;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesFromManager;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage;
-import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX_BYTES;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeDataNodesMap;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeLogicalTopologySet;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
@@ -1477,7 +1477,7 @@ public class DistributionZoneCausalityDataNodesTest
extends BaseDistributionZone
if (startsWith(e.key(), zoneDataNodesKey().bytes())) {
revision = e.revision();
- zoneId = extractZoneId(e.key(),
DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX);
+ zoneId = extractZoneId(e.key(),
DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX_BYTES);
byte[] dataNodesBytes = e.value();
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index f67285246c..6999bf56fb 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -31,11 +31,10 @@ import static
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_CREATE
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.subtract;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.union;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceRaftGroupEventsListener.handleReduceChanged;
-import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX;
-import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.PENDING_ASSIGNMENTS_PREFIX;
-import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
-import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.extractPartitionNumber;
-import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.extractZoneId;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.PENDING_ASSIGNMENTS_PREFIX_BYTES;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.extractZonePartitionId;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingPartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zoneAssignmentsGetLocally;
@@ -266,9 +265,9 @@ public class PartitionReplicaLifecycleManager extends
CompletableFuture<Void> processZonesAndAssignmentsOnStart =
processZonesOnStart(recoveryRevision, lowWatermark.getLowWatermark())
.thenCompose(ignored ->
processAssignmentsOnRecovery(recoveryRevision));
-
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
pendingAssignmentsRebalanceListener);
-
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
stableAssignmentsRebalanceListener);
-
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX),
assignmentsSwitchRebalanceListener);
+ metaStorageMgr.registerPrefixWatch(new
ByteArray(PENDING_ASSIGNMENTS_PREFIX_BYTES),
pendingAssignmentsRebalanceListener);
+ metaStorageMgr.registerPrefixWatch(new
ByteArray(STABLE_ASSIGNMENTS_PREFIX_BYTES), stableAssignmentsRebalanceListener);
+ metaStorageMgr.registerPrefixWatch(new
ByteArray(ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES),
assignmentsSwitchRebalanceListener);
catalogMgr.listen(ZONE_CREATE,
(CreateZoneEventParameters parameters) ->
@@ -317,8 +316,8 @@ public class PartitionReplicaLifecycleManager extends
}
private CompletableFuture<Void> processAssignmentsOnRecovery(long
recoveryRevision) {
- var stableAssignmentsPrefix = new ByteArray(STABLE_ASSIGNMENTS_PREFIX);
- var pendingAssignmentsPrefix = new
ByteArray(PENDING_ASSIGNMENTS_PREFIX);
+ var stableAssignmentsPrefix = new
ByteArray(STABLE_ASSIGNMENTS_PREFIX_BYTES);
+ var pendingAssignmentsPrefix = new
ByteArray(PENDING_ASSIGNMENTS_PREFIX_BYTES);
// It's required to handle stable assignments changes on recovery in
order to cleanup obsolete resources.
CompletableFuture<Void> stableFuture = handleAssignmentsOnRecovery(
@@ -761,10 +760,7 @@ public class PartitionReplicaLifecycleManager extends
return inBusyLockAsync(busyLock, () -> {
byte[] key = evt.entryEvent().newEntry().key();
- int partitionId = extractPartitionNumber(key);
- int zoneId = extractZoneId(key,
ASSIGNMENTS_SWITCH_REDUCE_PREFIX);
-
- ZonePartitionId replicaGrpId = new ZonePartitionId(zoneId,
partitionId);
+ ZonePartitionId replicaGrpId = extractZonePartitionId(key,
ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES);
Assignments assignments =
Assignments.fromBytes(evt.entryEvent().newEntry().value());
@@ -773,11 +769,11 @@ public class PartitionReplicaLifecycleManager extends
return
waitForMetadataCompleteness(assignmentsTimestamp).thenCompose(unused ->
inBusyLockAsync(busyLock, () -> {
int catalogVersion =
catalogMgr.activeCatalogVersion(assignmentsTimestamp);
- CatalogZoneDescriptor zoneDescriptor =
catalogMgr.zone(zoneId, catalogVersion);
+ CatalogZoneDescriptor zoneDescriptor =
catalogMgr.zone(replicaGrpId.zoneId(), catalogVersion);
long causalityToken = zoneDescriptor.updateToken();
- return distributionZoneMgr.dataNodes(causalityToken,
catalogVersion, zoneId)
+ return distributionZoneMgr.dataNodes(causalityToken,
catalogVersion, replicaGrpId.zoneId())
.thenCompose(dataNodes -> handleReduceChanged(
metaStorageMgr,
dataNodes,
@@ -842,10 +838,7 @@ public class PartitionReplicaLifecycleManager extends
long revision,
boolean isRecovery
) {
- int partitionId =
extractPartitionNumber(stableAssignmentsWatchEvent.key());
- int zoneId = extractZoneId(stableAssignmentsWatchEvent.key(),
STABLE_ASSIGNMENTS_PREFIX);
-
- ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId,
partitionId);
+ ZonePartitionId zonePartitionId =
extractZonePartitionId(stableAssignmentsWatchEvent.key(),
STABLE_ASSIGNMENTS_PREFIX_BYTES);
Set<Assignment> stableAssignments =
stableAssignmentsWatchEvent.value() == null
? emptySet()
@@ -929,10 +922,7 @@ public class PartitionReplicaLifecycleManager extends
return nullCompletedFuture();
}
- int partId = extractPartitionNumber(pendingAssignmentsEntry.key());
- int zoneId = extractZoneId(pendingAssignmentsEntry.key(),
PENDING_ASSIGNMENTS_PREFIX);
-
- var zonePartitionId = new ZonePartitionId(zoneId, partId);
+ ZonePartitionId zonePartitionId =
extractZonePartitionId(pendingAssignmentsEntry.key(),
PENDING_ASSIGNMENTS_PREFIX_BYTES);
// Stable assignments from the meta store, which revision is bounded
by the current pending event.
Assignments stableAssignments = stableAssignments(zonePartitionId,
revision);
@@ -947,9 +937,16 @@ public class PartitionReplicaLifecycleManager extends
if (LOG.isInfoEnabled()) {
var stringKey = new String(pendingAssignmentsEntry.key(),
UTF_8);
- LOG.info("Received update on pending assignments. Check if new
replication node should be started [key={}, "
+ LOG.info(
+ "Received update on pending assignments. Check if new
replication node should be started [key={}, "
+ "partition={}, zoneId={},
localMemberAddress={}, pendingAssignments={}, revision={}]",
- stringKey, partId, zoneId, localNode().address(),
pendingAssignments, revision);
+ stringKey,
+ zonePartitionId.partitionId(),
+ zonePartitionId.zoneId(),
+ localNode().address(),
+ pendingAssignments,
+ revision
+ );
}
return handleChangePendingAssignmentEvent(
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index 54f2941485..635987258b 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -17,13 +17,14 @@
package org.apache.ignite.internal.placementdriver;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.PENDING_ASSIGNMENTS_PREFIX;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.PENDING_ASSIGNMENTS_PREFIX_BYTES;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTablePartitionId;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.StringUtils.incrementLastChar;
-import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -45,7 +46,6 @@ import
org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
import
org.apache.ignite.internal.partitiondistribution.TokenizedAssignmentsImpl;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -96,12 +96,12 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
* Restores assignments from Vault and subscribes to further updates.
*/
public void startTrack() {
-
msManager.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
pendingAssignmentsListener);
-
msManager.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
stableAssignmentsListener);
+ msManager.registerPrefixWatch(new
ByteArray(PENDING_ASSIGNMENTS_PREFIX_BYTES), pendingAssignmentsListener);
+ msManager.registerPrefixWatch(new
ByteArray(STABLE_ASSIGNMENTS_PREFIX_BYTES), stableAssignmentsListener);
msManager.recoveryFinishedFuture().thenAccept(recoveryRevisions -> {
- handleRecoveryAssignments(recoveryRevisions,
PENDING_ASSIGNMENTS_PREFIX, groupPendingAssignments);
- handleRecoveryAssignments(recoveryRevisions,
STABLE_ASSIGNMENTS_PREFIX, groupStableAssignments);
+ handleRecoveryAssignments(recoveryRevisions,
PENDING_ASSIGNMENTS_PREFIX_BYTES, groupPendingAssignments);
+ handleRecoveryAssignments(recoveryRevisions,
STABLE_ASSIGNMENTS_PREFIX_BYTES, groupStableAssignments);
}).whenComplete((res, ex) -> {
if (ex != null) {
LOG.error("Cannot do recovery", ex);
@@ -166,7 +166,7 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
LOG.debug("Stable assignments update [revision={},
keys={}]", event.revision(), collectKeysFromEventAsString(event));
}
- handleReceivedAssignments(event, STABLE_ASSIGNMENTS_PREFIX,
groupStableAssignments);
+ handleReceivedAssignments(event,
STABLE_ASSIGNMENTS_PREFIX_BYTES, groupStableAssignments);
return nullCompletedFuture();
}
@@ -185,7 +185,7 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
LOG.debug("Pending assignments update [revision={},
keys={}]", event.revision(), collectKeysFromEventAsString(event));
}
- handleReceivedAssignments(event, PENDING_ASSIGNMENTS_PREFIX,
groupPendingAssignments);
+ handleReceivedAssignments(event,
PENDING_ASSIGNMENTS_PREFIX_BYTES, groupPendingAssignments);
return nullCompletedFuture();
}
@@ -198,13 +198,13 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
private static void handleReceivedAssignments(
WatchEvent event,
- String assignmentsMetastoreKeyPrefix,
+ byte[] assignmentsMetastoreKeyPrefix,
Map<ReplicationGroupId, TokenizedAssignments>
groupIdToAssignmentsMap
) {
for (EntryEvent evt : event.entryEvents()) {
Entry entry = evt.newEntry();
- ReplicationGroupId grpId =
extractGroupFromEventEntryByPrefix(entry, assignmentsMetastoreKeyPrefix);
+ ReplicationGroupId grpId = extractTablePartitionId(entry.key(),
assignmentsMetastoreKeyPrefix);
if (entry.tombstone()) {
groupIdToAssignmentsMap.remove(grpId);
@@ -216,11 +216,12 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
private void handleRecoveryAssignments(
Revisions recoveryRevisions,
- String assignmentsMetastoreKeyPrefix,
+ byte[] assignmentsMetastoreKeyPrefix,
Map<ReplicationGroupId, TokenizedAssignments>
groupIdToAssignmentsMap
) {
- ByteArray startKey =
ByteArray.fromString(assignmentsMetastoreKeyPrefix);
- ByteArray endKey =
ByteArray.fromString(incrementLastChar(assignmentsMetastoreKeyPrefix));
+ var startKey = new ByteArray(assignmentsMetastoreKeyPrefix);
+ // FIXME: Remove intermediate string conversion, see
https://issues.apache.org/jira/browse/IGNITE-23771
+ ByteArray endKey = ByteArray.fromString(incrementLastChar(new
String(assignmentsMetastoreKeyPrefix, UTF_8)));
long revision = recoveryRevisions.revision();
try (Cursor<Entry> cursor = msManager.getLocally(startKey, endKey,
revision)) {
@@ -229,7 +230,7 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
continue;
}
- ReplicationGroupId grpId =
extractGroupFromEventEntryByPrefix(entry, assignmentsMetastoreKeyPrefix);
+ ReplicationGroupId grpId =
extractTablePartitionId(entry.key(), assignmentsMetastoreKeyPrefix);
updateGroupAssignments(groupIdToAssignmentsMap, grpId, entry);
}
@@ -251,15 +252,6 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
groupIdToAssignmentsMap.put(grpId, new
TokenizedAssignmentsImpl(assignmentNodes, entry.revision()));
}
- private static ReplicationGroupId extractGroupFromEventEntryByPrefix(
- Entry entry, String assignmentsMetastoreKeyPrefix
- ) {
- var replicationGroupIdString = new String(entry.key(),
StandardCharsets.UTF_8)
- .replace(assignmentsMetastoreKeyPrefix, "");
-
- return TablePartitionId.fromString(replicationGroupIdString);
- }
-
private static String collectKeysFromEventAsString(WatchEvent event) {
return event.entryEvents().stream()
.map(e -> new ByteArray(e.newEntry().key()).toString())
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
index d7b790d217..23e38efd69 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
@@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+import static
org.apache.ignite.internal.util.StringUtils.toStringWithoutPrefix;
import java.io.Serializable;
import java.util.ArrayList;
@@ -210,11 +211,7 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
continue;
}
- int startIdx = DST_KEYS_START_RANGE.length();
-
- int keyLengthWithoutPrefix = key.length - startIdx;
-
- var dataKey = new String(key, startIdx,
keyLengthWithoutPrefix, UTF_8);
+ String dataKey = toStringWithoutPrefix(key,
DST_KEYS_START_RANGE.length());
data.put(dataKey,
ConfigurationSerializationUtil.fromBytes(value));
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 613da6537e..13a742f77b 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -28,9 +28,8 @@ import static
org.apache.ignite.internal.configuration.IgnitePaths.cmgPath;
import static
org.apache.ignite.internal.configuration.IgnitePaths.metastoragePath;
import static
org.apache.ignite.internal.configuration.IgnitePaths.partitionsPath;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractPartitionNumber;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTableId;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTablePartitionId;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
@@ -1608,10 +1607,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
return null;
}
- int partitionId =
extractPartitionNumber(stableAssignmentsWatchEvent.key());
- int tableId = extractTableId(stableAssignmentsWatchEvent.key(),
STABLE_ASSIGNMENTS_PREFIX);
-
- return new TablePartitionId(tableId, partitionId);
+ return extractTablePartitionId(stableAssignmentsWatchEvent.key(),
STABLE_ASSIGNMENTS_PREFIX_BYTES);
}
TablePartitionId getTablePartitionId(String tableName, int
partitionId) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 1d54de6244..5dba90e2f1 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -30,11 +30,10 @@ import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.PENDING_ASSIGNMENTS_PREFIX;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractPartitionNumber;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTableId;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.PENDING_ASSIGNMENTS_PREFIX_BYTES;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTablePartitionId;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignmentsGetLocally;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
@@ -618,9 +617,9 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
processAssignmentsOnRecovery(recoveryRevision);
-
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
pendingAssignmentsRebalanceListener);
-
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
stableAssignmentsRebalanceListener);
-
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX),
assignmentsSwitchRebalanceListener);
+ metaStorageMgr.registerPrefixWatch(new
ByteArray(PENDING_ASSIGNMENTS_PREFIX_BYTES),
pendingAssignmentsRebalanceListener);
+ metaStorageMgr.registerPrefixWatch(new
ByteArray(STABLE_ASSIGNMENTS_PREFIX_BYTES), stableAssignmentsRebalanceListener);
+ metaStorageMgr.registerPrefixWatch(new
ByteArray(ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES),
assignmentsSwitchRebalanceListener);
catalogService.listen(CatalogEvent.TABLE_CREATE, parameters ->
onTableCreate((CreateTableEventParameters) parameters));
catalogService.listen(CatalogEvent.TABLE_CREATE, parameters ->
@@ -874,8 +873,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
}
private void processAssignmentsOnRecovery(long recoveryRevision) {
- var stableAssignmentsPrefix = new ByteArray(STABLE_ASSIGNMENTS_PREFIX);
- var pendingAssignmentsPrefix = new
ByteArray(PENDING_ASSIGNMENTS_PREFIX);
+ var stableAssignmentsPrefix = new
ByteArray(STABLE_ASSIGNMENTS_PREFIX_BYTES);
+ var pendingAssignmentsPrefix = new
ByteArray(PENDING_ASSIGNMENTS_PREFIX_BYTES);
startVv.update(recoveryRevision, (v, e) -> handleAssignmentsOnRecovery(
stableAssignmentsPrefix,
@@ -2052,10 +2051,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return nullCompletedFuture();
}
- int partId = extractPartitionNumber(pendingAssignmentsEntry.key());
- int tblId = extractTableId(pendingAssignmentsEntry.key(),
PENDING_ASSIGNMENTS_PREFIX);
-
- var replicaGrpId = new TablePartitionId(tblId, partId);
+ TablePartitionId replicaGrpId =
extractTablePartitionId(pendingAssignmentsEntry.key(),
PENDING_ASSIGNMENTS_PREFIX_BYTES);
// Stable assignments from the meta store, which revision is bounded
by the current pending event.
Assignments stableAssignments = stableAssignments(replicaGrpId,
revision);
@@ -2069,13 +2065,13 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
try {
- TableImpl table = tables.get(tblId);
+ TableImpl table = tables.get(replicaGrpId.tableId());
// Table can be null only recovery, because we use a
revision from the future. See comment inside
// performRebalanceOnRecovery.
if (table == null) {
if (LOG.isInfoEnabled()) {
- LOG.info("Skipping Pending Assignments update,
because table {} does not exist", tblId);
+ LOG.info("Skipping Pending Assignments update,
because table {} does not exist", replicaGrpId.tableId());
}
return
CompletableFutures.<Void>nullCompletedFuture();
@@ -2084,9 +2080,16 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
if (LOG.isInfoEnabled()) {
var stringKey = new
String(pendingAssignmentsEntry.key(), UTF_8);
- LOG.info("Received update on pending assignments.
Check if new raft group should be started [key={}, "
+ LOG.info(
+ "Received update on pending assignments.
Check if new raft group should be started [key={}, "
+ "partition={}, table={},
localMemberAddress={}, pendingAssignments={}, revision={}]",
- stringKey, partId, table.name(),
localNode().address(), pendingAssignments, revision);
+ stringKey,
+ replicaGrpId.partitionId(),
+ table.name(),
+ localNode().address(),
+ pendingAssignments,
+ revision
+ );
}
long assignmentsTimestamp =
pendingAssignments.timestamp();
@@ -2392,10 +2395,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return inBusyLockAsync(busyLock, () -> {
byte[] key = evt.entryEvent().newEntry().key();
- int partitionId = extractPartitionNumber(key);
- int tableId = extractTableId(key,
ASSIGNMENTS_SWITCH_REDUCE_PREFIX);
-
- TablePartitionId replicaGrpId = new
TablePartitionId(tableId, partitionId);
+ TablePartitionId replicaGrpId =
extractTablePartitionId(key, ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES);
return tablesById(evt.revision())
.thenCompose(tables -> inBusyLockAsync(busyLock,
() -> {
@@ -2407,7 +2407,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
.thenCompose(unused ->
inBusyLockAsync(busyLock, () -> {
int catalogVersion =
catalogService.activeCatalogVersion(assignmentsTimestamp);
- CatalogTableDescriptor
tableDescriptor = getTableDescriptor(tableId, catalogVersion);
+ CatalogTableDescriptor
tableDescriptor =
+
getTableDescriptor(replicaGrpId.tableId(), catalogVersion);
CatalogZoneDescriptor
zoneDescriptor = getZoneDescriptor(tableDescriptor, catalogVersion);
@@ -2527,10 +2528,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
long revision,
boolean isRecovery
) {
- int partitionId =
extractPartitionNumber(stableAssignmentsWatchEvent.key());
- int tableId = extractTableId(stableAssignmentsWatchEvent.key(),
STABLE_ASSIGNMENTS_PREFIX);
-
- TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partitionId);
+ TablePartitionId tablePartitionId =
extractTablePartitionId(stableAssignmentsWatchEvent.key(),
STABLE_ASSIGNMENTS_PREFIX_BYTES);
Set<Assignment> stableAssignments =
stableAssignmentsWatchEvent.value() == null
? emptySet()