wuchong commented on code in PR #2594:
URL: https://github.com/apache/fluss/pull/2594#discussion_r3035545007


##########
fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java:
##########
@@ -628,6 +661,102 @@ private long applyUpdate(
         }
     }
 
+    /** Batch-constant state for aligning rows to the latest schema. */
+    private static class SchemaAlignmentContext implements AutoCloseable {
+        final short latestSchemaId;
+        final List<Integer> targetColIds;
+        final RowEncoder encoder;
+        final Map<Short, SourceSchemaMapping> cache = new HashMap<>();
+
+        SchemaAlignmentContext(short latestSchemaId, Schema latestSchema, KvFormat kvFormat) {
+            this.latestSchemaId = latestSchemaId;
+            this.targetColIds = latestSchema.getColumnIds();
+            this.encoder = RowEncoder.create(kvFormat, latestSchema.getRowType());
+        }
+
+        @Override
+        public void close() throws Exception {
+            encoder.close();
+        }
+
+        /** Cached field getters and column-id→position map for a single source schema. */
+        private static class SourceSchemaMapping {
+            final Map<Integer, Integer> idToPos;
+            final InternalRow.FieldGetter[] getters;
+
+            SourceSchemaMapping(Schema sourceSchema) {
+                List<Integer> sourceColIds = sourceSchema.getColumnIds();
+                this.idToPos = new HashMap<>();
+                for (int i = 0; i < sourceColIds.size(); i++) {
+                    idToPos.put(sourceColIds.get(i), i);
+                }
+                this.getters = InternalRow.createFieldGetters(sourceSchema.getRowType());
+            }
+        }
+    }
+
+    /**
+     * Converts a {@link BinaryValue} from its source schema layout to the latest schema layout
+     * using column IDs to map positions. New columns (present in latest but not in source) are
+     * filled with null. Only call when {@code value.schemaId != latestSchemaId}.
+     */
+    private BinaryValue alignToLatestSchema(BinaryValue value, SchemaAlignmentContext ctx) {
+        SchemaAlignmentContext.SourceSchemaMapping mapping =
+                ctx.cache.computeIfAbsent(
+                        value.schemaId,
+                        id ->
+                                new SchemaAlignmentContext.SourceSchemaMapping(
+                                        schemaGetter.getSchema(id)));
+
+        ctx.encoder.startNewRow();
+        for (int targetPos = 0; targetPos < ctx.targetColIds.size(); targetPos++) {
+            Integer sourcePos = mapping.idToPos.get(ctx.targetColIds.get(targetPos));
+            if (sourcePos == null) {
+                // Column added after the source schema — fill with null.
+                ctx.encoder.encodeField(targetPos, null);
+            } else {
+                ctx.encoder.encodeField(
+                        targetPos, mapping.getters[sourcePos].getFieldOrNull(value.row));
+            }
+        }
+        // copy() is required: the encoder reuses its internal buffer, so the next
+        // startNewRow() would overwrite the row returned here.
+        return new BinaryValue(ctx.latestSchemaId, ctx.encoder.finishRow().copy());

Review Comment:
   After schema evolution, every `oldValue` read from the KV store with an older 
schema goes through `alignToLatestSchema`, which does a full field-by-field 
re-encode plus a `copy()`. The previous approach used `PaddingRow` as a zero-copy 
virtual view. The new approach adds significant per-record overhead during the 
(potentially long) transition period when old-schema data still dominates the 
KV store. 
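
   To illustrate the trade-off, here is a minimal sketch of a zero-copy aligned 
view that keeps the column-id mapping but skips the re-encode and `copy()`. 
`SimpleRow`, `ArrayRow`, and `AlignedRowView` are hypothetical stand-ins written 
for this comment; they are not the Fluss `InternalRow` or `PaddingRow` APIs, and 
whether such a view fits here depends on whether downstream code needs an encoded 
row.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a row type; NOT the Fluss InternalRow API. */
interface SimpleRow {
    int fieldCount();

    Object getField(int pos);
}

/** Plain array-backed row, used only to exercise the view in this sketch. */
final class ArrayRow implements SimpleRow {
    private final Object[] fields;

    ArrayRow(Object... fields) {
        this.fields = fields;
    }

    @Override
    public int fieldCount() {
        return fields.length;
    }

    @Override
    public Object getField(int pos) {
        return fields[pos];
    }
}

/** Exposes a source-schema row in the target layout without copying any field data. */
final class AlignedRowView implements SimpleRow {
    // targetToSource[t] is the source position of target column t, or -1 if absent.
    private final int[] targetToSource;
    private SimpleRow source;

    AlignedRowView(List<Integer> sourceColIds, List<Integer> targetColIds) {
        Map<Integer, Integer> idToPos = new HashMap<>();
        for (int i = 0; i < sourceColIds.size(); i++) {
            idToPos.put(sourceColIds.get(i), i);
        }
        this.targetToSource = new int[targetColIds.size()];
        for (int t = 0; t < targetColIds.size(); t++) {
            targetToSource[t] = idToPos.getOrDefault(targetColIds.get(t), -1);
        }
    }

    /** Re-points the view at another row of the same source schema; no allocation. */
    AlignedRowView replaceRow(SimpleRow newSource) {
        this.source = newSource;
        return this;
    }

    @Override
    public int fieldCount() {
        return targetToSource.length;
    }

    @Override
    public Object getField(int pos) {
        int sourcePos = targetToSource[pos];
        // Columns added after the source schema read as null.
        return sourcePos < 0 ? null : source.getField(sourcePos);
    }
}
```

   The mapping is built once per source schema and the view is reused per record 
via `replaceRow`, so the per-record cost is one array lookup per field access. 
The view reads through to the source row, so it is only valid while that row is.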



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java:
##########
@@ -473,39 +488,50 @@ private void processKvRecords(
                 KvRecordReadContext.createReadContext(kvFormat, schemaGetter);
         ValueDecoder valueDecoder = new ValueDecoder(schemaGetter, kvFormat);
 
-        for (KvRecord kvRecord : kvRecords.records(readContext)) {
-            byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
-            KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
-            BinaryRow row = kvRecord.getRow();
-            BinaryValue currentValue = row == null ? null : new BinaryValue(schemaIdOfNewData, row);
+        try (SchemaAlignmentContext alignmentContext =
+                new SchemaAlignmentContext(latestSchemaId, latestSchema, kvFormat)) {

Review Comment:
   `SchemaAlignmentContext` is created unconditionally on every 
`processKvRecords` call, even when `schemaIdOfNewData == latestSchemaId` (the 
common case, with no schema evolution). This allocates a `RowEncoder` and a 
`HashMap` on the hot path for every batch, which will introduce a performance 
regression. 
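
   One way to avoid the allocation is to defer it until a row actually needs 
alignment. A minimal sketch, assuming a small hypothetical helper 
(`LazyCloseable` is not an existing Fluss class) that still works with 
try-with-resources:

```java
import java.util.function.Supplier;

/**
 * Hypothetical helper: creates the wrapped resource on first use and closes it
 * only if it was ever created. Not thread-safe; intended for single-threaded,
 * per-batch use.
 */
final class LazyCloseable<T extends AutoCloseable> implements AutoCloseable {
    private final Supplier<T> factory;
    private T instance;

    LazyCloseable(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns the resource, creating it on the first call. */
    T get() {
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    /** True once get() has been called at least once. */
    boolean isInitialized() {
        return instance != null;
    }

    @Override
    public void close() throws Exception {
        // Nothing to release if the resource was never needed.
        if (instance != null) {
            instance.close();
        }
    }
}
```

   With this shape, `processKvRecords` could wrap the context in the helper and 
call `get()` only on the branch where `value.schemaId != latestSchemaId`, so 
batches without old-schema rows never construct the `RowEncoder` or the cache.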


