cmccabe commented on code in PR #12715:
URL: https://github.com/apache/kafka/pull/12715#discussion_r991446808
##########
metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java:
##########
@@ -65,24 +64,62 @@ private Optional<Short> finalizedVersion(String feature) {
return Optional.ofNullable(finalizedVersions.get(feature));
}
- public void write(Consumer<List<ApiMessageAndVersion>> out) {
- List<ApiMessageAndVersion> batch = new ArrayList<>();
- if
(!metadataVersion.isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION)) {
- // Write out the metadata.version record first, and then the rest
of the finalized features
- batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
- setName(MetadataVersion.FEATURE_NAME).
- setFeatureLevel(metadataVersion.featureLevel()),
FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
+ public void write(ImageWriter writer, ImageWriterOptions options) {
+ if
(options.metadataVersion().isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION))
{
+ writePreProductionVersion(writer, options);
+ } else {
+ writeProductionVersion(writer, options);
}
+ }
+
+ private void writePreProductionVersion(ImageWriter writer,
ImageWriterOptions options) {
+ // If the metadata version is older than 3.3-IV0, we can't represent
any feature flags,
+ // because the FeatureLevel record is not supported.
+ if (!finalizedVersions.isEmpty()) {
+ List<String> features = new
ArrayList<>(finalizedVersions.keySet());
+ features.sort(String::compareTo);
+ options.handleLoss("feature flag(s): " +
+ features.stream().collect(Collectors.joining(", ")));
+ }
+ // With this old metadata version, we can't even represent the
metadata version itself.
+ // However, we do want to put something into the snapshot that
indicates the version,
+ // for human consumption. The following two records set a topic config
on __cluster_metadata
+ // and then clear it, in order to accomplish that. (ConfigRecord goes
back to the earliest
+ // stable KRaft releases.)
+ //
+ // Another reason we do this here is that it allows us to maintain the
invariant that every
+ // MetadataImage generates at least one record when serialized -- even
if the image is
+ // empty.
+ writer.write(0, new ConfigRecord().
+ setResourceType(ResourceType.TOPIC.code()).
+ setResourceName("__cluster_metadata").
+ setName("metadata.version").
+ setValue(options.metadataVersion().toString()));
+ writer.write(0, new ConfigRecord().
+ setResourceType(ResourceType.TOPIC.code()).
+ setResourceName("__cluster_metadata").
+ setName("metadata.version").
+ setValue(null));
+ }
+ private void writeProductionVersion(ImageWriter writer, ImageWriterOptions
options) {
+ // It is important to write out the metadata.version record first,
because it may have an
+ // impact on how we decode records that come after it.
+ //
+ // Note: it's important that this initial FeatureLevelRecord be
written with version 0 and
+ // not any later version, so that any modern reader can process it.
Review Comment:
> Interesting. How about writing a similar comment in
FeatureLevelRecord.json stating that 0 should always be a valid version?
There is such a comment in `FeatureLevelRecord.json` already :)
> Actually, maybe this is true for all metadata records. Zero must always be
a valid version for all metadata records.
I don't think zero necessarily needs to be valid for all metadata records.
We can drop support for old metadata records over time. That would imply that
old metadata images would become unreadable, though, so probably not something
to do lightly.
Anyway the comment here is not about 0 being supported, but about 0 being
the only version we use, at all times.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]