rahil-c commented on code in PR #18042:
URL: https://github.com/apache/hudi/pull/18042#discussion_r2747202176
##########
hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java:
##########
@@ -76,25 +76,35 @@ public ClosableIterator<HoodieKey> getHoodieKeyIterator(HoodieStorage storage,
                                                         Option<BaseKeyGenerator> keyGeneratorOpt,
                                                         Option<String> partitionPath) {
     try {
+      HoodieSchema keySchema = getKeyIteratorSchema(storage, filePath, keyGeneratorOpt, partitionPath);
       HoodieFileReader reader = HoodieIOFactory.getIOFactory(storage)
           .getReaderFactory(HoodieRecord.HoodieRecordType.SPARK)
           .getFileReader(new HoodieReaderConfig(), filePath, HoodieFileFormat.LANCE);
-      ClosableIterator<String> keyIterator = reader.getRecordKeyIterator();
+      ClosableIterator<HoodieRecord> recordIterator = reader.getRecordIterator(keySchema);
+
       return new ClosableIterator<HoodieKey>() {
         @Override
         public void close() {
-          keyIterator.close();
+          recordIterator.close();
         }
         @Override
         public boolean hasNext() {
-          return keyIterator.hasNext();
+          return recordIterator.hasNext();
         }
         @Override
         public HoodieKey next() {
-          String key = keyIterator.next();
-          return new HoodieKey(key, partitionPath.orElse(null));
+          HoodieRecord record = recordIterator.next();
+          String recordKey;
+          if (keyGeneratorOpt.isPresent()) {
+            // With keyGenerator, extracts user-defined key fields by name
+            recordKey = record.getRecordKey(keySchema, keyGeneratorOpt);
+          } else {
+            // Without keyGenerator, read record key field by name
Review Comment:
   Looked at the `HoodieKeyIterator` examples; since we are using `HoodieSparkRecord` for Lance, I am not sure we can operate on the avro `GenericRecord` that this iterator uses: https://github.com/apache/hudi/blob/7510b1b243ecca7ee70421bbe5f2e1e1bd188cca/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java#L415. For now I have added some other logic around getting the partition path.
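   As a side note, the adapter pattern the diff above implements can be sketched without any Hudi dependencies. This is only an illustration of the branching logic, not Hudi's API: `Row` and `Key` are simplified stand-ins for `HoodieRecord` and `HoodieKey`, and the key-generator is modeled as a plain function.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

class KeyIteratorSketch {
  // Stand-in for a data-file record: field name -> value.
  record Row(Map<String, String> fields) {}

  // Stand-in for HoodieKey (record key + partition path).
  record Key(String recordKey, String partitionPath) {}

  static Iterator<Key> keyIterator(Iterator<Row> records,
                                   Optional<Function<Row, String>> keyGen,
                                   Optional<String> partitionPath) {
    return new Iterator<>() {
      @Override public boolean hasNext() { return records.hasNext(); }

      @Override public Key next() {
        Row r = records.next();
        // With a key generator, delegate key extraction to it; without one,
        // read the dedicated record-key field directly by name.
        String key = keyGen.map(g -> g.apply(r))
            .orElseGet(() -> r.fields().get("_hoodie_record_key"));
        return new Key(key, partitionPath.orElse(null));
      }
    };
  }

  public static void main(String[] args) {
    List<Row> rows = List.of(new Row(Map.of("_hoodie_record_key", "k1")));
    Iterator<Key> it = keyIterator(rows.iterator(), Optional.empty(), Optional.of("2021/01/01"));
    System.out.println(it.next());
  }
}
```

   The point of the wrapper is that the key decision is made once per record at `next()` time, so the underlying record iterator is traversed exactly once and closed exactly once.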
   One general thing I noticed: in our `TestLanceDataSource` it seems we have not added tests for partition paths yet (https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala#L669), so when I tried to exercise/validate the logic above by adding some tests, I hit a new bug: `org.apache.spark.sql.catalyst.expressions.JoinedRow cannot be cast to class org.apache.spark.sql.catalyst.expressions.UnsafeRow`:
```
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
  at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
  at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:455)
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:140)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:224)
  at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:219)
  at java.base/java.util.concurrent.FutureTask.run$$capture(FutureTask.java:264)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.JoinedRow cannot be cast to class org.apache.spark.sql.catalyst.expressions.UnsafeRow (org.apache.spark.sql.catalyst.expressions.JoinedRow and org.apache.spark.sql.catalyst.expressions.UnsafeRow are in unnamed module of loader 'app')
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:389)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
  at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
  at org.apache.spark.scheduler.Task.run(Task.scala:141)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
  at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
  at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
  ... 3 more
```
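   The `Caused by` line is the classic failure mode where downstream code assumes every row is one concrete implementation (`UnsafeRow`), but a join operator hands it a different one (`JoinedRow`). A dependency-free sketch of the pattern, using toy stand-ins that only mirror the shape of Spark's catalyst classes (in real Spark code the usual remedy is to copy the row through an `UnsafeProjection` rather than to cast):

```java
class CastSketch {
  // Buggy consumer: assumes every row is the one concrete implementation.
  static UnsafeRow unsafeCast(InternalRow row) {
    return (UnsafeRow) row; // throws ClassCastException when handed a JoinedRow
  }

  // Defensive variant: convert when the row is not already an UnsafeRow --
  // loosely analogous to copying through an UnsafeProjection in Spark.
  static UnsafeRow toUnsafe(InternalRow row) {
    return (row instanceof UnsafeRow u) ? u : new UnsafeRow(row.numFields());
  }

  public static void main(String[] args) {
    InternalRow joined = new JoinedRow(new UnsafeRow(2), new UnsafeRow(3));
    try {
      unsafeCast(joined);
    } catch (ClassCastException e) {
      System.out.println("cast failed, as in the stacktrace above");
    }
    System.out.println(toUnsafe(joined).numFields()); // prints 5
  }
}

// Toy stand-ins mirroring the shape of Spark's row hierarchy, not the real classes.
interface InternalRow { int numFields(); }

final class UnsafeRow implements InternalRow {
  private final int n;
  UnsafeRow(int n) { this.n = n; }
  public int numFields() { return n; }
}

final class JoinedRow implements InternalRow {
  private final InternalRow left, right;
  JoinedRow(InternalRow l, InternalRow r) { left = l; right = r; }
  public int numFields() { return left.numFields() + right.numFields(); }
}
```

   Whether Hudi's reader or the Spark plan should own that conversion is exactly what the fix in the recent commit has to decide.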
   So the recent commit might have more changes than the original feedback called for, in order to address the bug encountered here and to make sure partition-path testing is also present.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]