rahil-c commented on code in PR #18042:
URL: https://github.com/apache/hudi/pull/18042#discussion_r2747202176


##########
hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java:
##########
@@ -76,25 +76,35 @@ public ClosableIterator<HoodieKey> getHoodieKeyIterator(HoodieStorage storage,
                                                            Option<BaseKeyGenerator> keyGeneratorOpt,
                                                            Option<String> partitionPath) {
     try {
+      HoodieSchema keySchema = getKeyIteratorSchema(storage, filePath, keyGeneratorOpt, partitionPath);
       HoodieFileReader reader = HoodieIOFactory.getIOFactory(storage)
           .getReaderFactory(HoodieRecord.HoodieRecordType.SPARK)
           .getFileReader(new HoodieReaderConfig(), filePath, HoodieFileFormat.LANCE);
-      ClosableIterator<String> keyIterator = reader.getRecordKeyIterator();
+      ClosableIterator<HoodieRecord> recordIterator = reader.getRecordIterator(keySchema);
+
       return new ClosableIterator<HoodieKey>() {
         @Override
         public void close() {
-          keyIterator.close();
+          recordIterator.close();
         }
 
         @Override
         public boolean hasNext() {
-          return keyIterator.hasNext();
+          return recordIterator.hasNext();
         }
 
         @Override
         public HoodieKey next() {
-          String key = keyIterator.next();
-          return new HoodieKey(key, partitionPath.orElse(null));
+          HoodieRecord record = recordIterator.next();
+          String recordKey;
+          if (keyGeneratorOpt.isPresent()) {
+            // With keyGenerator, extracts user-defined key fields by name
+            recordKey = record.getRecordKey(keySchema, keyGeneratorOpt);
+          } else {
+            // Without keyGenerator, read record key field by name

Review Comment:
   I looked at the `HoodieKeyIterator` examples. Since we use `HoodieSparkRecord` for Lance, I am not sure we can operate on the Avro `GenericRecord` that this iterator uses:
   https://github.com/apache/hudi/blob/7510b1b243ecca7ee70421bbe5f2e1e1bd188cca/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java#L415
   For now I have added some other logic for getting the partition path.
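
   As a rough illustration of the adapter shape in the diff above, here is a minimal, self-contained sketch. Everything in it is a stand-in: `Key` is a hypothetical substitute for `HoodieKey`, records are modelled as plain field-name-to-value maps rather than `HoodieSparkRecord`, and the key generator is a plain function. The fallback that reads the partition path from the record's `_hoodie_partition_path` field is an assumption made for illustration, not necessarily the logic in this PR. The sketch also omits `close()`, which the real `ClosableIterator` must forward to the underlying reader.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

class KeyIteratorSketch {

  // Hudi meta column names, used here only as plain map keys.
  static final String RECORD_KEY_FIELD = "_hoodie_record_key";
  static final String PARTITION_PATH_FIELD = "_hoodie_partition_path";

  // Hypothetical stand-in for HoodieKey: a record key plus a partition path.
  static final class Key {
    final String recordKey;
    final String partitionPath;

    Key(String recordKey, String partitionPath) {
      this.recordKey = recordKey;
      this.partitionPath = partitionPath;
    }

    @Override
    public String toString() {
      return recordKey + " @ " + partitionPath;
    }
  }

  // Adapts an iterator of records into an iterator of keys.
  // With a key generator, the key is derived from user-defined fields;
  // without one, the record key meta field is read by name.
  static Iterator<Key> keyIterator(
      Iterator<Map<String, String>> records,
      Optional<Function<Map<String, String>, String>> keyGenerator,
      Optional<String> partitionPath) {
    return new Iterator<Key>() {
      @Override
      public boolean hasNext() {
        return records.hasNext();
      }

      @Override
      public Key next() {
        Map<String, String> record = records.next();
        String recordKey = keyGenerator.isPresent()
            ? keyGenerator.get().apply(record)
            : record.get(RECORD_KEY_FIELD);
        // Assumption: fall back to the partition path stored in the record.
        String partition = partitionPath.isPresent()
            ? partitionPath.get()
            : record.get(PARTITION_PATH_FIELD);
        return new Key(recordKey, partition);
      }
    };
  }

  public static void main(String[] args) {
    List<Map<String, String>> rows = Arrays.asList(
        Map.of(RECORD_KEY_FIELD, "id-1", PARTITION_PATH_FIELD, "2024/01", "user_id", "u1"),
        Map.of(RECORD_KEY_FIELD, "id-2", PARTITION_PATH_FIELD, "2024/02", "user_id", "u2"));
    Iterator<Key> keys = keyIterator(rows.iterator(), Optional.empty(), Optional.empty());
    while (keys.hasNext()) {
      System.out.println(keys.next());
    }
  }
}
```

   Passing an explicit partition path overrides the per-record value, while passing a key generator overrides the meta field lookup; the two choices are independent of each other.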
   
   One general thing I noticed: `TestLanceDataSource` does not seem to have any tests for partition paths yet:
   https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala#L669
   When I added tests to exercise and validate the logic above, I hit a new bug: `org.apache.spark.sql.catalyst.expressions.JoinedRow cannot be cast to class org.apache.spark.sql.catalyst.expressions.UnsafeRow`:
   ```
   Driver stacktrace:
     at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
     at scala.Option.foreach(Option.scala:407)
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
     at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
     at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
     at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
     at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:455)
     at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:140)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:224)
     at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:219)
     at java.base/java.util.concurrent.FutureTask.run$$capture(FutureTask.java:264)
     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
     at java.base/java.lang.Thread.run(Thread.java:829)
     Caused by: java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.JoinedRow cannot be cast to class org.apache.spark.sql.catalyst.expressions.UnsafeRow (org.apache.spark.sql.catalyst.expressions.JoinedRow and org.apache.spark.sql.catalyst.expressions.UnsafeRow are in unnamed module of loader 'app')
     at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:389)
     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
     at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
     at org.apache.spark.scheduler.Task.run(Task.scala:141)
     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
     at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
     at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
     ... 3 more
   ```
   
   So the most recent commit may contain more changes than the original feedback asked for.


