This is an automated email from the ASF dual-hosted git repository.

rndgstn pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f385ef468b5 KAFKA-15364: Replay BrokerRegistrationChangeRecord.logDirs (#14998)
f385ef468b5 is described below

commit f385ef468b57b02b064451306ebed3c11e37889d
Author: Igor Soarez <soa...@apple.com>
AuthorDate: Tue Dec 19 03:43:28 2023 +0700

    KAFKA-15364: Replay BrokerRegistrationChangeRecord.logDirs (#14998)
    
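    Apply the log directory list carried in
    BrokerRegistrationChangeRecord.logDirs when the record is replayed,
    both in ClusterControlManager and in ClusterDelta. A null or empty
    logDirs list is treated as "no change" and leaves the registered
    directories untouched. This is necessary to persist directory
    failures in the cluster metadata, which #14902 missed.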
    
    Reviewers: Omnia G.H Ibrahim <o.g.h.ibra...@gmail.com>, Viktor Somogyi-Vass <viktorsomo...@gmail.com>
---
 .../apache/kafka/controller/ClusterControlManager.java   | 16 +++++++++++-----
 .../kafka/controller/ReplicationControlManager.java      | 10 +++++++++-
 .../main/java/org/apache/kafka/image/ClusterDelta.java   |  8 +++++++-
 .../org/apache/kafka/metadata/BrokerRegistration.java    |  8 +++++---
 .../kafka/controller/ReplicationControlManagerTest.java  |  2 ++
 5 files changed, 34 insertions(+), 10 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 528bf2ae05c..98d64c54835 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -534,7 +534,8 @@ public class ClusterControlManager {
             record.id(),
             record.epoch(),
             BrokerRegistrationFencingChange.FENCE.asBoolean(),
-            BrokerRegistrationInControlledShutdownChange.NONE.asBoolean()
+            BrokerRegistrationInControlledShutdownChange.NONE.asBoolean(),
+            Optional.empty()
         );
     }
 
@@ -544,7 +545,8 @@ public class ClusterControlManager {
             record.id(),
             record.epoch(),
             BrokerRegistrationFencingChange.UNFENCE.asBoolean(),
-            BrokerRegistrationInControlledShutdownChange.NONE.asBoolean()
+            BrokerRegistrationInControlledShutdownChange.NONE.asBoolean(),
+            Optional.empty()
         );
     }
 
@@ -557,12 +559,14 @@ public class ClusterControlManager {
             
BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow(
                 () -> new IllegalStateException(String.format("Unable to 
replay %s: unknown " +
                     "value for inControlledShutdown field: %x", record, 
record.inControlledShutdown())));
+        Optional<List<Uuid>> directoriesChange = 
Optional.ofNullable(record.logDirs()).filter(list -> !list.isEmpty());
         replayRegistrationChange(
             record,
             record.brokerId(),
             record.brokerEpoch(),
             fencingChange.asBoolean(),
-            inControlledShutdownChange.asBoolean()
+            inControlledShutdownChange.asBoolean(),
+            directoriesChange
         );
     }
 
@@ -571,7 +575,8 @@ public class ClusterControlManager {
         int brokerId,
         long brokerEpoch,
         Optional<Boolean> fencingChange,
-        Optional<Boolean> inControlledShutdownChange
+        Optional<Boolean> inControlledShutdownChange,
+        Optional<List<Uuid>> directoriesChange
     ) {
         BrokerRegistration curRegistration = brokerRegistrations.get(brokerId);
         if (curRegistration == null) {
@@ -583,7 +588,8 @@ public class ClusterControlManager {
         } else {
             BrokerRegistration nextRegistration = curRegistration.cloneWith(
                 fencingChange,
-                inControlledShutdownChange
+                inControlledShutdownChange,
+                directoriesChange
             );
             if (!curRegistration.equals(nextRegistration)) {
                 log.info("Replayed {} modifying the registration for broker 
{}: {}",
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 1dd5e2ba162..759e3dfe5c4 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1401,10 +1401,13 @@ public class ReplicationControlManager {
                         "handleDirectoriesOffline[" + brokerId + ":" + 
newOfflineDir + "]",
                         brokerId, NO_LEADER, records, iterator);
             }
+            List<Uuid> newOnlineDirs = 
registration.directoryDifference(offlineDirs);
             records.add(new ApiMessageAndVersion(new 
BrokerRegistrationChangeRecord().
                     setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).
-                    setLogDirs(registration.directoryDifference(offlineDirs)),
+                    setLogDirs(newOnlineDirs),
                     (short) 2));
+            log.warn("Directories {} in broker {} marked offline, remaining 
directories: {}",
+                    newOfflineDirs, brokerId, newOnlineDirs);
         }
     }
 
@@ -2126,6 +2129,11 @@ public class ReplicationControlManager {
                             if (directoryIsOffline) {
                                 leaderAndIsrUpdates.add(new 
TopicIdPartition(topicId, partitionIndex));
                             }
+                            if (log.isDebugEnabled()) {
+                                log.debug("Broker {} assigned partition {}:{} 
to {} dir {}",
+                                    brokerId, topics.get(topicId).name(), 
partitionIndex,
+                                    directoryIsOffline ? "OFFLINE" : "ONLINE", 
dirId);
+                            }
                         }
                     }
                     resTopic.partitions().add(new 
AssignReplicasToDirsResponseData.PartitionData().
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java 
b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
index d87f8d898a7..caf09243ff4 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.image;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.FenceBrokerRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
@@ -30,6 +31,7 @@ import org.apache.kafka.metadata.ControllerRegistration;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
@@ -112,6 +114,7 @@ public final class ClusterDelta {
         BrokerRegistration curRegistration = getBrokerOrThrow(record.id(), 
record.epoch(), "fence");
         changedBrokers.put(record.id(), Optional.of(curRegistration.cloneWith(
             BrokerRegistrationFencingChange.FENCE.asBoolean(),
+            Optional.empty(),
             Optional.empty()
         )));
     }
@@ -120,6 +123,7 @@ public final class ClusterDelta {
         BrokerRegistration curRegistration = getBrokerOrThrow(record.id(), 
record.epoch(), "unfence");
         changedBrokers.put(record.id(), Optional.of(curRegistration.cloneWith(
             BrokerRegistrationFencingChange.UNFENCE.asBoolean(),
+            Optional.empty(),
             Optional.empty()
         )));
     }
@@ -135,9 +139,11 @@ public final class ClusterDelta {
             
BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow(
                 () -> new IllegalStateException(String.format("Unable to 
replay %s: unknown " +
                     "value for inControlledShutdown field: %d", record, 
record.inControlledShutdown())));
+        Optional<List<Uuid>> directoriesChange = 
Optional.ofNullable(record.logDirs()).filter(list -> !list.isEmpty());
         BrokerRegistration nextRegistration = curRegistration.cloneWith(
             fencingChange.asBoolean(),
-            inControlledShutdownChange.asBoolean()
+            inControlledShutdownChange.asBoolean(),
+            directoriesChange
         );
         if (!curRegistration.equals(nextRegistration)) {
             changedBrokers.put(record.brokerId(), 
Optional.of(nextRegistration));
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java 
b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index 6c8b0c5d608..a003ed670bd 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -390,12 +390,14 @@ public class BrokerRegistration {
 
     public BrokerRegistration cloneWith(
         Optional<Boolean> fencingChange,
-        Optional<Boolean> inControlledShutdownChange
+        Optional<Boolean> inControlledShutdownChange,
+        Optional<List<Uuid>> directoriesChange
     ) {
         boolean newFenced = fencingChange.orElse(fenced);
         boolean newInControlledShutdownChange = 
inControlledShutdownChange.orElse(inControlledShutdown);
+        List<Uuid> newDirectories = directoriesChange.orElse(directories);
 
-        if (newFenced == fenced && newInControlledShutdownChange == 
inControlledShutdown)
+        if (newFenced == fenced && newInControlledShutdownChange == 
inControlledShutdown && newDirectories.equals(directories))
             return this;
 
         return new BrokerRegistration(
@@ -408,7 +410,7 @@ public class BrokerRegistration {
             newFenced,
             newInControlledShutdownChange,
             isMigratingZkBroker,
-            directories
+            newDirectories
         );
     }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index c41e629fbc7..6ac769043e2 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -2979,6 +2979,8 @@ public class ReplicationControlManagerTest {
             sortPartitionChangeRecords(filter(records, 
PartitionChangeRecord.class))
         );
         assertEquals(3, records.size());
+        ctx.replay(records);
+        assertEquals(Collections.singletonList(dir2b1), 
ctx.clusterControl.registration(b1).directories());
     }
 
     /**

Reply via email to