This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-feature-rfc46 by this push:
     new 9c361b7954 [HUDI-4344] fix usage of HoodieDataBlock#getRecordIterator (#6005)
9c361b7954 is described below

commit 9c361b79547198cdd35d35b62957e2e9ea2cfbcf
Author: komao <[email protected]>
AuthorDate: Thu Jun 30 04:29:33 2022 +0800

    [HUDI-4344] fix usage of HoodieDataBlock#getRecordIterator (#6005)
    
    Co-authored-by: wangzixuan.wzxuan <[email protected]>
---
 .../sql/hudi/command/procedures/ExportInstantsProcedure.scala |  9 ++++++---
 .../procedures/ShowHoodieLogFileMetadataProcedure.scala       |  8 ++++++--
 .../procedures/ShowHoodieLogFileRecordsProcedure.scala        | 11 +++++++----
 3 files changed, 19 insertions(+), 9 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
index ff6ab92179..3aeb4a952d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
@@ -17,13 +17,13 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
-import org.apache.avro.generic.GenericRecord
+import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 import org.apache.avro.specific.SpecificData
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, 
HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.log.HoodieLogFormat
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock
@@ -123,7 +123,10 @@ class ExportInstantsProcedure extends BaseProcedure with 
ProcedureBuilder with L
       }) {
         val blk = reader.next.asInstanceOf[HoodieAvroDataBlock]
         try {
-          val recordItr = blk.getRecordIterator
+          val mapper = new HoodieRecord.Mapper() {
+            override def apply(data: IndexedRecord) = new 
HoodieAvroIndexedRecord(data)
+          }
+          val recordItr = blk.getRecordIterator(mapper)
           try while ( {
             recordItr.hasNext
           }) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
index 7f0730386e..fc7a58962a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
@@ -20,8 +20,9 @@ package org.apache.spark.sql.hudi.command.procedures
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.collect.{Lists, Maps}
 import org.apache.hadoop.fs.Path
+import org.apache.avro.generic.IndexedRecord
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, 
HoodieRecord}
 import org.apache.hudi.common.table.log.HoodieLogFormat
 import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.{HeaderMetadataType, 
HoodieLogBlockType}
 import org.apache.hudi.common.table.log.block.{HoodieCorruptBlock, 
HoodieDataBlock}
@@ -93,7 +94,10 @@ class ShowHoodieLogFileMetadataProcedure extends 
BaseProcedure with ProcedureBui
             }
             block match {
               case dataBlock: HoodieDataBlock =>
-                val recordItr = dataBlock.getRecordIterator
+                val mapper = new HoodieRecord.Mapper() {
+                  override def apply(data: IndexedRecord) = new 
HoodieAvroIndexedRecord(data)
+                }
+                val recordItr = dataBlock.getRecordIterator(mapper)
                 recordItr.asScala.foreach(_ => recordCount.incrementAndGet())
                 recordItr.close()
             }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index a7d09dceeb..7d2431eba6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -22,7 +22,7 @@ import org.apache.avro.generic.IndexedRecord
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.common.config.HoodieCommonConfig
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, 
HoodieRecord, HoodieRecordPayload}
 import org.apache.hudi.common.table.log.block.HoodieDataBlock
 import org.apache.hudi.common.table.log.{HoodieLogFormat, 
HoodieMergedLogRecordScanner}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
@@ -79,7 +79,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure 
with ProcedureBuil
         
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue)
         .build
       scanner.asScala.foreach(hoodieRecord => {
-        val record = hoodieRecord.getData.getInsertValue(schema).get()
+        val record = 
hoodieRecord.getData.asInstanceOf[HoodieRecordPayload[_]].getInsertValue(schema).get()
         if (allRecords.size() < limit) {
           allRecords.add(record)
         }
@@ -93,10 +93,13 @@ class ShowHoodieLogFileRecordsProcedure extends 
BaseProcedure with ProcedureBuil
             val block = reader.next()
             block match {
               case dataBlock: HoodieDataBlock =>
-                val recordItr = dataBlock.getRecordIterator
+                val mapper = new HoodieRecord.Mapper() {
+                  override def apply(data: IndexedRecord) = new 
HoodieAvroIndexedRecord(data)
+                }
+                val recordItr = dataBlock.getRecordIterator(mapper)
                 recordItr.asScala.foreach(record => {
                   if (allRecords.size() < limit) {
-                    allRecords.add(record)
+                    allRecords.add(record.getData.asInstanceOf[IndexedRecord])
                   }
                 })
                 recordItr.close()

Reply via email to