polyzos commented on code in PR #2594:
URL: https://github.com/apache/fluss/pull/2594#discussion_r3015786564


##########
fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java:
##########
@@ -582,6 +618,74 @@ private long applyUpdate(
         }
     }
 
+    /**
+     * Converts a {@link BinaryValue} from its source schema layout to the latest schema layout
+     * using column IDs to map positions. New columns (present in latest but not in source) are
+     * filled with null. This only runs when schemas differ; the common case short-circuits.
+     */
+    private BinaryValue alignToLatestSchema(
+            BinaryValue value, short latestSchemaId, Schema latestSchema) {
+        if (value.schemaId == latestSchemaId) {
+            return value;
+        }
+
+        Schema sourceSchema = schemaGetter.getSchema(value.schemaId);
+        List<Integer> sourceColIds = sourceSchema.getColumnIds();
+        List<Integer> targetColIds = latestSchema.getColumnIds();
+
+        Map<Integer, Integer> sourceIdToPos = new HashMap<>();
+        for (int i = 0; i < sourceColIds.size(); i++) {
+            sourceIdToPos.put(sourceColIds.get(i), i);
+        }
+
+        InternalRow.FieldGetter[] sourceGetters =
+                InternalRow.createFieldGetters(sourceSchema.getRowType());
+        RowEncoder encoder = RowEncoder.create(kvFormat, latestSchema.getRowType());

Review Comment:
   RowEncoder implements AutoCloseable but is never closed. Wrap it in try-with-resources so that an encoder is not leaked for every aligned record.
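   A minimal, self-contained sketch of the suggested pattern. It uses stand-in types so it compiles on its own: `Encoder` is a hypothetical placeholder for RowEncoder (only its being AutoCloseable is taken from the comment; its method names here are made up), and rows are modeled as `Object[]`. It maps positions by column ID, writes null for new columns, and closes the encoder through try-with-resources even if encoding throws.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AlignSketch {
    /** Hypothetical stand-in for RowEncoder; NOT the Fluss API. Tracks open instances to show leaks. */
    static final class Encoder implements AutoCloseable {
        static int openCount = 0;
        private final Object[] fields;
        private boolean closed;

        Encoder(int arity) {
            this.fields = new Object[arity];
            openCount++;
        }

        void encodeField(int pos, Object value) {
            fields[pos] = value;
        }

        Object[] finishRow() {
            return fields.clone();
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                openCount--;
            }
        }
    }

    /** Re-lays out sourceRow from the source column order into the target column order. */
    static Object[] align(Object[] sourceRow, List<Integer> sourceColIds, List<Integer> targetColIds) {
        Map<Integer, Integer> sourceIdToPos = new HashMap<>();
        for (int i = 0; i < sourceColIds.size(); i++) {
            sourceIdToPos.put(sourceColIds.get(i), i);
        }
        // try-with-resources guarantees close() runs on both normal return and exception.
        try (Encoder encoder = new Encoder(targetColIds.size())) {
            for (int t = 0; t < targetColIds.size(); t++) {
                Integer s = sourceIdToPos.get(targetColIds.get(t));
                // A column added after the source schema has no source position: write null.
                encoder.encodeField(t, s == null ? null : sourceRow[s]);
            }
            return encoder.finishRow();
        }
    }
}
```

   If the encoder ends up being created outside the per-record path, the try-with-resources would move to wherever its lifetime actually ends (for example, the end of the batch).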



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java:
##########
@@ -582,6 +618,74 @@ private long applyUpdate(
         }
     }
 
+    /**
+     * Converts a {@link BinaryValue} from its source schema layout to the latest schema layout
+     * using column IDs to map positions. New columns (present in latest but not in source) are
+     * filled with null. This only runs when schemas differ; the common case short-circuits.
+     */
+    private BinaryValue alignToLatestSchema(
+            BinaryValue value, short latestSchemaId, Schema latestSchema) {
+        if (value.schemaId == latestSchemaId) {
+            return value;
+        }
+
+        Schema sourceSchema = schemaGetter.getSchema(value.schemaId);
+        List<Integer> sourceColIds = sourceSchema.getColumnIds();
+        List<Integer> targetColIds = latestSchema.getColumnIds();
+
+        Map<Integer, Integer> sourceIdToPos = new HashMap<>();
+        for (int i = 0; i < sourceColIds.size(); i++) {
+            sourceIdToPos.put(sourceColIds.get(i), i);
+        }
+
+        InternalRow.FieldGetter[] sourceGetters =

Review Comment:
   createFieldGetters and RowEncoder.create are called once per record, but within a batch their results are identical for every record that shares the same source schema. Could we construct them once per batch instead?
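   One possible shape for this, sketched with stand-in types: build the column-ID mapping at most once per distinct source schema ID in the batch and reuse it for every record. `SchemaAlignment`, `alignBatch`, and the `Map<Short, List<Integer>>` schema lookup are hypothetical; in KvTablet the lookup would presumably go through schemaGetter, and the cached state would also hold the field getters and the encoder.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-batch cache entry: a precomputed column mapping for one source schema. */
class SchemaAlignment {
    static int mappingsBuilt = 0; // counts constructions, to show the hoisting works

    // For each target position: the source position with the same column ID, or -1 if the column is new.
    final int[] targetToSource;

    SchemaAlignment(List<Integer> sourceColIds, List<Integer> targetColIds) {
        Map<Integer, Integer> sourceIdToPos = new HashMap<>();
        for (int i = 0; i < sourceColIds.size(); i++) {
            sourceIdToPos.put(sourceColIds.get(i), i);
        }
        targetToSource = new int[targetColIds.size()];
        for (int t = 0; t < targetColIds.size(); t++) {
            targetToSource[t] = sourceIdToPos.getOrDefault(targetColIds.get(t), -1);
        }
    }

    Object[] align(Object[] sourceRow) {
        Object[] out = new Object[targetToSource.length];
        for (int t = 0; t < out.length; t++) {
            int s = targetToSource[t];
            out[t] = s < 0 ? null : sourceRow[s];
        }
        return out;
    }

    /** Aligns a batch, building at most one SchemaAlignment per distinct source schema ID. */
    static List<Object[]> alignBatch(
            short[] schemaIds, List<Object[]> rows, Map<Short, List<Integer>> schemaColIds, short latestId) {
        List<Integer> targetColIds = schemaColIds.get(latestId);
        Map<Short, SchemaAlignment> cache = new HashMap<>();
        List<Object[]> out = new ArrayList<>(rows.size());
        for (int i = 0; i < rows.size(); i++) {
            short id = schemaIds[i];
            if (id == latestId) {
                out.add(rows.get(i)); // common case: already in the latest layout
                continue;
            }
            SchemaAlignment alignment = cache.computeIfAbsent(id, k -> {
                mappingsBuilt++;
                return new SchemaAlignment(schemaColIds.get(k), targetColIds);
            });
            out.add(alignment.align(rows.get(i)));
        }
        return out;
    }
}
```

   Keying the cache by source schema ID rather than building a single mapping per batch also covers batches that mix records written under several older schemas.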



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
