rondagostino commented on code in PR #14290: URL: https://github.com/apache/kafka/pull/14290#discussion_r1376334060
########## clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java: ########## @@ -19,7 +19,7 @@ /** * An exception that may indicate the client's metadata is out of date */ -public abstract class InvalidMetadataException extends RetriableException { +public class InvalidMetadataException extends RetriableException { Review Comment: Not sure why this is marked `abstract`. It's been like this practically since the inception of the project. I guess the intent was to never throw this specifically but to instead always throw a derived class. But then why are the constructors `public` instead of `protected`? I see no harm in this change. The other possibility is to make the constructors protected, of course -- or to leave it alone, though that keeps the inconsistency. Maybe @cmccabe has more historical context on this and can comment? ########## server-common/src/main/java/org/apache/kafka/common/DirectoryId.java: ########## @@ -16,55 +16,166 @@ */ package org.apache.kafka.common; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Set; -public class DirectoryId { +public class DirectoryId extends Uuid { /** - * A UUID that is used to identify new or unknown dir assignments. + * A DirectoryId that is used to identify new or unknown dir assignments. */ - public static final Uuid UNASSIGNED = new Uuid(0L, 0L); + public static final DirectoryId UNASSIGNED = new DirectoryId(0L, 0L); /** - * A UUID that is used to represent unspecified offline dirs. + * A DirectoryId that is used to represent unspecified offline dirs. */ - public static final Uuid LOST = new Uuid(0L, 1L); + public static final DirectoryId LOST = new DirectoryId(0L, 1L); /** - * A UUID that is used to represent and unspecified log directory, + * A DirectoryId that is used to represent and unspecified log directory, * that is expected to have been previously selected to host an * associated replica. This contrasts with {@code UNASSIGNED_DIR}, * which is associated with (typically new) replicas that may not * yet have been placed in any log directory. */ - public static final Uuid MIGRATING = new Uuid(0L, 2L); + public static final DirectoryId MIGRATING = new DirectoryId(0L, 2L); /** * The set of reserved UUIDs that will never be returned by the random method. */ - public static final Set<Uuid> RESERVED; + public static final Set<DirectoryId> RESERVED; static { - HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED); - // The first 100 UUIDs are reserved for future use. + HashSet<DirectoryId> reserved = new HashSet<>(); + // The first 100 DirectoryIds are reserved for future use. for (long i = 0L; i < 100L; i++) { - reserved.add(new Uuid(0L, i)); + reserved.add(new DirectoryId(0L, i)); } RESERVED = Collections.unmodifiableSet(reserved); } + /** + * Constructs a Directory ID from the underlying 128 bits, + * exactly as a {@link Uuid} is constructed. + */ + private DirectoryId(long mostSigBits, long leastSigBits) { + super(mostSigBits, leastSigBits); + } + + /** + * Creates a DirectoryId based on a base64 string encoding used in the toString() method. + */ + public static DirectoryId fromString(String str) { + return DirectoryId.fromUuid(Uuid.fromString(str)); + } + + /** + * Creates a DirectoryId based on a {@link Uuid}. 
+ */ + public static DirectoryId fromUuid(Uuid uuid) { + return new DirectoryId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + } + /** * Static factory to generate a directory ID. * * This will not generate a reserved UUID (first 100), or one whose string representation starts with a dash ("-") */ - public static Uuid random() { + public static DirectoryId random() { Uuid uuid = Uuid.randomUuid(); while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) { uuid = Uuid.randomUuid(); } - return uuid; + return DirectoryId.fromUuid(uuid); + } + + /** + * Convert a list of Uuid to an array of DirectoryId. + * + * @param list The input list + * @return The output array + */ + public static DirectoryId[] toArray(List<Uuid> list) { + if (list == null) return null; + DirectoryId[] array = new DirectoryId[list.size()]; + for (int i = 0; i < list.size(); i++) { + array[i] = DirectoryId.fromUuid(list.get(i)); + } + return array; + } + + /** + * Convert an array of DirectoryIds to a list of Uuid. + * + * @param array The input array + * @return The output list + */ + public static List<Uuid> toList(DirectoryId[] array) { + if (array == null) return null; + List<Uuid> list = new ArrayList<>(array.length); + list.addAll(Arrays.asList(array)); + return list; + } + + /** + * Calculate the new directory information based on an existing replica assignment. + * Replicas for which there already is a directory ID keep the same directory. + * All other replicas get {@link #UNASSIGNED}. + * @param currentReplicas The current replicas, represented by the broker IDs + * @param currentDirectories The current directory information + * @param newReplicas The new replica list + * @return The new directory list + * @throws IllegalArgumentException If currentReplicas and currentDirectories have different lengths, + * or if there are duplicate broker IDs in the replica lists + */ + public static List<Uuid> update(int[] currentReplicas, DirectoryId[] currentDirectories, List<Integer> newReplicas) { + if (currentReplicas == null) currentReplicas = new int[0]; + if (currentDirectories == null) currentDirectories = new DirectoryId[0]; + Map<Integer, Uuid> assignments = buildAssignmentMap(currentReplicas, currentDirectories); + List<Uuid> consolidated = new ArrayList<>(newReplicas.size()); + for (int i = 0; i < newReplicas.size(); i++) { + int newReplica = newReplicas.get(i); Review Comment: Best to do `for (int newReplica: newReplicas) {` in case `newReplicas` is something like a linked list that is O(n) for random access. ########## server-common/src/main/java/org/apache/kafka/common/DirectoryId.java: ########## @@ -16,55 +16,166 @@ */ package org.apache.kafka.common; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Set; -public class DirectoryId { +public class DirectoryId extends Uuid { /** - * A UUID that is used to identify new or unknown dir assignments. + * A DirectoryId that is used to identify new or unknown dir assignments. */ - public static final Uuid UNASSIGNED = new Uuid(0L, 0L); + public static final DirectoryId UNASSIGNED = new DirectoryId(0L, 0L); /** - * A UUID that is used to represent unspecified offline dirs. + * A DirectoryId that is used to represent unspecified offline dirs. 
*/ - public static final Uuid LOST = new Uuid(0L, 1L); + public static final DirectoryId LOST = new DirectoryId(0L, 1L); /** - * A UUID that is used to represent and unspecified log directory, + * A DirectoryId that is used to represent and unspecified log directory, * that is expected to have been previously selected to host an * associated replica. This contrasts with {@code UNASSIGNED_DIR}, * which is associated with (typically new) replicas that may not * yet have been placed in any log directory. */ - public static final Uuid MIGRATING = new Uuid(0L, 2L); + public static final DirectoryId MIGRATING = new DirectoryId(0L, 2L); /** * The set of reserved UUIDs that will never be returned by the random method. */ - public static final Set<Uuid> RESERVED; + public static final Set<DirectoryId> RESERVED; static { - HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED); - // The first 100 UUIDs are reserved for future use. + HashSet<DirectoryId> reserved = new HashSet<>(); + // The first 100 DirectoryIds are reserved for future use. for (long i = 0L; i < 100L; i++) { - reserved.add(new Uuid(0L, i)); + reserved.add(new DirectoryId(0L, i)); } RESERVED = Collections.unmodifiableSet(reserved); } + /** + * Constructs a Directory ID from the underlying 128 bits, + * exactly as a {@link Uuid} is constructed. + */ + private DirectoryId(long mostSigBits, long leastSigBits) { + super(mostSigBits, leastSigBits); + } + + /** + * Creates a DirectoryId based on a base64 string encoding used in the toString() method. + */ + public static DirectoryId fromString(String str) { + return DirectoryId.fromUuid(Uuid.fromString(str)); + } + + /** + * Creates a DirectoryId based on a {@link Uuid}. + */ + public static DirectoryId fromUuid(Uuid uuid) { + return new DirectoryId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + } Review Comment: Noting that while there is no direct test to ensure that the most/least significant bits are copied correctly, this is tested indirectly via existing tests. ########## metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java: ########## @@ -310,11 +337,22 @@ public ApiMessageAndVersion toRecord(Uuid topicId, int partitionId, short versio setLeaderRecoveryState(leaderRecoveryState.value()). setLeaderEpoch(leaderEpoch). setPartitionEpoch(partitionEpoch); - if (version > 0) { + if (options.metadataVersion().isElrSupported()) { record.setEligibleLeaderReplicas(Replicas.toList(elr)). setLastKnownELR(Replicas.toList(lastKnownElr)); } - return new ApiMessageAndVersion(record, version); + if (options.metadataVersion().isDirectoryAssignmentSupported()) { + record.setDirectories(DirectoryId.toList(directories)); + } else { + for (int i = 0; i < directories.length; i++) { Review Comment: Wondering if this is a `NullPointerException` waiting to happen since the current version of the PR allows `directories` to be null in some cases as mentioned above?
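Purely for illustration (none of this is code from the PR), a minimal sketch of the null-safe shape that loop could take; the counting logic and the class/method names are made up, the point is the guard on the first line of the method:

```java
import org.apache.kafka.common.DirectoryId;

// Hedged sketch, not the PR's toRecord() logic: normalize a possibly-null
// `directories` array before iterating, so the quoted loop cannot throw.
final class NullSafeDirectoriesSketch {

    // Counts replicas that already have a real (non-reserved) directory ID.
    // The counting itself is a placeholder for whatever the real else-branch does.
    static int countAssignedDirectories(DirectoryId[] directories) {
        DirectoryId[] dirs = (directories != null) ? directories : new DirectoryId[0];
        int assigned = 0;
        for (DirectoryId dir : dirs) {
            if (!DirectoryId.RESERVED.contains(dir)) {
                assigned++;
            }
        }
        return assigned;
    }
}
```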
########## metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java: ########## @@ -100,8 +115,15 @@ public void testChangeRecordIsNoOp() { private final static Uuid FOO_ID = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); + private static MetadataVersion metadataVersionForDirAssignmentInfo() { + MetadataVersion metadataVersion = Mockito.spy(MetadataVersion.latest()); + when(metadataVersion.isDirectoryAssignmentSupported()).thenReturn(true); + return metadataVersion; + } Review Comment: Can we add a `// TODO` to indicate that this is temporary and we should put the real MetadataVersion in here once we have it? ########## server-common/src/main/java/org/apache/kafka/common/DirectoryId.java: ########## @@ -16,55 +16,166 @@ */ package org.apache.kafka.common; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Set; -public class DirectoryId { +public class DirectoryId extends Uuid { /** - * A UUID that is used to identify new or unknown dir assignments. + * A DirectoryId that is used to identify new or unknown dir assignments. */ - public static final Uuid UNASSIGNED = new Uuid(0L, 0L); + public static final DirectoryId UNASSIGNED = new DirectoryId(0L, 0L); /** - * A UUID that is used to represent unspecified offline dirs. + * A DirectoryId that is used to represent unspecified offline dirs. */ - public static final Uuid LOST = new Uuid(0L, 1L); + public static final DirectoryId LOST = new DirectoryId(0L, 1L); /** - * A UUID that is used to represent and unspecified log directory, + * A DirectoryId that is used to represent and unspecified log directory, * that is expected to have been previously selected to host an * associated replica. This contrasts with {@code UNASSIGNED_DIR}, * which is associated with (typically new) replicas that may not * yet have been placed in any log directory. */ - public static final Uuid MIGRATING = new Uuid(0L, 2L); + public static final DirectoryId MIGRATING = new DirectoryId(0L, 2L); /** * The set of reserved UUIDs that will never be returned by the random method. */ - public static final Set<Uuid> RESERVED; + public static final Set<DirectoryId> RESERVED; static { - HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED); - // The first 100 UUIDs are reserved for future use. + HashSet<DirectoryId> reserved = new HashSet<>(); + // The first 100 DirectoryIds are reserved for future use. for (long i = 0L; i < 100L; i++) { - reserved.add(new Uuid(0L, i)); + reserved.add(new DirectoryId(0L, i)); } RESERVED = Collections.unmodifiableSet(reserved); } + /** + * Constructs a Directory ID from the underlying 128 bits, + * exactly as a {@link Uuid} is constructed. + */ + private DirectoryId(long mostSigBits, long leastSigBits) { + super(mostSigBits, leastSigBits); + } + + /** + * Creates a DirectoryId based on a base64 string encoding used in the toString() method. + */ + public static DirectoryId fromString(String str) { + return DirectoryId.fromUuid(Uuid.fromString(str)); + } + + /** + * Creates a DirectoryId based on a {@link Uuid}. + */ + public static DirectoryId fromUuid(Uuid uuid) { + return new DirectoryId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + } + /** * Static factory to generate a directory ID. 
* * This will not generate a reserved UUID (first 100), or one whose string representation starts with a dash ("-") */ - public static Uuid random() { + public static DirectoryId random() { Uuid uuid = Uuid.randomUuid(); while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) { uuid = Uuid.randomUuid(); } - return uuid; + return DirectoryId.fromUuid(uuid); + } + + /** + * Convert a list of Uuid to an array of DirectoryId. + * + * @param list The input list + * @return The output array + */ + public static DirectoryId[] toArray(List<Uuid> list) { + if (list == null) return null; + DirectoryId[] array = new DirectoryId[list.size()]; + for (int i = 0; i < list.size(); i++) { + array[i] = DirectoryId.fromUuid(list.get(i)); + } + return array; + } + + /** + * Convert an array of DirectoryIds to a list of Uuid. + * + * @param array The input array + * @return The output list + */ + public static List<Uuid> toList(DirectoryId[] array) { + if (array == null) return null; + List<Uuid> list = new ArrayList<>(array.length); + list.addAll(Arrays.asList(array)); + return list; + } + + /** + * Calculate the new directory information based on an existing replica assignment. + * Replicas for which there already is a directory ID keep the same directory. + * All other replicas get {@link #UNASSIGNED}. + * @param currentReplicas The current replicas, represented by the broker IDs + * @param currentDirectories The current directory information + * @param newReplicas The new replica list + * @return The new directory list + * @throws IllegalArgumentException If currentReplicas and currentDirectories have different lengths, + * or if there are duplicate broker IDs in the replica lists + */ + public static List<Uuid> update(int[] currentReplicas, DirectoryId[] currentDirectories, List<Integer> newReplicas) { Review Comment: Can we have a better name than `update()` since this method is returning something new as opposed to updating something? Maybe `createDirectoriesFrom()` or similar? ########## metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java: ########## @@ -49,12 +51,15 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.params.provider.Arguments.arguments; +import static org.mockito.Mockito.when; @Timeout(value = 40) public class PartitionChangeBuilderTest { private static Stream<Arguments> partitionChangeRecordVersions() { - return IntStream.range(PartitionChangeRecord.LOWEST_SUPPORTED_VERSION, PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION + 1).mapToObj(version -> Arguments.of((short) version)); + return IntStream.range(PartitionChangeRecord.LOWEST_SUPPORTED_VERSION, PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION + 1) + .filter(v -> v != PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION) // TODO test latest record version in KAFKA-15514 Review Comment: I believe we are going to close the current PR associated with KAFKA-15514, https://github.com/apache/kafka/pull/14516, which I assume means we should remove this filter and make the tests pass in this PR. ########## metadata/src/main/resources/common/metadata/PartitionRecord.json: ########## @@ -47,6 +47,8 @@ "about": "The eligible leader replicas of this partition." }, { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId", "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 2, - "about": "The last known eligible leader replicas of this partition." 
} + "about": "The last known eligible leader replicas of this partition." }, + { "name": "Directories", "type": "[]uuid", "versions": "2+", + "about": "The log directory hosting each replica, sorted in the same exact order as the Replicas field."} Review Comment: Not sure, but wondering if this should be a tagged field like the ELR fields. I don't see any specific advantage to doing so -- we won't ever use this version unless all nodes in the cluster support it, in which case making it mandatory actually makes sense -- but figured I would mention it in case anybody had additional thoughts or context around the ELR choice to tag the new fields in version 1. ########## metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java: ########## @@ -122,8 +132,13 @@ public PartitionRegistration build() { throw new IllegalStateException("You must set last known elr."); } + if (directories == null) { + directories = DirectoryId.unassignedArray(replicas.length); + } Review Comment: This makes sense to me -- set them as unassigned when they aren't specified. Note that this could be the case due to the feature not being enabled. However, I suspect we should do the same thing in `public PartitionRegistration(PartitionRecord record)`, otherwise sometimes we can have `PartitionRegistration` records with non-null but "unassigned" directory IDs and sometimes we can have null. It feels to me that it would be easier to reason about if we always have something there.
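A rough sketch of the normalization being suggested, factored as a standalone helper so it is self-contained; `DirectoryId.toArray` appears in this PR's diff, `DirectoryId.unassignedArray` is inferred from the builder snippet quoted above, and the helper itself and its parameters are hypothetical:

```java
import java.util.List;

import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.Uuid;

// Hedged sketch, not the PR's code: how the record-based constructor could
// default a missing Directories field, mirroring what build() does above,
// so `directories` is never null downstream.
final class DirectoryDefaultingSketch {

    // recordDirectories: the record's Directories field, which may be null
    // (for example when the record was written at an older version).
    // replicaCount: the number of replicas in the same record.
    static DirectoryId[] directoriesFromRecord(List<Uuid> recordDirectories, int replicaCount) {
        return (recordDirectories == null)
                ? DirectoryId.unassignedArray(replicaCount) // same default the builder uses
                : DirectoryId.toArray(recordDirectories);
    }
}
```

If the defaulting lives in one place like this, the `toRecord()` null concern above also goes away, since `directories` could then never be null by the time it is serialized.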