hachikuji commented on code in PR #12513: URL: https://github.com/apache/kafka/pull/12513#discussion_r949590579
########## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.bootstrap; + +import org.apache.kafka.metadata.util.BatchFileReader; +import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType; +import org.apache.kafka.metadata.util.BatchFileWriter; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; +import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION; + + +/** + * A read-only class that holds the controller bootstrap metadata. A file named "bootstrap.snapshot" is used and the + * format is the same as a KRaft snapshot. 
+ */ +public class BootstrapDirectory { + final static String INTER_BROKER_PROTOCOL_CONFIG_KEY = "inter.broker.protocol.version"; + final static String BINARY_BOOTSTRAP = "bootstrap.checkpoint"; + + public static String ibpStringFromConfigMap(Map<String, Object> staticConfig) { + Object value = staticConfig.get(INTER_BROKER_PROTOCOL_CONFIG_KEY); + return value == null ? "" : value.toString(); + } + + private final String directoryPath; + private final String ibp; + + /** + * Create a new BootstrapDirectory object. + * + * @param directoryPath The path to the directory with the bootstrap file. + * @param ibp The configured value of inter.broker.protocol, or the empty string Review Comment: Out of curiosity, why not use null as the sentinel? Seems less likely to be used mistakenly. ########## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.metadata.bootstrap; + +import org.apache.kafka.metadata.util.BatchFileReader; +import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType; +import org.apache.kafka.metadata.util.BatchFileWriter; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; +import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION; + + +/** + * A read-only class that holds the controller bootstrap metadata. A file named "bootstrap.snapshot" is used and the + * format is the same as a KRaft snapshot. + */ +public class BootstrapDirectory { + final static String INTER_BROKER_PROTOCOL_CONFIG_KEY = "inter.broker.protocol.version"; + final static String BINARY_BOOTSTRAP = "bootstrap.checkpoint"; + + public static String ibpStringFromConfigMap(Map<String, Object> staticConfig) { + Object value = staticConfig.get(INTER_BROKER_PROTOCOL_CONFIG_KEY); + return value == null ? "" : value.toString(); + } + + private final String directoryPath; + private final String ibp; + + /** + * Create a new BootstrapDirectory object. + * + * @param directoryPath The path to the directory with the bootstrap file. + * @param ibp The configured value of inter.broker.protocol, or the empty string + * if it is not configured. 
+ */ + public BootstrapDirectory( + String directoryPath, + String ibp + ) { + Objects.requireNonNull(directoryPath); + Objects.requireNonNull(ibp); + this.directoryPath = directoryPath; + this.ibp = ibp; + } + + public BootstrapMetadata read() throws Exception { + if (!Files.isDirectory(Paths.get(directoryPath))) { + throw new RuntimeException("No such directory as " + directoryPath); Review Comment: nit: perhaps it's useful to check existence separately so that we can have a clear message when either the file does not exist or it is not a directory. This might also be a useful helper to add to `Utils` or some such place. ########## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.metadata.bootstrap; + +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + + +/** + * The bootstrap metadata. On startup, if the metadata log is empty, we will populate the log with + * these records. Alternately, if log is not empty, but the metadata version is not set, we will + * use the version specified here. + */ +public class BootstrapMetadata { + private final List<ApiMessageAndVersion> records; + private final MetadataVersion metadataVersion; + private final String source; + + public static BootstrapMetadata fromVersion(MetadataVersion metadataVersion, String source) { + List<ApiMessageAndVersion> records = Collections.singletonList( + new ApiMessageAndVersion(new FeatureLevelRecord(). + setName(MetadataVersion.FEATURE_NAME). 
+ setFeatureLevel(metadataVersion.featureLevel()), (short) 0)); + return new BootstrapMetadata(records, metadataVersion, source); + } + + public static BootstrapMetadata fromRecords(List<ApiMessageAndVersion> records, String source) { + MetadataVersion metadataVersion = null; + for (ApiMessageAndVersion record : records) { + Optional<MetadataVersion> version = recordToMetadataVersion(record.message()); + if (version.isPresent()) { + metadataVersion = version.get(); + } + } + if (metadataVersion == null) { + throw new RuntimeException("No FeatureLevelRecord for " + MetadataVersion.FEATURE_NAME + + " was found in the bootstrap metadata from " + source); + } + return new BootstrapMetadata(records, metadataVersion, source); + } + + public static Optional<MetadataVersion> recordToMetadataVersion(ApiMessage record) { + if (record instanceof FeatureLevelRecord) { + FeatureLevelRecord featureLevel = (FeatureLevelRecord) record; + if (featureLevel.name().equals(MetadataVersion.FEATURE_NAME)) { + return Optional.of(MetadataVersion.fromFeatureLevel(featureLevel.featureLevel())); + } + } + return Optional.empty(); + } + + BootstrapMetadata( + List<ApiMessageAndVersion> records, + MetadataVersion metadataVersion, + String source + ) { + Objects.requireNonNull(records); Review Comment: nit: usual idiom is this: ```java this.records = Objects.requireNonNull(records); ``` Also, is our requirement stricter? Do we need a non-empty list? Otherwise, `copyWithOnlyVersion` will fail below. Finally, do we need to check the consistency requirement between `metadataVersion` and the `FeatureLevelRecord` in `records`? Perhaps justifiable to skip extra validations if we can make the constructor private. ########## metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.util; + +import org.apache.kafka.common.message.LeaderChangeMessage; +import org.apache.kafka.common.message.SnapshotFooterRecord; +import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.MetadataRecordSerde; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + + +/** + * Reads a log file containing KRaft record batches. 
+ */ +public final class BatchFileReader implements Iterator<BatchFileReader.BatchAndType>, AutoCloseable { + private static final Logger log = LoggerFactory.getLogger(BatchFileReader.class); + + public static class Builder { + private String path = null; + + public Builder setPath(String path) { + this.path = path; + return this; + } + + public BatchFileReader build() throws Exception { + if (path == null) { + throw new RuntimeException("You must specify a path."); + } + FileRecords fileRecords = FileRecords.open(new File(path), false); + try { + return new BatchFileReader(fileRecords); + } catch (Throwable e) { + Utils.closeQuietly(fileRecords, "fileRecords"); + throw e; + } + } + } + + public static class BatchAndType { + private final Batch<ApiMessageAndVersion> batch; + private final boolean isControl; + + public BatchAndType(Batch<ApiMessageAndVersion> batch, boolean isControl) { + this.batch = batch; + this.isControl = isControl; + } + + public Batch<ApiMessageAndVersion> batch() { + return batch; + } + + public boolean isControl() { + return isControl; + } + } + + private final FileRecords fileRecords; + private Iterator<FileChannelRecordBatch> batchIterator; + private final MetadataRecordSerde serde; + + private BatchFileReader(FileRecords fileRecords) { + this.fileRecords = fileRecords; + this.batchIterator = fileRecords.batchIterator(); + this.serde = new MetadataRecordSerde(); + } + + @Override + public boolean hasNext() { + return this.batchIterator.hasNext(); + } + + @Override + public BatchAndType next() { + FileChannelRecordBatch input = batchIterator.next(); + if (input.isControlBatch()) { + return nextControlBatch(input); + } else { + return nextMetadataBatch(input); + } + } + + private BatchAndType nextControlBatch(FileChannelRecordBatch input) { + List<ApiMessageAndVersion> messages = new ArrayList<>(); + for (Iterator<Record> iter = input.iterator(); iter.hasNext(); ) { + Record record = iter.next(); + try { + short typeId = 
ControlRecordType.parseTypeId(record.key()); + ControlRecordType type = ControlRecordType.fromTypeId(typeId); + switch (type) { + case LEADER_CHANGE: { + LeaderChangeMessage message = new LeaderChangeMessage(); + message.read(new ByteBufferAccessor(record.value()), (short) 0); + messages.add(new ApiMessageAndVersion(message, (short) 0)); + break; + } + case SNAPSHOT_HEADER: { + SnapshotHeaderRecord message = new SnapshotHeaderRecord(); + message.read(new ByteBufferAccessor(record.value()), (short) 0); + messages.add(new ApiMessageAndVersion(message, (short) 0)); + break; + } + case SNAPSHOT_FOOTER: { + SnapshotFooterRecord message = new SnapshotFooterRecord(); + message.read(new ByteBufferAccessor(record.value()), (short) 0); + messages.add(new ApiMessageAndVersion(message, (short) 0)); + break; + } + default: + throw new RuntimeException("Unsupported control record type " + type + " at offset " + + record.offset()); + } + } catch (Throwable e) { + throw new RuntimeException("Unable to read control record at offset " + record.offset(), e); + } + } + return new BatchAndType(Batch.data( + input.baseOffset(), + input.partitionLeaderEpoch(), + input.maxTimestamp(), + input.sizeInBytes(), + messages), true); + } + + private BatchAndType nextMetadataBatch(FileChannelRecordBatch input) { + List<ApiMessageAndVersion> messages = new ArrayList<>(); + for (Record record : input) { + ByteBufferAccessor accessor = new ByteBufferAccessor(record.value()); + try { + ApiMessageAndVersion messageAndVersion = serde.read(accessor, record.valueSize()); + messages.add(messageAndVersion); + } catch (Throwable e) { + log.error("unable to read metadata record at offset {}", record.offset(), e); Review Comment: Hmm, why don't we want to propagate this? Seems dangerous to continue. 
########## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ########## @@ -1142,15 +1142,19 @@ private void renounce() { purgatory.failAll(newNotControllerException()); if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) { - newBytesSinceLastSnapshot = 0; snapshotRegistry.revertToSnapshot(lastCommittedOffset); authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl())); } else { - resetState(); + log.warn("Unable to find last committed offset {} in snapshot registry; resetting " + Review Comment: Is not having a snapshot for the committed offset an expected or unexpected state? ########## metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java: ########## @@ -222,52 +227,130 @@ public void testFeatureControlIterator() throws Exception { manager.iterator(Long.MAX_VALUE)); } + private static final FeatureControlManager.Builder TEST_MANAGER_BUILDER1 = + new FeatureControlManager.Builder(). + setQuorumFeatures(features(MetadataVersion.FEATURE_NAME, + MetadataVersion.IBP_3_3_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV3.featureLevel())). + setMetadataVersion(MetadataVersion.IBP_3_3_IV2); + @Test public void testApplyMetadataVersionChangeRecord() { - QuorumFeatures features = features(MetadataVersion.FEATURE_NAME, - MetadataVersion.IBP_3_0_IV1.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel()); - FeatureControlManager manager = new FeatureControlManager.Builder(). - setQuorumFeatures(features).build(); + FeatureControlManager manager = TEST_MANAGER_BUILDER1.build(); manager.replay(new FeatureLevelRecord(). setName(MetadataVersion.FEATURE_NAME). 
- setFeatureLevel(MetadataVersion.IBP_3_0_IV1.featureLevel())); - assertEquals(MetadataVersion.IBP_3_0_IV1, manager.metadataVersion()); + setFeatureLevel(MetadataVersion.IBP_3_3_IV3.featureLevel())); + assertEquals(MetadataVersion.IBP_3_3_IV3, manager.metadataVersion()); } @Test - public void testDowngradeMetadataVersion() { - QuorumFeatures features = features(MetadataVersion.FEATURE_NAME, - MetadataVersion.IBP_3_2_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel()); - FeatureControlManager manager = new FeatureControlManager.Builder(). - setQuorumFeatures(features). - setMetadataVersion(MetadataVersion.IBP_3_3_IV0). - build(); - assertEquals(manager.metadataVersion(), MetadataVersion.IBP_3_3_IV0); + public void testCannotDowngradeToVersionBeforeMinimumSupportedKraftVersion() { + FeatureControlManager manager = TEST_MANAGER_BUILDER1.build(); + assertEquals(ControllerResult.of(Collections.emptyList(), + singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION, + "Invalid update version 3 for feature metadata.version. Local controller 0 only " + + "supports versions 4-7"))), + manager.updateFeatures( + singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel()), + singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), + emptyMap(), + true)); + } + + @Test + public void testCannotDowngradeToHigherVersion() { + FeatureControlManager manager = TEST_MANAGER_BUILDER1.build(); + assertEquals(ControllerResult.of(Collections.emptyList(), + singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION, + "Invalid update version 7 for feature metadata.version. 
Can't downgrade to a " + + "newer version."))), + manager.updateFeatures( + singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV3.featureLevel()), + singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE), Review Comment: Is the downgrade feature supported yet? ########## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.metadata.bootstrap; + +import org.apache.kafka.metadata.util.BatchFileReader; +import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType; +import org.apache.kafka.metadata.util.BatchFileWriter; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; +import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION; + + +/** + * A read-only class that holds the controller bootstrap metadata. A file named "bootstrap.snapshot" is used and the + * format is the same as a KRaft snapshot. + */ +public class BootstrapDirectory { + final static String INTER_BROKER_PROTOCOL_CONFIG_KEY = "inter.broker.protocol.version"; + final static String BINARY_BOOTSTRAP = "bootstrap.checkpoint"; + + public static String ibpStringFromConfigMap(Map<String, Object> staticConfig) { + Object value = staticConfig.get(INTER_BROKER_PROTOCOL_CONFIG_KEY); + return value == null ? "" : value.toString(); + } + + private final String directoryPath; + private final String ibp; + + /** + * Create a new BootstrapDirectory object. + * + * @param directoryPath The path to the directory with the bootstrap file. + * @param ibp The configured value of inter.broker.protocol, or the empty string + * if it is not configured. 
+ */ + public BootstrapDirectory( + String directoryPath, + String ibp + ) { + Objects.requireNonNull(directoryPath); + Objects.requireNonNull(ibp); + this.directoryPath = directoryPath; + this.ibp = ibp; + } + + public BootstrapMetadata read() throws Exception { + if (!Files.isDirectory(Paths.get(directoryPath))) { + throw new RuntimeException("No such directory as " + directoryPath); + } + Path binaryBootstrapPath = Paths.get(directoryPath, BINARY_BOOTSTRAP); + if (!Files.exists(binaryBootstrapPath)) { + return readFromConfiguration(); + } else { + return readFromBinaryFile(binaryBootstrapPath.toString()); + } + } + + BootstrapMetadata readFromConfiguration() { + if (ibp.isEmpty()) { + return BootstrapMetadata.fromVersion(MetadataVersion.latest(), + "the default bootstrap file which sets the latest metadata.version, " + + "since no bootstrap file was found, and " + INTER_BROKER_PROTOCOL_CONFIG_KEY + + " was not configured."); + } + MetadataVersion version = MetadataVersion.fromVersionString(ibp); + if (version.isLessThan(MINIMUM_KRAFT_VERSION)) { + return BootstrapMetadata.fromVersion(MINIMUM_KRAFT_VERSION, + "a default bootstrap file setting the minimum supported KRaft metadata " + Review Comment: The source descriptions in here are pretty long-winded. I am not sure a description this long stays coherent when it is embedded in a message like the following: ```java throw new RuntimeException("Unable to load metadata from " + source + ": bootstrap " + "metadata versions less than " + MetadataVersion.MINIMUM_KRAFT_VERSION + " are " + "not supported."); ``` Perhaps `source` should be concise and we can save the verbose message for logging? ########## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.bootstrap; + +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + + +/** + * The bootstrap metadata. On startup, if the metadata log is empty, we will populate the log with + * these records. Alternately, if log is not empty, but the metadata version is not set, we will + * use the version specified here. + */ +public class BootstrapMetadata { + private final List<ApiMessageAndVersion> records; + private final MetadataVersion metadataVersion; + private final String source; + + public static BootstrapMetadata fromVersion(MetadataVersion metadataVersion, String source) { + List<ApiMessageAndVersion> records = Collections.singletonList( + new ApiMessageAndVersion(new FeatureLevelRecord(). + setName(MetadataVersion.FEATURE_NAME). 
+ setFeatureLevel(metadataVersion.featureLevel()), (short) 0)); + return new BootstrapMetadata(records, metadataVersion, source); + } + + public static BootstrapMetadata fromRecords(List<ApiMessageAndVersion> records, String source) { + MetadataVersion metadataVersion = null; + for (ApiMessageAndVersion record : records) { + Optional<MetadataVersion> version = recordToMetadataVersion(record.message()); + if (version.isPresent()) { + metadataVersion = version.get(); + } + } + if (metadataVersion == null) { + throw new RuntimeException("No FeatureLevelRecord for " + MetadataVersion.FEATURE_NAME + + " was found in the bootstrap metadata from " + source); + } + return new BootstrapMetadata(records, metadataVersion, source); + } + + public static Optional<MetadataVersion> recordToMetadataVersion(ApiMessage record) { + if (record instanceof FeatureLevelRecord) { + FeatureLevelRecord featureLevel = (FeatureLevelRecord) record; + if (featureLevel.name().equals(MetadataVersion.FEATURE_NAME)) { + return Optional.of(MetadataVersion.fromFeatureLevel(featureLevel.featureLevel())); + } + } + return Optional.empty(); + } + + BootstrapMetadata( + List<ApiMessageAndVersion> records, + MetadataVersion metadataVersion, + String source + ) { + Objects.requireNonNull(records); + this.records = records; + if (metadataVersion.isLessThan(MetadataVersion.MINIMUM_KRAFT_VERSION)) { + throw new RuntimeException("Unable to load metadata from " + source + ": bootstrap " + + "metadata versions less than " + MetadataVersion.MINIMUM_KRAFT_VERSION + " are " + + "not supported."); + } + this.metadataVersion = metadataVersion; + Objects.requireNonNull(source); + this.source = source; + } + + public List<ApiMessageAndVersion> records() { + return records; + } + + public MetadataVersion metadataVersion() { + return metadataVersion; + } + + public String source() { + return source; + } + + public BootstrapMetadata copyWithOnlyVersion() { + List<ApiMessageAndVersion> newRecords = new ArrayList<>(); + for 
(ApiMessageAndVersion record : records) { + if (recordToMetadataVersion(record.message()).isPresent()) { + newRecords.clear(); Review Comment: Not sure why we need a list if we clear every time before we add a record. Could it just be a variable for a single instance of `ApiMessageAndVersion`? ########## metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.metadata.util; + +import org.apache.kafka.common.message.LeaderChangeMessage; +import org.apache.kafka.common.message.SnapshotFooterRecord; +import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.MetadataRecordSerde; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + + +/** + * Reads a log file containing KRaft record batches. + */ +public final class BatchFileReader implements Iterator<BatchFileReader.BatchAndType>, AutoCloseable { + private static final Logger log = LoggerFactory.getLogger(BatchFileReader.class); + + public static class Builder { + private String path = null; + + public Builder setPath(String path) { + this.path = path; Review Comment: nit: perhaps we could require the path be non-null? ########## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.bootstrap; + +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + + +/** + * The bootstrap metadata. On startup, if the metadata log is empty, we will populate the log with + * these records. Alternately, if log is not empty, but the metadata version is not set, we will + * use the version specified here. + */ +public class BootstrapMetadata { + private final List<ApiMessageAndVersion> records; + private final MetadataVersion metadataVersion; + private final String source; + + public static BootstrapMetadata fromVersion(MetadataVersion metadataVersion, String source) { + List<ApiMessageAndVersion> records = Collections.singletonList( + new ApiMessageAndVersion(new FeatureLevelRecord(). + setName(MetadataVersion.FEATURE_NAME). + setFeatureLevel(metadataVersion.featureLevel()), (short) 0)); Review Comment: Just wondering about the use of 0 here as the version. Is this a requirement for all bootstrap snapshots in order to maintain compatibility? 
########## metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java: ########## @@ -188,12 +182,16 @@ private ApiError updateFeature( } } - if (currentVersion != null && newVersion < currentVersion) { + if (newVersion < currentVersion) { if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) { Review Comment: Not from this patch, but I was trying to trace back where the `UpgradeType` is determined. I see logic in `QuorumController.updateFeatures` to convert from the code in the request, but it doesn't look like we are checking for the `UNKNOWN` type. Am I missing the validation somewhere? ########## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.metadata.bootstrap; + +import org.apache.kafka.metadata.util.BatchFileReader; +import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType; +import org.apache.kafka.metadata.util.BatchFileWriter; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; +import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION; + + +/** + * A read-only class that holds the controller bootstrap metadata. A file named "bootstrap.snapshot" is used and the Review Comment: The comment mentions `bootstrap.snapshot`, but below we seem to use `bootstrap.checkpoint`. Which is right? ########## metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java: ########## @@ -952,6 +966,7 @@ public void testEarlyControllerResults() throws Throwable { } } + @Disabled // TODO: need to fix leader election in LocalLog. Review Comment: Did this break from this patch or is it flaky? ########## metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java: ########## @@ -222,52 +227,130 @@ public void testFeatureControlIterator() throws Exception { manager.iterator(Long.MAX_VALUE)); } + private static final FeatureControlManager.Builder TEST_MANAGER_BUILDER1 = + new FeatureControlManager.Builder(). + setQuorumFeatures(features(MetadataVersion.FEATURE_NAME, + MetadataVersion.IBP_3_3_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV3.featureLevel())). 
+ setMetadataVersion(MetadataVersion.IBP_3_3_IV2); + @Test public void testApplyMetadataVersionChangeRecord() { - QuorumFeatures features = features(MetadataVersion.FEATURE_NAME, - MetadataVersion.IBP_3_0_IV1.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel()); - FeatureControlManager manager = new FeatureControlManager.Builder(). - setQuorumFeatures(features).build(); + FeatureControlManager manager = TEST_MANAGER_BUILDER1.build(); manager.replay(new FeatureLevelRecord(). setName(MetadataVersion.FEATURE_NAME). - setFeatureLevel(MetadataVersion.IBP_3_0_IV1.featureLevel())); - assertEquals(MetadataVersion.IBP_3_0_IV1, manager.metadataVersion()); + setFeatureLevel(MetadataVersion.IBP_3_3_IV3.featureLevel())); + assertEquals(MetadataVersion.IBP_3_3_IV3, manager.metadataVersion()); } @Test - public void testDowngradeMetadataVersion() { - QuorumFeatures features = features(MetadataVersion.FEATURE_NAME, - MetadataVersion.IBP_3_2_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel()); - FeatureControlManager manager = new FeatureControlManager.Builder(). - setQuorumFeatures(features). - setMetadataVersion(MetadataVersion.IBP_3_3_IV0). - build(); - assertEquals(manager.metadataVersion(), MetadataVersion.IBP_3_3_IV0); + public void testCannotDowngradeToVersionBeforeMinimumSupportedKraftVersion() { + FeatureControlManager manager = TEST_MANAGER_BUILDER1.build(); + assertEquals(ControllerResult.of(Collections.emptyList(), + singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION, Review Comment: By the way, the overloading of `INVALID_UPDATE_VERSION` is a little unfortunate. The error makes sense here and seems reasonable, but we also use it for "partition epoch" mismatches in `AlterPartition`. I almost wonder if it would be more consistent with other APIs to use `INVALID_REQUEST` here. 
########## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.bootstrap; + +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + + +/** + * The bootstrap metadata. On startup, if the metadata log is empty, we will populate the log with + * these records. Alternately, if log is not empty, but the metadata version is not set, we will + * use the version specified here. + */ +public class BootstrapMetadata { + private final List<ApiMessageAndVersion> records; + private final MetadataVersion metadataVersion; + private final String source; + + public static BootstrapMetadata fromVersion(MetadataVersion metadataVersion, String source) { + List<ApiMessageAndVersion> records = Collections.singletonList( + new ApiMessageAndVersion(new FeatureLevelRecord(). 
+ setName(MetadataVersion.FEATURE_NAME). + setFeatureLevel(metadataVersion.featureLevel()), (short) 0)); + return new BootstrapMetadata(records, metadataVersion, source); + } + + public static BootstrapMetadata fromRecords(List<ApiMessageAndVersion> records, String source) { + MetadataVersion metadataVersion = null; + for (ApiMessageAndVersion record : records) { + Optional<MetadataVersion> version = recordToMetadataVersion(record.message()); + if (version.isPresent()) { + metadataVersion = version.get(); + } + } + if (metadataVersion == null) { + throw new RuntimeException("No FeatureLevelRecord for " + MetadataVersion.FEATURE_NAME + + " was found in the bootstrap metadata from " + source); + } + return new BootstrapMetadata(records, metadataVersion, source); + } + + public static Optional<MetadataVersion> recordToMetadataVersion(ApiMessage record) { + if (record instanceof FeatureLevelRecord) { + FeatureLevelRecord featureLevel = (FeatureLevelRecord) record; + if (featureLevel.name().equals(MetadataVersion.FEATURE_NAME)) { + return Optional.of(MetadataVersion.fromFeatureLevel(featureLevel.featureLevel())); + } + } + return Optional.empty(); + } + + BootstrapMetadata( + List<ApiMessageAndVersion> records, + MetadataVersion metadataVersion, + String source + ) { + Objects.requireNonNull(records); + this.records = records; + if (metadataVersion.isLessThan(MetadataVersion.MINIMUM_KRAFT_VERSION)) { + throw new RuntimeException("Unable to load metadata from " + source + ": bootstrap " + + "metadata versions less than " + MetadataVersion.MINIMUM_KRAFT_VERSION + " are " + + "not supported."); + } + this.metadataVersion = metadataVersion; + Objects.requireNonNull(source); + this.source = source; + } + + public List<ApiMessageAndVersion> records() { + return records; + } + + public MetadataVersion metadataVersion() { + return metadataVersion; + } + + public String source() { + return source; + } + + public BootstrapMetadata copyWithOnlyVersion() { + 
List<ApiMessageAndVersion> newRecords = new ArrayList<>(); + for (ApiMessageAndVersion record : records) { + if (recordToMetadataVersion(record.message()).isPresent()) { + newRecords.clear(); + newRecords.add(record); + } + } + if (newRecords.isEmpty()) { + throw new RuntimeException("No FeatureLevelRecord for " + MetadataVersion.FEATURE_NAME + + " was found in the bootstrap metadata from " + source); + } + return new BootstrapMetadata(Collections.unmodifiableList(newRecords), + metadataVersion, source); + } + + @Override + public int hashCode() { + return Objects.hash(records, metadataVersion, source); + } + + @Override + public boolean equals(Object o) { + if (o == null || !(o instanceof BootstrapMetadata)) return false; Review Comment: nit: the first check `o == null` is covered by the second check ########## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ########## @@ -1118,6 +1059,65 @@ private void updateWriteOffset(long offset) { } } + private void claim(int epoch) { + try { + if (curClaimEpoch != -1) { + throw new RuntimeException("Cannot claim leadership because we are already the " + + "active controller."); + } + curClaimEpoch = epoch; + controllerMetrics.setActive(true); + updateWriteOffset(lastCommittedOffset); + clusterControl.activate(); + + // Before switching to active, create an in-memory snapshot at the last committed + // offset. This is required because the active controller assumes that there is always + // an in-memory snapshot at the last committed offset. + snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset); + + // Prepend the activate event. It is important that this event go at the beginning + // of the queue rather than the end (hence prepend rather than append). It's also + // important not to use prepend for anything else, to preserve the ordering here. 
+ queue.prepend(new ControllerWriteEvent<>("completeActivation[" + epoch + "]", + new CompleteActivationEvent())); + } catch (Throwable e) { + fatalFaultHandler.handleFault("exception while claiming leadership", e); + } + } + + class CompleteActivationEvent implements ControllerWriteOperation<Void> { + @Override + public ControllerResult<Void> generateRecordsAndResult() throws Exception { + List<ApiMessageAndVersion> records = new ArrayList<>(); + if (logReplayTracker.empty()) { + // If no records have been replayed, we need to write out the bootstrap records. + // This will include the new metadata.version, as well as things like SCRAM + // initialization, etc. + log.warn("The metadata log appears to be empty. Appending {} bootstrap record(s) " + Review Comment: Are these messages really warnings or could they be info? ########## server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java: ########## @@ -72,20 +71,18 @@ import static org.junit.jupiter.api.Assertions.assertTrue; class MetadataVersionTest { - @Test public void testFeatureLevel() { - MetadataVersion[] metadataVersions = MetadataVersion.VERSIONS; - int firstFeatureLevelIndex = Arrays.asList(metadataVersions).indexOf(MetadataVersion.MINIMUM_KRAFT_VERSION); - for (int i = 0; i < firstFeatureLevelIndex; i++) { - assertTrue(metadataVersions[i].featureLevel() < 0); + int i = 0; + while (i < MetadataVersion.VERSIONS.length && Review Comment: nit: The new version of this test seems more obscure than the original. It seems more straightforward to assert 1) all versions below IBP_3_0_IV1 have -1 feature level, and 2) that all subsequent versions increment the feature level by 1. Also, it would be helpful to have a better name for the test so that we don't have to decipher what it's trying to do. For example, perhaps `testSequentialFeatureLevel` or something like that. 
########## metadata/src/main/java/org/apache/kafka/controller/LogReplayTracker.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import java.util.Optional; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + + +public class LogReplayTracker { + public static class Builder { + private LogContext logContext = null; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + public LogReplayTracker build() { + if (logContext == null) logContext = new LogContext(); + return new LogReplayTracker(logContext); + } + } + + /** + * The slf4j logger. + */ + private final Logger log; + + /** + * True if we haven't replayed any records yet. + */ + private boolean empty; + + /** + * True if we haven't replayed any metadata.version records yet. + */ + private boolean versionless; Review Comment: nit: the name confused me a little bit. How about `hasReadMetadataVersion` or something like that? 
Or perhaps even simpler, we could record the `MetadataVersion` itself. ########## metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.util; + +import org.apache.kafka.common.message.LeaderChangeMessage; +import org.apache.kafka.common.message.SnapshotFooterRecord; +import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.MetadataRecordSerde; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + + +/** + * Reads a log file containing KRaft record batches. 
+ */ +public final class BatchFileReader implements Iterator<BatchFileReader.BatchAndType>, AutoCloseable { + private static final Logger log = LoggerFactory.getLogger(BatchFileReader.class); + + public static class Builder { + private String path = null; + + public Builder setPath(String path) { + this.path = path; + return this; + } + + public BatchFileReader build() throws Exception { + if (path == null) { + throw new RuntimeException("You must specify a path."); + } + FileRecords fileRecords = FileRecords.open(new File(path), false); + try { + return new BatchFileReader(fileRecords); + } catch (Throwable e) { + Utils.closeQuietly(fileRecords, "fileRecords"); + throw e; + } + } + } + + public static class BatchAndType { + private final Batch<ApiMessageAndVersion> batch; + private final boolean isControl; + + public BatchAndType(Batch<ApiMessageAndVersion> batch, boolean isControl) { + this.batch = batch; + this.isControl = isControl; + } + + public Batch<ApiMessageAndVersion> batch() { + return batch; + } + + public boolean isControl() { + return isControl; + } + } + + private final FileRecords fileRecords; + private Iterator<FileChannelRecordBatch> batchIterator; + private final MetadataRecordSerde serde; + + private BatchFileReader(FileRecords fileRecords) { + this.fileRecords = fileRecords; + this.batchIterator = fileRecords.batchIterator(); + this.serde = new MetadataRecordSerde(); + } + + @Override + public boolean hasNext() { + return this.batchIterator.hasNext(); + } + + @Override + public BatchAndType next() { + FileChannelRecordBatch input = batchIterator.next(); + if (input.isControlBatch()) { + return nextControlBatch(input); + } else { + return nextMetadataBatch(input); + } + } + + private BatchAndType nextControlBatch(FileChannelRecordBatch input) { + List<ApiMessageAndVersion> messages = new ArrayList<>(); + for (Iterator<Record> iter = input.iterator(); iter.hasNext(); ) { + Record record = iter.next(); + try { + short typeId = 
ControlRecordType.parseTypeId(record.key()); + ControlRecordType type = ControlRecordType.fromTypeId(typeId); + switch (type) { + case LEADER_CHANGE: { + LeaderChangeMessage message = new LeaderChangeMessage(); + message.read(new ByteBufferAccessor(record.value()), (short) 0); + messages.add(new ApiMessageAndVersion(message, (short) 0)); + break; + } + case SNAPSHOT_HEADER: { + SnapshotHeaderRecord message = new SnapshotHeaderRecord(); + message.read(new ByteBufferAccessor(record.value()), (short) 0); + messages.add(new ApiMessageAndVersion(message, (short) 0)); + break; + } + case SNAPSHOT_FOOTER: { + SnapshotFooterRecord message = new SnapshotFooterRecord(); + message.read(new ByteBufferAccessor(record.value()), (short) 0); + messages.add(new ApiMessageAndVersion(message, (short) 0)); + break; + } + default: + throw new RuntimeException("Unsupported control record type " + type + " at offset " + + record.offset()); + } + } catch (Throwable e) { + throw new RuntimeException("Unable to read control record at offset " + record.offset(), e); + } + } + return new BatchAndType(Batch.data( + input.baseOffset(), + input.partitionLeaderEpoch(), + input.maxTimestamp(), + input.sizeInBytes(), + messages), true); + } + + private BatchAndType nextMetadataBatch(FileChannelRecordBatch input) { + List<ApiMessageAndVersion> messages = new ArrayList<>(); + for (Record record : input) { + ByteBufferAccessor accessor = new ByteBufferAccessor(record.value()); + try { + ApiMessageAndVersion messageAndVersion = serde.read(accessor, record.valueSize()); + messages.add(messageAndVersion); + } catch (Throwable e) { + log.error("unable to read metadata record at offset {}", record.offset(), e); + } + } + return new BatchAndType(Batch.data( + input.baseOffset(), + input.partitionLeaderEpoch(), + input.maxTimestamp(), + input.sizeInBytes(), + messages), false); + } + + @Override + public void close() { + try { + fileRecords.closeHandlers(); Review Comment: Why not use `close()` here? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org. For queries about this service, please contact Infrastructure at: users@infra.apache.org
