This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 60edf301d7cec9f928c743f4e499389c4525e9aa
Author: komao <[email protected]>
AuthorDate: Thu Jun 30 04:29:33 2022 +0800

    [HUDI-4344] fix usage of HoodieDataBlock#getRecordIterator (#6005)

    Co-authored-by: wangzixuan.wzxuan <[email protected]>
---
 .../sql/hudi/command/procedures/ExportInstantsProcedure.scala |  9 ++++++---
 .../procedures/ShowHoodieLogFileMetadataProcedure.scala       |  8 ++++++--
 .../procedures/ShowHoodieLogFileRecordsProcedure.scala        | 11 +++++++----
 3 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
index 114f4c4ee1..ad21c11e9b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
@@ -17,14 +17,14 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
-import org.apache.avro.generic.GenericRecord
+import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 import org.apache.avro.specific.SpecificData
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.log.HoodieLogFormat
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock
@@ -124,7 +124,10 @@ class ExportInstantsProcedure extends BaseProcedure with ProcedureBuilder with L
         }) {
           val blk = reader.next.asInstanceOf[HoodieAvroDataBlock]
           try {
-            val recordItr = blk.getRecordIterator
+            val mapper = new HoodieRecord.Mapper() {
+              override def apply(data: IndexedRecord) = new HoodieAvroIndexedRecord(data)
+            }
+            val recordItr = blk.getRecordIterator(mapper)
             try while ( {
               recordItr.hasNext
             }) {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
index 3a26823ded..415c642d95 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.hudi.command.procedures
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.hadoop.fs.Path
+import org.apache.avro.generic.IndexedRecord
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, HoodieRecord}
 import org.apache.hudi.common.table.log.HoodieLogFormat
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.{HeaderMetadataType, HoodieLogBlockType}
 import org.apache.hudi.common.table.log.block.{HoodieCorruptBlock, HoodieDataBlock}
@@ -93,7 +94,10 @@ class ShowHoodieLogFileMetadataProcedure extends BaseProcedure with ProcedureBui
           }
           block match {
             case dataBlock: HoodieDataBlock =>
-              val recordItr = dataBlock.getRecordIterator
+              val mapper = new HoodieRecord.Mapper() {
+                override def apply(data: IndexedRecord) = new HoodieAvroIndexedRecord(data)
+              }
+              val recordItr = dataBlock.getRecordIterator(mapper)
               recordItr.asScala.foreach(_ => recordCount.incrementAndGet())
               recordItr.close()
           }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index 2806138a89..ecee96bc46 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -21,7 +21,7 @@ import org.apache.avro.generic.IndexedRecord
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.common.config.HoodieCommonConfig
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, HoodieRecord, HoodieRecordPayload}
 import org.apache.hudi.common.table.log.block.HoodieDataBlock
 import org.apache.hudi.common.table.log.{HoodieLogFormat, HoodieMergedLogRecordScanner}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -78,7 +78,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
         .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue)
         .build
       scanner.asScala.foreach(hoodieRecord => {
-        val record = hoodieRecord.getData.getInsertValue(schema).get()
+        val record = hoodieRecord.getData.asInstanceOf[HoodieRecordPayload[_]].getInsertValue(schema).get()
         if (allRecords.size() < limit) {
           allRecords.add(record)
         }
@@ -92,10 +92,13 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
           val block = reader.next()
           block match {
             case dataBlock: HoodieDataBlock =>
-              val recordItr = dataBlock.getRecordIterator
+              val mapper = new HoodieRecord.Mapper() {
+                override def apply(data: IndexedRecord) = new HoodieAvroIndexedRecord(data)
+              }
+              val recordItr = dataBlock.getRecordIterator(mapper)
               recordItr.asScala.foreach(record => {
                 if (allRecords.size() < limit) {
-                  allRecords.add(record)
+                  allRecords.add(record.getData.asInstanceOf[IndexedRecord])
                 }
               })
               recordItr.close()
