cmccabe commented on code in PR #12715:
URL: https://github.com/apache/kafka/pull/12715#discussion_r991446808


##########
metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java:
##########
@@ -65,24 +64,62 @@ private Optional<Short> finalizedVersion(String feature) {
         return Optional.ofNullable(finalizedVersions.get(feature));
     }
 
-    public void write(Consumer<List<ApiMessageAndVersion>> out) {
-        List<ApiMessageAndVersion> batch = new ArrayList<>();
-        if (!metadataVersion.isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION)) {
-            // Write out the metadata.version record first, and then the rest of the finalized features
-            batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
-                    setName(MetadataVersion.FEATURE_NAME).
-                    setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
+    public void write(ImageWriter writer, ImageWriterOptions options) {
+        if (options.metadataVersion().isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION)) {
+            writePreProductionVersion(writer, options);
+        } else {
+            writeProductionVersion(writer, options);
         }
+    }
+
+    private void writePreProductionVersion(ImageWriter writer, ImageWriterOptions options) {
+        // If the metadata version is older than 3.3-IV0, we can't represent any feature flags,
+        // because the FeatureLevel record is not supported.
+        if (!finalizedVersions.isEmpty()) {
+            List<String> features = new ArrayList<>(finalizedVersions.keySet());
+            features.sort(String::compareTo);
+            options.handleLoss("feature flag(s): " +
+                    features.stream().collect(Collectors.joining(", ")));
+        }
+        // With this old metadata version, we can't even represent the metadata version itself.
+        // However, we do want to put something into the snapshot that indicates the version,
+        // for human consumption. The following two records set a topic config on __cluster_metadata
+        // and then clear it, in order to accomplish that. (ConfigRecord goes back to the earliest
+        // stable KRaft releases.)
+        //
+        // Another reason we do this here is that it allows us to maintain the invariant that every
+        // MetadataImage generates at least one record when serialized -- even if the image is
+        // empty.
+        writer.write(0, new ConfigRecord().
+            setResourceType(ResourceType.TOPIC.code()).
+            setResourceName("__cluster_metadata").
+            setName("metadata.version").
+            setValue(options.metadataVersion().toString()));
+        writer.write(0, new ConfigRecord().
+            setResourceType(ResourceType.TOPIC.code()).
+            setResourceName("__cluster_metadata").
+            setName("metadata.version").
+            setValue(null));
+    }
 
+    private void writeProductionVersion(ImageWriter writer, ImageWriterOptions options) {
+        // It is important to write out the metadata.version record first, because it may have an
+        // impact on how we decode records that come after it.
+        //
+        // Note: it's important that this initial FeatureLevelRecord be written with version 0 and
+        // not any later version, so that any modern reader can process it.

Review Comment:
   > Interesting. How about writing a similar comment in FeatureLevelRecord.json stating that 0 should always be a valid version?
   
   There is such a comment in `FeatureLevelRecord.json` already :)
   
   > Actually, maybe this is true for all metadata records. Zero must always be a valid version for all metadata records.
   
   I don't think zero necessarily needs to be valid for all metadata records. We can drop support for old metadata records over time. That would imply that old metadata images would become unreadable, though, so probably not something to do lightly.
   
   Anyway, the comment here is not about 0 being supported, but about 0 being the only version we use, at all times.
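
To illustrate the distinction between "version 0 is supported" and "version 0 is the only version ever written", here is a minimal, self-contained sketch. It does not use Kafka's real classes: `VersionedRecord`, `Reader`, `writeImage`, and `PINNED_VERSION` are hypothetical names invented for this example, and the assumption that a reader accepts a contiguous range of schema versions is a simplification.

```java
import java.util.ArrayList;
import java.util.List;

final class PinnedVersionSketch {
    /** Hypothetical stand-in for a serialized record: a name plus the schema version used to write it. */
    static final class VersionedRecord {
        final String name;
        final short version;

        VersionedRecord(String name, short version) {
            this.name = name;
            this.version = version;
        }
    }

    /** Hypothetical reader that can decode a contiguous range of schema versions. */
    static final class Reader {
        final short lowestSupported;
        final short highestSupported;

        Reader(short lowestSupported, short highestSupported) {
            this.lowestSupported = lowestSupported;
            this.highestSupported = highestSupported;
        }

        boolean canRead(VersionedRecord record) {
            return record.version >= lowestSupported && record.version <= highestSupported;
        }
    }

    /** The first record is always written with this version, regardless of what the writer supports. */
    static final short PINNED_VERSION = 0;

    /** Writes the version-bearing record first, pinned to version 0, then a record at the writer's latest version. */
    static List<VersionedRecord> writeImage(short latestVersion) {
        List<VersionedRecord> out = new ArrayList<>();
        out.add(new VersionedRecord("FeatureLevelRecord", PINNED_VERSION));
        out.add(new VersionedRecord("SomeLaterRecord", latestVersion));
        return out;
    }

    public static void main(String[] args) {
        List<VersionedRecord> image = writeImage((short) 2);
        Reader olderReader = new Reader((short) 0, (short) 1);

        // The pinned first record is readable even by a reader that predates version 2.
        System.out.println("first record readable: " + olderReader.canRead(image.get(0)));
        // The later record is not, which the reader can only learn after decoding the first one.
        System.out.println("later record readable: " + olderReader.canRead(image.get(1)));
    }
}
```

In this sketch, pinning the first record means a reader can always decode it and then decide how to handle everything that follows. If the writer instead used its highest supported version for that first record, an older reader could fail before learning which metadata version it is looking at.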



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
