rahil-c commented on code in PR #17731:
URL: https://github.com/apache/hudi/pull/17731#discussion_r2654326983


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java:
##########
@@ -108,7 +108,8 @@ public ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(HoodieSchem
   @Override
   public ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(HoodieSchema schema) throws IOException {
     ClosableIterator<UnsafeRow> iterator = getUnsafeRowIterator(schema);
-    return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieSparkRecord(data)));
+    //TODO .copy() is needed for correctness, to investigate further in future.

Review Comment:
   Currently our code uses an `UnsafeProjection` to convert an `InternalRow` to an `UnsafeRow`:
   https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java#L132
   
   I went through the Spark docs looking for more details on `UnsafeProjection` but
   did not find any, so I examined the following classes in the Spark repo to get
   more insight into its behavior.
    
   <img width="1036" height="617" alt="Screenshot 2025-12-30 at 6 15 22 PM" src="https://github.com/user-attachments/assets/137c4eba-2433-44a8-9d7f-e2cc02b93abc" />
   
   In the following class, I can see a mention of a shared buffer:
   
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
   
   along with the following recommendation:
   > This class reuses the [[UnsafeRow]] it produces, a consumer should copy 
the row if it is being buffered
   
   
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
   ```
    * It generates the code for all the expressions, computes the total length for all the columns
    * (can be accessed via variables), and then copies the data into a scratch buffer space in the
    * form of UnsafeRow (the scratch buffer will grow as needed).
    *
    * @note The returned UnsafeRow will be pointed to a scratch buffer inside the projection.
   ```
   
   Based on what you mentioned above
   > If it uses some shared buffer, then you need to copy.
   
   If we do need the copy, I'm thinking we should place the `copy()` in `next()` of
   `LanceRecordIterator`, so that callers do not have to worry about calling
   `.copy()` on the data themselves, as I was doing before in this specific read path.
   ```
   @Override
   public UnsafeRow next() {
     if (!hasNext()) {
       throw new IllegalStateException("No more records available");
     }
     InternalRow row = rowIterator.next();
     // Convert to UnsafeRow immediately while batch is still open
     return projection.apply(row).copy();
   }
   ```
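
To illustrate why the copy matters, here is a minimal Spark-free sketch of the aliasing problem described above. `Row` and `ReusingProjection` are hypothetical stand-ins I made up for this example; they are not Spark's `UnsafeRow` or `UnsafeProjection`, and they only mimic the documented "reuses the row it produces" behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class SharedBufferDemo {

  /** Hypothetical stand-in for a row backed by a mutable buffer (not Spark's UnsafeRow). */
  static final class Row {
    final int[] buffer;

    Row(int[] buffer) {
      this.buffer = buffer;
    }

    /** Returns a row with its own private copy of the buffer. */
    Row copy() {
      return new Row(Arrays.copyOf(buffer, buffer.length));
    }

    int get(int i) {
      return buffer[i];
    }
  }

  /** Hypothetical projection that returns the same scratch row on every call. */
  static final class ReusingProjection implements Function<int[], Row> {
    private final Row scratch = new Row(new int[1]);

    @Override
    public Row apply(int[] input) {
      // Overwrites the shared scratch buffer instead of allocating a new row.
      scratch.buffer[0] = input[0];
      return scratch;
    }
  }

  /** Buffers every projected row, optionally copying it first, then reads the values back. */
  static List<Integer> bufferRows(int[][] inputs, boolean copy) {
    ReusingProjection projection = new ReusingProjection();
    List<Row> buffered = new ArrayList<>();
    for (int[] input : inputs) {
      Row row = projection.apply(input);
      buffered.add(copy ? row.copy() : row);
    }
    List<Integer> values = new ArrayList<>();
    for (Row row : buffered) {
      values.add(row.get(0));
    }
    return values;
  }

  public static void main(String[] args) {
    int[][] inputs = {{1}, {2}, {3}};
    System.out.println(bufferRows(inputs, false)); // [3, 3, 3]: every entry aliases the scratch row
    System.out.println(bufferRows(inputs, true));  // [1, 2, 3]: each entry owns its data
  }
}
```

Without the copy, every buffered element points at the same scratch row, so all of them show the last value written. Doing the copy once inside the iterator's `next()` means no caller can hit this by accident, at the cost of one allocation per row.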
   


