This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-feature-rfc46 by this push:
new 9c361b7954 [HUDI-4344] fix usage of HoodieDataBlock#getRecordIterator (#6005)
9c361b7954 is described below
commit 9c361b79547198cdd35d35b62957e2e9ea2cfbcf
Author: komao <[email protected]>
AuthorDate: Thu Jun 30 04:29:33 2022 +0800
[HUDI-4344] fix usage of HoodieDataBlock#getRecordIterator (#6005)
Co-authored-by: wangzixuan.wzxuan <[email protected]>
---
.../sql/hudi/command/procedures/ExportInstantsProcedure.scala | 9 ++++++---
.../procedures/ShowHoodieLogFileMetadataProcedure.scala | 8 ++++++--
.../procedures/ShowHoodieLogFileRecordsProcedure.scala | 11 +++++++----
3 files changed, 19 insertions(+), 9 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
index ff6ab92179..3aeb4a952d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
@@ -17,13 +17,13 @@
package org.apache.spark.sql.hudi.command.procedures
-import org.apache.avro.generic.GenericRecord
+import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.avro.specific.SpecificData
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile,
HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.log.HoodieLogFormat
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock
@@ -123,7 +123,10 @@ class ExportInstantsProcedure extends BaseProcedure with ProcedureBuilder with L
}) {
val blk = reader.next.asInstanceOf[HoodieAvroDataBlock]
try {
- val recordItr = blk.getRecordIterator
+ val mapper = new HoodieRecord.Mapper() {
+ override def apply(data: IndexedRecord) = new
HoodieAvroIndexedRecord(data)
+ }
+ val recordItr = blk.getRecordIterator(mapper)
try while ( {
recordItr.hasNext
}) {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
index 7f0730386e..fc7a58962a 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
@@ -20,8 +20,9 @@ package org.apache.spark.sql.hudi.command.procedures
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.collect.{Lists, Maps}
import org.apache.hadoop.fs.Path
+import org.apache.avro.generic.IndexedRecord
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile,
HoodieRecord}
import org.apache.hudi.common.table.log.HoodieLogFormat
import
org.apache.hudi.common.table.log.block.HoodieLogBlock.{HeaderMetadataType,
HoodieLogBlockType}
import org.apache.hudi.common.table.log.block.{HoodieCorruptBlock,
HoodieDataBlock}
@@ -93,7 +94,10 @@ class ShowHoodieLogFileMetadataProcedure extends BaseProcedure with ProcedureBui
}
block match {
case dataBlock: HoodieDataBlock =>
- val recordItr = dataBlock.getRecordIterator
+ val mapper = new HoodieRecord.Mapper() {
+ override def apply(data: IndexedRecord) = new
HoodieAvroIndexedRecord(data)
+ }
+ val recordItr = dataBlock.getRecordIterator(mapper)
recordItr.asScala.foreach(_ => recordCount.incrementAndGet())
recordItr.close()
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index a7d09dceeb..7d2431eba6 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -22,7 +22,7 @@ import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.config.HoodieCommonConfig
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile,
HoodieRecord, HoodieRecordPayload}
import org.apache.hudi.common.table.log.block.HoodieDataBlock
import org.apache.hudi.common.table.log.{HoodieLogFormat,
HoodieMergedLogRecordScanner}
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
@@ -79,7 +79,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue)
.build
scanner.asScala.foreach(hoodieRecord => {
- val record = hoodieRecord.getData.getInsertValue(schema).get()
+ val record =
hoodieRecord.getData.asInstanceOf[HoodieRecordPayload[_]].getInsertValue(schema).get()
if (allRecords.size() < limit) {
allRecords.add(record)
}
@@ -93,10 +93,13 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
val block = reader.next()
block match {
case dataBlock: HoodieDataBlock =>
- val recordItr = dataBlock.getRecordIterator
+ val mapper = new HoodieRecord.Mapper() {
+ override def apply(data: IndexedRecord) = new
HoodieAvroIndexedRecord(data)
+ }
+ val recordItr = dataBlock.getRecordIterator(mapper)
recordItr.asScala.foreach(record => {
if (allRecords.size() < limit) {
- allRecords.add(record)
+ allRecords.add(record.getData.asInstanceOf[IndexedRecord])
}
})
recordItr.close()