This is an automated email from the ASF dual-hosted git repository. rndgstn pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new f385ef468b5 KAFKA-15364: Replay BrokerRegistrationChangeRecord.logDirs (#14998) f385ef468b5 is described below commit f385ef468b57b02b064451306ebed3c11e37889d Author: Igor Soarez <soa...@apple.com> AuthorDate: Tue Dec 19 03:43:28 2023 +0700 KAFKA-15364: Replay BrokerRegistrationChangeRecord.logDirs (#14998) Any directory changes must be considered when replaying BrokerRegistrationChangeRecord. This is necessary to persist directory failures in the cluster metadata, which #14902 missed. Reviewers: Omnia G.H Ibrahim <o.g.h.ibra...@gmail.com>, Viktor Somogyi-Vass <viktorsomo...@gmail.com> --- .../apache/kafka/controller/ClusterControlManager.java | 16 +++++++++++----- .../kafka/controller/ReplicationControlManager.java | 10 +++++++++- .../main/java/org/apache/kafka/image/ClusterDelta.java | 8 +++++++- .../org/apache/kafka/metadata/BrokerRegistration.java | 8 +++++--- .../kafka/controller/ReplicationControlManagerTest.java | 2 ++ 5 files changed, 34 insertions(+), 10 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index 528bf2ae05c..98d64c54835 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -534,7 +534,8 @@ public class ClusterControlManager { record.id(), record.epoch(), BrokerRegistrationFencingChange.FENCE.asBoolean(), - BrokerRegistrationInControlledShutdownChange.NONE.asBoolean() + BrokerRegistrationInControlledShutdownChange.NONE.asBoolean(), + Optional.empty() ); } @@ -544,7 +545,8 @@ public class ClusterControlManager { record.id(), record.epoch(), BrokerRegistrationFencingChange.UNFENCE.asBoolean(), - BrokerRegistrationInControlledShutdownChange.NONE.asBoolean() + BrokerRegistrationInControlledShutdownChange.NONE.asBoolean(), + Optional.empty() ); } @@ -557,12 +559,14 @@ public class ClusterControlManager { BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow( () -> new IllegalStateException(String.format("Unable to replay %s: unknown " + "value for inControlledShutdown field: %x", record, record.inControlledShutdown()))); + Optional<List<Uuid>> directoriesChange = Optional.ofNullable(record.logDirs()).filter(list -> !list.isEmpty()); replayRegistrationChange( record, record.brokerId(), record.brokerEpoch(), fencingChange.asBoolean(), - inControlledShutdownChange.asBoolean() + inControlledShutdownChange.asBoolean(), + directoriesChange ); } @@ -571,7 +575,8 @@ public class ClusterControlManager { int brokerId, long brokerEpoch, Optional<Boolean> fencingChange, - Optional<Boolean> inControlledShutdownChange + Optional<Boolean> inControlledShutdownChange, + Optional<List<Uuid>> directoriesChange ) { BrokerRegistration curRegistration = brokerRegistrations.get(brokerId); if (curRegistration == null) { @@ -583,7 +588,8 @@ public class ClusterControlManager { } else { BrokerRegistration nextRegistration = curRegistration.cloneWith( fencingChange, - inControlledShutdownChange + inControlledShutdownChange, + directoriesChange ); if (!curRegistration.equals(nextRegistration)) { log.info("Replayed {} modifying the registration for broker {}: {}", diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 1dd5e2ba162..759e3dfe5c4 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -1401,10 +1401,13 @@ public class ReplicationControlManager { "handleDirectoriesOffline[" + brokerId + ":" + newOfflineDir + "]", brokerId, NO_LEADER, records, iterator); } + List<Uuid> newOnlineDirs = registration.directoryDifference(offlineDirs); records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord(). setBrokerId(brokerId).setBrokerEpoch(brokerEpoch). - setLogDirs(registration.directoryDifference(offlineDirs)), + setLogDirs(newOnlineDirs), (short) 2)); + log.warn("Directories {} in broker {} marked offline, remaining directories: {}", + newOfflineDirs, brokerId, newOnlineDirs); } } @@ -2126,6 +2129,11 @@ public class ReplicationControlManager { if (directoryIsOffline) { leaderAndIsrUpdates.add(new TopicIdPartition(topicId, partitionIndex)); } + if (log.isDebugEnabled()) { + log.debug("Broker {} assigned partition {}:{} to {} dir {}", + brokerId, topics.get(topicId).name(), partitionIndex, + directoryIsOffline ? "OFFLINE" : "ONLINE", dirId); + } } } resTopic.partitions().add(new AssignReplicasToDirsResponseData.PartitionData(). diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java index d87f8d898a7..caf09243ff4 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java @@ -17,6 +17,7 @@ package org.apache.kafka.image; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; import org.apache.kafka.common.metadata.FenceBrokerRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord; @@ -30,6 +31,7 @@ import org.apache.kafka.metadata.ControllerRegistration; import org.apache.kafka.server.common.MetadataVersion; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; @@ -112,6 +114,7 @@ public final class ClusterDelta { BrokerRegistration curRegistration = getBrokerOrThrow(record.id(), record.epoch(), "fence"); changedBrokers.put(record.id(), Optional.of(curRegistration.cloneWith( BrokerRegistrationFencingChange.FENCE.asBoolean(), + Optional.empty(), Optional.empty() ))); } @@ -120,6 +123,7 @@ public final class ClusterDelta { BrokerRegistration curRegistration = getBrokerOrThrow(record.id(), record.epoch(), "unfence"); changedBrokers.put(record.id(), Optional.of(curRegistration.cloneWith( BrokerRegistrationFencingChange.UNFENCE.asBoolean(), + Optional.empty(), Optional.empty() ))); } @@ -135,9 +139,11 @@ public final class ClusterDelta { BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow( () -> new IllegalStateException(String.format("Unable to replay %s: unknown " + "value for inControlledShutdown field: %d", record, record.inControlledShutdown()))); + Optional<List<Uuid>> directoriesChange = Optional.ofNullable(record.logDirs()).filter(list -> !list.isEmpty()); BrokerRegistration nextRegistration = curRegistration.cloneWith( fencingChange.asBoolean(), - inControlledShutdownChange.asBoolean() + inControlledShutdownChange.asBoolean(), + directoriesChange ); if (!curRegistration.equals(nextRegistration)) { changedBrokers.put(record.brokerId(), Optional.of(nextRegistration)); diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java index 6c8b0c5d608..a003ed670bd 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java @@ -390,12 +390,14 @@ public class BrokerRegistration { public BrokerRegistration cloneWith( Optional<Boolean> fencingChange, - Optional<Boolean> inControlledShutdownChange + Optional<Boolean> inControlledShutdownChange, + Optional<List<Uuid>> directoriesChange ) { boolean newFenced = fencingChange.orElse(fenced); boolean newInControlledShutdownChange = inControlledShutdownChange.orElse(inControlledShutdown); + List<Uuid> newDirectories = directoriesChange.orElse(directories); - if (newFenced == fenced && newInControlledShutdownChange == inControlledShutdown) + if (newFenced == fenced && newInControlledShutdownChange == inControlledShutdown && newDirectories.equals(directories)) return this; return new BrokerRegistration( @@ -408,7 +410,7 @@ public class BrokerRegistration { newFenced, newInControlledShutdownChange, isMigratingZkBroker, - directories + newDirectories ); } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index c41e629fbc7..6ac769043e2 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -2979,6 +2979,8 @@ public class ReplicationControlManagerTest { sortPartitionChangeRecords(filter(records, PartitionChangeRecord.class)) ); assertEquals(3, records.size()); + ctx.replay(records); + assertEquals(Collections.singletonList(dir2b1), ctx.clusterControl.registration(b1).directories()); } /**