rondagostino commented on code in PR #14290:
URL: https://github.com/apache/kafka/pull/14290#discussion_r1376334060


##########
clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java:
##########
@@ -19,7 +19,7 @@
 /**
  * An exception that may indicate the client's metadata is out of date
  */
-public abstract class InvalidMetadataException extends RetriableException {
+public class InvalidMetadataException extends RetriableException {

Review Comment:
   Not sure why this is marked `abstract`.  It's been like this practically 
since the inception of the project.  I guess the intent was to never throw this 
specifically but to instead always throw a derived class.  But then why are the 
constructors `public` instead of `protected`?
   
   I see no harm in this change.  The other possibility is to make the 
constructors protected, of course -- or to leave it alone, though that keeps 
the inconsistency.
   
   Maybe @cmccabe has more historical context on this and can comment?   
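
   To make the inconsistency concrete, here is a minimal sketch of the 
alternative (keep the class `abstract`, make the constructors `protected`). 
`BaseMetadataError` and `StaleLeaderError` are hypothetical stand-ins, not the 
actual Kafka classes.

```java
import java.lang.reflect.Modifier;

// Hypothetical stand-ins; these are not Kafka's actual exception classes.
final class AbstractBaseSketch {

    // Abstract base: callers must throw a concrete subclass.
    abstract static class BaseMetadataError extends RuntimeException {
        // protected matches the intent: only subclasses can call this anyway.
        protected BaseMetadataError(String message) {
            super(message);
        }
    }

    // Concrete subclass that callers actually throw.
    static class StaleLeaderError extends BaseMetadataError {
        public StaleLeaderError(String message) {
            super(message);
        }
    }

    public static void main(String[] args) {
        // "new BaseMetadataError(...)" would not compile: the class is abstract.
        RuntimeException e = new StaleLeaderError("leader moved");
        System.out.println(e instanceof BaseMetadataError);
        System.out.println(Modifier.isAbstract(BaseMetadataError.class.getModifiers()));
    }
}
```

   Either way the base type cannot be instantiated directly, so a `public` 
constructor on an `abstract` class only widens the declared visibility without 
enabling anything new.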



##########
server-common/src/main/java/org/apache/kafka/common/DirectoryId.java:
##########
@@ -16,55 +16,166 @@
  */
 package org.apache.kafka.common;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
-public class DirectoryId {
+public class DirectoryId extends Uuid {
 
     /**
-     * A UUID that is used to identify new or unknown dir assignments.
+     * A DirectoryId that is used to identify new or unknown dir assignments.
      */
-    public static final Uuid UNASSIGNED = new Uuid(0L, 0L);
+    public static final DirectoryId UNASSIGNED = new DirectoryId(0L, 0L);
 
     /**
-     * A UUID that is used to represent unspecified offline dirs.
+     * A DirectoryId that is used to represent unspecified offline dirs.
      */
-    public static final Uuid LOST = new Uuid(0L, 1L);
+    public static final DirectoryId LOST = new DirectoryId(0L, 1L);
 
     /**
-     * A UUID that is used to represent and unspecified log directory,
+     * A DirectoryId that is used to represent and unspecified log directory,
      * that is expected to have been previously selected to host an
      * associated replica. This contrasts with {@code UNASSIGNED_DIR},
      * which is associated with (typically new) replicas that may not
      * yet have been placed in any log directory.
      */
-    public static final Uuid MIGRATING = new Uuid(0L, 2L);
+    public static final DirectoryId MIGRATING = new DirectoryId(0L, 2L);
 
     /**
      * The set of reserved UUIDs that will never be returned by the random 
method.
      */
-    public static final Set<Uuid> RESERVED;
+    public static final Set<DirectoryId> RESERVED;
 
     static {
-        HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED);
-        // The first 100 UUIDs are reserved for future use.
+        HashSet<DirectoryId> reserved = new HashSet<>();
+        // The first 100 DirectoryIds are reserved for future use.
         for (long i = 0L; i < 100L; i++) {
-            reserved.add(new Uuid(0L, i));
+            reserved.add(new DirectoryId(0L, i));
         }
         RESERVED = Collections.unmodifiableSet(reserved);
     }
 
+    /**
+     * Constructs a Directory ID from the underlying 128 bits,
+     * exactly as a {@link Uuid} is constructed.
+     */
+    private DirectoryId(long mostSigBits, long leastSigBits) {
+        super(mostSigBits, leastSigBits);
+    }
+
+    /**
+     * Creates a DirectoryId based on a base64 string encoding used in the 
toString() method.
+     */
+    public static DirectoryId fromString(String str) {
+        return DirectoryId.fromUuid(Uuid.fromString(str));
+    }
+
+    /**
+     * Creates a DirectoryId based on a {@link Uuid}.
+     */
+    public static DirectoryId fromUuid(Uuid uuid) {
+        return new DirectoryId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits());
+    }
+
     /**
      * Static factory to generate a directory ID.
      *
      * This will not generate a reserved UUID (first 100), or one whose string 
representation starts with a dash ("-")
      */
-    public static Uuid random() {
+    public static DirectoryId random() {
         Uuid uuid = Uuid.randomUuid();
         while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) {
             uuid = Uuid.randomUuid();
         }
-        return uuid;
+        return DirectoryId.fromUuid(uuid);
+    }
+
+    /**
+     * Convert a list of Uuid to an array of DirectoryId.
+     *
+     * @param list          The input list
+     * @return              The output array
+     */
+    public static DirectoryId[] toArray(List<Uuid> list) {
+        if (list == null) return null;
+        DirectoryId[] array = new DirectoryId[list.size()];
+        for (int i = 0; i < list.size(); i++) {
+            array[i] = DirectoryId.fromUuid(list.get(i));
+        }
+        return array;
+    }
+
+    /**
+     * Convert an array of DirectoryIds to a list of Uuid.
+     *
+     * @param array         The input array
+     * @return              The output list
+     */
+    public static List<Uuid> toList(DirectoryId[] array) {
+        if (array == null) return null;
+        List<Uuid> list = new ArrayList<>(array.length);
+        list.addAll(Arrays.asList(array));
+        return list;
+    }
+
+    /**
+     * Calculate the new directory information based on an existing replica 
assignment.
+     * Replicas for which there already is a directory ID keep the same 
directory.
+     * All other replicas get {@link #UNASSIGNED}.
+     * @param currentReplicas               The current replicas, represented 
by the broker IDs
+     * @param currentDirectories            The current directory information
+     * @param newReplicas                   The new replica list
+     * @return                              The new directory list
+     * @throws IllegalArgumentException     If currentReplicas and 
currentDirectories have different lengths,
+     *                                      or if there are duplicate broker 
IDs in the replica lists
+     */
+    public static List<Uuid> update(int[] currentReplicas, DirectoryId[] 
currentDirectories, List<Integer> newReplicas) {
+        if (currentReplicas == null) currentReplicas = new int[0];
+        if (currentDirectories == null) currentDirectories = new 
DirectoryId[0];
+        Map<Integer, Uuid> assignments = buildAssignmentMap(currentReplicas, 
currentDirectories);
+        List<Uuid> consolidated = new ArrayList<>(newReplicas.size());
+        for (int i = 0; i < newReplicas.size(); i++) {
+            int newReplica = newReplicas.get(i);

Review Comment:
   Best to do `for (int newReplica: newReplicas) {` in case `newReplicas` is 
something like a linked list that is O(n) for random access.
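
   A minimal, self-contained sketch of the difference. The class and method 
names are hypothetical and only illustrate the two loop shapes; they are not 
from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

// Hypothetical illustration; not code from the PR.
final class ReplicaIteration {

    // Indexed access: on a LinkedList every get(i) walks the list,
    // so the loop as a whole is O(n^2).
    static List<Integer> copyIndexed(List<Integer> newReplicas) {
        List<Integer> out = new ArrayList<>(newReplicas.size());
        for (int i = 0; i < newReplicas.size(); i++) {
            out.add(newReplicas.get(i));
        }
        return out;
    }

    // Enhanced for: uses the list's iterator, so the loop is O(n)
    // for any List implementation.
    static List<Integer> copyForEach(List<Integer> newReplicas) {
        List<Integer> out = new ArrayList<>(newReplicas.size());
        for (int newReplica : newReplicas) {
            out.add(newReplica);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> replicas = new LinkedList<>(Arrays.asList(3, 1, 2));
        // Both loops produce the same result; only the cost differs.
        System.out.println(copyIndexed(replicas).equals(copyForEach(replicas)));
    }
}
```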



##########
server-common/src/main/java/org/apache/kafka/common/DirectoryId.java:
##########
@@ -16,55 +16,166 @@
  */
 package org.apache.kafka.common;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
-public class DirectoryId {
+public class DirectoryId extends Uuid {
 
     /**
-     * A UUID that is used to identify new or unknown dir assignments.
+     * A DirectoryId that is used to identify new or unknown dir assignments.
      */
-    public static final Uuid UNASSIGNED = new Uuid(0L, 0L);
+    public static final DirectoryId UNASSIGNED = new DirectoryId(0L, 0L);
 
     /**
-     * A UUID that is used to represent unspecified offline dirs.
+     * A DirectoryId that is used to represent unspecified offline dirs.
      */
-    public static final Uuid LOST = new Uuid(0L, 1L);
+    public static final DirectoryId LOST = new DirectoryId(0L, 1L);
 
     /**
-     * A UUID that is used to represent and unspecified log directory,
+     * A DirectoryId that is used to represent and unspecified log directory,
      * that is expected to have been previously selected to host an
      * associated replica. This contrasts with {@code UNASSIGNED_DIR},
      * which is associated with (typically new) replicas that may not
      * yet have been placed in any log directory.
      */
-    public static final Uuid MIGRATING = new Uuid(0L, 2L);
+    public static final DirectoryId MIGRATING = new DirectoryId(0L, 2L);
 
     /**
      * The set of reserved UUIDs that will never be returned by the random 
method.
      */
-    public static final Set<Uuid> RESERVED;
+    public static final Set<DirectoryId> RESERVED;
 
     static {
-        HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED);
-        // The first 100 UUIDs are reserved for future use.
+        HashSet<DirectoryId> reserved = new HashSet<>();
+        // The first 100 DirectoryIds are reserved for future use.
         for (long i = 0L; i < 100L; i++) {
-            reserved.add(new Uuid(0L, i));
+            reserved.add(new DirectoryId(0L, i));
         }
         RESERVED = Collections.unmodifiableSet(reserved);
     }
 
+    /**
+     * Constructs a Directory ID from the underlying 128 bits,
+     * exactly as a {@link Uuid} is constructed.
+     */
+    private DirectoryId(long mostSigBits, long leastSigBits) {
+        super(mostSigBits, leastSigBits);
+    }
+
+    /**
+     * Creates a DirectoryId based on a base64 string encoding used in the 
toString() method.
+     */
+    public static DirectoryId fromString(String str) {
+        return DirectoryId.fromUuid(Uuid.fromString(str));
+    }
+
+    /**
+     * Creates a DirectoryId based on a {@link Uuid}.
+     */
+    public static DirectoryId fromUuid(Uuid uuid) {
+        return new DirectoryId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits());
+    }

Review Comment:
   Noting that while there is no direct test to ensure that the most/least 
significant bits are copied correctly, this is tested indirectly via existing 
tests.
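
   If a direct test is wanted later, it could look roughly like the sketch 
below. `Id` and `DirId` are minimal hypothetical stand-ins for `Uuid` and 
`DirectoryId` so that the example is self-contained; the real test would use 
the Kafka classes and the project's test framework.

```java
// Minimal stand-ins for Uuid and DirectoryId; hypothetical, not the Kafka classes.
final class FromUuidSketch {

    static class Id {
        private final long mostSigBits;
        private final long leastSigBits;

        Id(long mostSigBits, long leastSigBits) {
            this.mostSigBits = mostSigBits;
            this.leastSigBits = leastSigBits;
        }

        long getMostSignificantBits() {
            return mostSigBits;
        }

        long getLeastSignificantBits() {
            return leastSigBits;
        }
    }

    static final class DirId extends Id {
        private DirId(long mostSigBits, long leastSigBits) {
            super(mostSigBits, leastSigBits);
        }

        // Same shape as fromUuid in the diff: copy both halves across.
        static DirId fromId(Id id) {
            return new DirId(id.getMostSignificantBits(), id.getLeastSignificantBits());
        }
    }

    // The direct check: both halves must survive the conversion unchanged.
    static boolean bitsPreserved(long most, long least) {
        DirId converted = DirId.fromId(new Id(most, least));
        return converted.getMostSignificantBits() == most
            && converted.getLeastSignificantBits() == least;
    }

    public static void main(String[] args) {
        // Distinct halves, so swapped constructor arguments would be detected.
        System.out.println(bitsPreserved(0x0123456789abcdefL, 0xfedcba9876543210L));
    }
}
```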



##########
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java:
##########
@@ -310,11 +337,22 @@ public ApiMessageAndVersion toRecord(Uuid topicId, int 
partitionId, short versio
             setLeaderRecoveryState(leaderRecoveryState.value()).
             setLeaderEpoch(leaderEpoch).
             setPartitionEpoch(partitionEpoch);
-        if (version > 0) {
+        if (options.metadataVersion().isElrSupported()) {
             record.setEligibleLeaderReplicas(Replicas.toList(elr)).
                 setLastKnownELR(Replicas.toList(lastKnownElr));
         }
-        return new ApiMessageAndVersion(record, version);
+        if (options.metadataVersion().isDirectoryAssignmentSupported()) {
+            record.setDirectories(DirectoryId.toList(directories));
+        } else {
+            for (int i = 0; i < directories.length; i++) {

Review Comment:
   Wondering if this is a `NullPointerException` waiting to happen, since the 
current version of the PR allows `directories` to be null in some cases, as 
mentioned above?
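
   A minimal sketch of the kind of guard I have in mind. I am assuming here 
that the loop checks each entry against the unassigned placeholder; the helper 
name is hypothetical and plain strings stand in for `DirectoryId`.

```java
// Hypothetical illustration; strings stand in for DirectoryId values.
final class NullSafeDirectories {

    static final String UNASSIGNED = "UNASSIGNED";

    // Returns true when no entry carries a real assignment.
    static boolean allUnassigned(String[] directories) {
        // Guard first: directories.length would throw a
        // NullPointerException if the array was never set.
        if (directories == null) {
            return true;
        }
        for (int i = 0; i < directories.length; i++) {
            if (!UNASSIGNED.equals(directories[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(allUnassigned(null));
        System.out.println(allUnassigned(new String[] {UNASSIGNED, "dir-a"}));
    }
}
```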



##########
metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java:
##########
@@ -100,8 +115,15 @@ public void testChangeRecordIsNoOp() {
 
     private final static Uuid FOO_ID = 
Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
 
+    private static MetadataVersion metadataVersionForDirAssignmentInfo() {
+        MetadataVersion metadataVersion = 
Mockito.spy(MetadataVersion.latest());
+        
when(metadataVersion.isDirectoryAssignmentSupported()).thenReturn(true);
+        return metadataVersion;
+    }

Review Comment:
   Can we add a `// TODO` to indicate that this is temporary and we should put 
the real MetadataVersion in here once we have it?



##########
server-common/src/main/java/org/apache/kafka/common/DirectoryId.java:
##########
@@ -16,55 +16,166 @@
  */
 package org.apache.kafka.common;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
-public class DirectoryId {
+public class DirectoryId extends Uuid {
 
     /**
-     * A UUID that is used to identify new or unknown dir assignments.
+     * A DirectoryId that is used to identify new or unknown dir assignments.
      */
-    public static final Uuid UNASSIGNED = new Uuid(0L, 0L);
+    public static final DirectoryId UNASSIGNED = new DirectoryId(0L, 0L);
 
     /**
-     * A UUID that is used to represent unspecified offline dirs.
+     * A DirectoryId that is used to represent unspecified offline dirs.
      */
-    public static final Uuid LOST = new Uuid(0L, 1L);
+    public static final DirectoryId LOST = new DirectoryId(0L, 1L);
 
     /**
-     * A UUID that is used to represent and unspecified log directory,
+     * A DirectoryId that is used to represent and unspecified log directory,
      * that is expected to have been previously selected to host an
      * associated replica. This contrasts with {@code UNASSIGNED_DIR},
      * which is associated with (typically new) replicas that may not
      * yet have been placed in any log directory.
      */
-    public static final Uuid MIGRATING = new Uuid(0L, 2L);
+    public static final DirectoryId MIGRATING = new DirectoryId(0L, 2L);
 
     /**
      * The set of reserved UUIDs that will never be returned by the random 
method.
      */
-    public static final Set<Uuid> RESERVED;
+    public static final Set<DirectoryId> RESERVED;
 
     static {
-        HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED);
-        // The first 100 UUIDs are reserved for future use.
+        HashSet<DirectoryId> reserved = new HashSet<>();
+        // The first 100 DirectoryIds are reserved for future use.
         for (long i = 0L; i < 100L; i++) {
-            reserved.add(new Uuid(0L, i));
+            reserved.add(new DirectoryId(0L, i));
         }
         RESERVED = Collections.unmodifiableSet(reserved);
     }
 
+    /**
+     * Constructs a Directory ID from the underlying 128 bits,
+     * exactly as a {@link Uuid} is constructed.
+     */
+    private DirectoryId(long mostSigBits, long leastSigBits) {
+        super(mostSigBits, leastSigBits);
+    }
+
+    /**
+     * Creates a DirectoryId based on a base64 string encoding used in the 
toString() method.
+     */
+    public static DirectoryId fromString(String str) {
+        return DirectoryId.fromUuid(Uuid.fromString(str));
+    }
+
+    /**
+     * Creates a DirectoryId based on a {@link Uuid}.
+     */
+    public static DirectoryId fromUuid(Uuid uuid) {
+        return new DirectoryId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits());
+    }
+
     /**
      * Static factory to generate a directory ID.
      *
      * This will not generate a reserved UUID (first 100), or one whose string 
representation starts with a dash ("-")
      */
-    public static Uuid random() {
+    public static DirectoryId random() {
         Uuid uuid = Uuid.randomUuid();
         while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) {
             uuid = Uuid.randomUuid();
         }
-        return uuid;
+        return DirectoryId.fromUuid(uuid);
+    }
+
+    /**
+     * Convert a list of Uuid to an array of DirectoryId.
+     *
+     * @param list          The input list
+     * @return              The output array
+     */
+    public static DirectoryId[] toArray(List<Uuid> list) {
+        if (list == null) return null;
+        DirectoryId[] array = new DirectoryId[list.size()];
+        for (int i = 0; i < list.size(); i++) {
+            array[i] = DirectoryId.fromUuid(list.get(i));
+        }
+        return array;
+    }
+
+    /**
+     * Convert an array of DirectoryIds to a list of Uuid.
+     *
+     * @param array         The input array
+     * @return              The output list
+     */
+    public static List<Uuid> toList(DirectoryId[] array) {
+        if (array == null) return null;
+        List<Uuid> list = new ArrayList<>(array.length);
+        list.addAll(Arrays.asList(array));
+        return list;
+    }
+
+    /**
+     * Calculate the new directory information based on an existing replica 
assignment.
+     * Replicas for which there already is a directory ID keep the same 
directory.
+     * All other replicas get {@link #UNASSIGNED}.
+     * @param currentReplicas               The current replicas, represented 
by the broker IDs
+     * @param currentDirectories            The current directory information
+     * @param newReplicas                   The new replica list
+     * @return                              The new directory list
+     * @throws IllegalArgumentException     If currentReplicas and 
currentDirectories have different lengths,
+     *                                      or if there are duplicate broker 
IDs in the replica lists
+     */
+    public static List<Uuid> update(int[] currentReplicas, DirectoryId[] 
currentDirectories, List<Integer> newReplicas) {

Review Comment:
   Can we have a better name than `update()` since this method is returning 
something new as opposed to updating something?  Maybe 
`createDirectoriesFrom()` or similar?



##########
metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java:
##########
@@ -49,12 +51,15 @@
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.Mockito.when;
 
 
 @Timeout(value = 40)
 public class PartitionChangeBuilderTest {
     private static Stream<Arguments> partitionChangeRecordVersions() {
-        return IntStream.range(PartitionChangeRecord.LOWEST_SUPPORTED_VERSION, 
PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION + 1).mapToObj(version -> 
Arguments.of((short) version));
+        return IntStream.range(PartitionChangeRecord.LOWEST_SUPPORTED_VERSION, 
PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION + 1)
+                .filter(v -> v != 
PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION) // TODO test latest record 
version in KAFKA-15514

Review Comment:
   I believe we are going to close the current PR associated with KAFKA-15514, 
https://github.com/apache/kafka/pull/14516, which I assume means we should 
remove this filter and make the tests pass in this PR.



##########
metadata/src/main/resources/common/metadata/PartitionRecord.json:
##########
@@ -47,6 +47,8 @@
       "about": "The eligible leader replicas of this partition." },
     { "name": "LastKnownELR", "type": "[]int32", "default": "null", 
"entityType": "brokerId",
       "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", 
"tag": 2,
-      "about": "The last known eligible leader replicas of this partition." }
+      "about": "The last known eligible leader replicas of this partition." },
+    { "name": "Directories", "type": "[]uuid", "versions": "2+",
+      "about": "The log directory hosting each replica, sorted in the same 
exact order as the Replicas field."}

Review Comment:
   Not sure, but wondering if this should be a tagged field like the ELR 
fields.  I don't see any specific advantage to doing so -- we won't ever use 
this version unless all nodes in the cluster support it, in which case making 
it mandatory actually makes sense -- but figured I would mention it in case 
anybody had additional thoughts or context around the ELR choice to tag the new 
fields in version 1.



##########
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java:
##########
@@ -122,8 +132,13 @@ public PartitionRegistration build() {
                 throw new IllegalStateException("You must set last known 
elr.");
             }
 
+            if (directories == null) {
+                directories = DirectoryId.unassignedArray(replicas.length);
+            }

Review Comment:
   This makes sense to me -- set them as unassigned when they aren't specified. 
 Note that this could be the case due to the feature not being enabled.
   
   However, I suspect we should do the same thing in `public 
PartitionRegistration(PartitionRecord record)`; otherwise sometimes we can have 
`PartitionRegistration` records with non-null but "unassigned" directory IDs 
and sometimes we can have null.  It feels to me that it would be easier to 
reason about if we always have something there.
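
   Roughly what I mean, as a self-contained sketch: normalize in one place 
that every construction path goes through, so the field is never null. 
`RegistrationSketch` is a hypothetical stand-in for `PartitionRegistration`, 
and strings stand in for `DirectoryId`; only `unassignedArray` mirrors a name 
from the diff.

```java
import java.util.Arrays;

// Hypothetical stand-in for PartitionRegistration; strings stand in for DirectoryId.
final class RegistrationSketch {

    static final String UNASSIGNED = "UNASSIGNED";

    final int[] replicas;
    final String[] directories;

    // Single normalization point: whether the caller is the builder or the
    // record-based constructor, a missing array becomes all-unassigned.
    RegistrationSketch(int[] replicas, String[] directories) {
        this.replicas = replicas;
        this.directories = directories != null
            ? directories
            : unassignedArray(replicas.length);
    }

    // One placeholder per replica, in replica order.
    static String[] unassignedArray(int length) {
        String[] array = new String[length];
        Arrays.fill(array, UNASSIGNED);
        return array;
    }

    public static void main(String[] args) {
        RegistrationSketch r = new RegistrationSketch(new int[] {1, 2, 3}, null);
        System.out.println(Arrays.toString(r.directories));
    }
}
```

   With that in place, downstream code such as `toRecord` can assume a 
non-null array whose length matches the replica list.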



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
