[ 
https://issues.apache.org/jira/browse/HUDI-7269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835888#comment-17835888
 ] 

Jonathan Vexler edited comment on HUDI-7269 at 4/10/24 9:05 PM:
----------------------------------------------------------------

Did an experiment to learn about position based log blocks:
{code:java}
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.sql.functions.col
import spark.implicits._


val tableName = "trips_table"
val basePath = "file:///tmp/trips_table_1"
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data = Seq(
  (1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
  (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70,"san_francisco"),
  (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90,"san_francisco"),
  (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"),
  (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"))
var inserts = spark.createDataFrame(data).toDF(columns:_*)
inserts.write.format("hudi").
  option(PARTITIONPATH_FIELD_NAME.key(), "city").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(TABLE_NAME, tableName).
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option("hoodie.logfile.data.block.format", "parquet").
  option("hoodie.datasource.write.record.merger.impls", 
"org.apache.hudi.HoodieSparkRecordMerger").
  option("hoodie.datasource.read.use.new.parquet.file.format", "true").
  option("hoodie.file.group.reader.enabled", "true").
  option("hoodie.write.record.positions", "true").
  mode(Overwrite).
  save(basePath)

val updatesDf = spark.read.
  option("hoodie.datasource.write.record.merger.impls", "org.apache.hudi.HoodieSparkRecordMerger").
  option("hoodie.file.group.reader.enabled", "true").
  option("hoodie.merge.use.record.positions", "false").
  format("hudi").load(basePath).
  filter($"rider" === "rider-D").withColumn("fare", col("fare") * 10)
  
updatesDf.write.format("hudi").
  option(OPERATION_OPT_KEY, "upsert").
  option(PARTITIONPATH_FIELD_NAME.key(), "city").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(TABLE_NAME, tableName).
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option("hoodie.logfile.data.block.format", "parquet").
  option("hoodie.datasource.write.record.merger.impls", 
"org.apache.hudi.HoodieSparkRecordMerger").
  option("hoodie.file.group.reader.enabled", "true").
  option("hoodie.write.record.positions", "true").
  mode(Append).
  save(basePath)

val updatesDf = spark.read.
  option("hoodie.datasource.write.record.merger.impls", "org.apache.hudi.HoodieSparkRecordMerger").
  option("hoodie.file.group.reader.enabled", "true").
  option("hoodie.merge.use.record.positions", "false").
  format("hudi").load(basePath).
  filter($"rider" === "rider-D").withColumn("fare", col("fare") * 10)
  
updatesDf.write.format("hudi").
  option(OPERATION_OPT_KEY, "upsert").
  option(PARTITIONPATH_FIELD_NAME.key(), "city").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(TABLE_NAME, tableName).
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option("hoodie.logfile.data.block.format", "parquet").
  option("hoodie.datasource.write.record.merger.impls", 
"org.apache.hudi.HoodieSparkRecordMerger").
  option("hoodie.file.group.reader.enabled", "true").
  option("hoodie.write.record.positions", "false").
  mode(Append).
  save(basePath)

val updatesDf = spark.read.
  option("hoodie.datasource.write.record.merger.impls", "org.apache.hudi.HoodieSparkRecordMerger").
  option("hoodie.file.group.reader.enabled", "true").
  option("hoodie.merge.use.record.positions", "false").
  format("hudi").load(basePath).
  filter($"rider" === "rider-D").withColumn("fare", col("fare") * 10)
  
updatesDf.write.format("hudi").
  option(OPERATION_OPT_KEY, "upsert").
  option(PARTITIONPATH_FIELD_NAME.key(), "city").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(TABLE_NAME, tableName).
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option("hoodie.logfile.data.block.format", "parquet").
  option("hoodie.datasource.write.record.merger.impls", 
"org.apache.hudi.HoodieSparkRecordMerger").
  option("hoodie.file.group.reader.enabled", "true").
  option("hoodie.write.record.positions", "true").
  mode(Append).
  save(basePath) {code}
I set {{hoodie.write.record.positions}} to true on writes 1, 2, and 4, and to false on write 3.

We can see 3 log files here:

!image (6).png|width=624,height=268!
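To check directly which blocks actually carry a positions entry, the block headers can be dumped from a shell. A minimal sketch, assuming the 0.14.x-style HoodieLogFormat reader API (these signatures have been shifting on the 1.0 branch) and a placeholder log file name:
{code:java}
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.model.HoodieLogFile
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.log.HoodieLogFormat

// resolve the table's Avro schema so the reader can decode the data blocks
val conf = new Configuration()
val metaClient = HoodieTableMetaClient.builder().setConf(conf).setBasePath(basePath).build()
val schema: Schema = new TableSchemaResolver(metaClient).getTableAvroSchema

// <file-id> is a placeholder -- substitute one of the log files shown above
val logPath = new Path(basePath + "/san_francisco/<file-id>.log.1_0-0-0")
val fs = logPath.getFileSystem(conf)
val reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fs.getFileStatus(logPath)), schema)
while (reader.hasNext) {
  val block = reader.next()
  // the header map is where a positions entry would (or would not) appear
  println(s"type=${block.getBlockType} header=${block.getLogBlockHeader}")
}
reader.close()
{code}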

 

However, when I try to print the Parquet footer of one of the log files, the tool reports that it is not a valid Parquet file:

!image (7).png|width=665,height=133!
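This failure is expected when the tool is pointed at the whole log file: a Hudi log file begins with Hudi's own block magic ({{#HUDI#}}) rather than Parquet's {{PAR1}} magic, and the Parquet bytes are embedded inside each data block. A quick sanity check of the leading bytes (a sketch; the file names are placeholders for the ones listed above):
{code:java}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// print the first n bytes of a file as text
def leadingBytes(p: String, n: Int): String =
  new String(Files.readAllBytes(Paths.get(p)).take(n), StandardCharsets.UTF_8)

// placeholders -- substitute real file names from /tmp/trips_table_1
println(leadingBytes("/tmp/trips_table_1/san_francisco/<log-file>", 6))           // expect "#HUDI#"
println(leadingBytes("/tmp/trips_table_1/san_francisco/<base-file>.parquet", 4))  // expect "PAR1"
{code}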

In HoodieDataBlock, I added print statements showing whether it is going to add positions to the header or not:

!image (8).png|width=617,height=438!
In the first write the output was:
{code:java}
24/04/10 16:34:58 WARN HoodieDataBlock: NOT ADDING POSITIONS TO HEADER{code}
In the second write the output was:
{code:java}
24/04/10 16:35:00 WARN HoodieDataBlock: ADDING POSITIONS TO HEADER 
24/04/10 16:35:00 WARN HoodieDataBlock: NOT ADDING POSITIONS TO HEADER{code}
In the third write the output was:
{code:java}
24/04/10 16:35:01 WARN HoodieDataBlock: NOT ADDING POSITIONS TO HEADER 
24/04/10 16:35:02 WARN HoodieDataBlock: NOT ADDING POSITIONS TO HEADER{code}
In the fourth write the output was:
{code:java}
24/04/10 16:35:03 WARN HoodieDataBlock: ADDING POSITIONS TO HEADER 
24/04/10 16:35:03 WARN HoodieDataBlock: NOT ADDING POSITIONS TO HEADER{code}
I tried with Avro data blocks and the logging output was the same. I also tried reading those log blocks, but the tool reports an invalid Avro file. In addition, the Avro log files are about 1/5 the size of the Parquet log files:
!image (9).png|width=631,height=202!
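For reference, the size comparison can be reproduced from a shell without screenshots; a minimal sketch that only assumes the local base path from above:
{code:java}
import java.io.File

// recursively collect files, then print each log file with its size in bytes
def walk(f: File): Seq[File] =
  if (f.isDirectory) Option(f.listFiles).toSeq.flatten.flatMap(walk) else Seq(f)

walk(new File("/tmp/trips_table_1"))
  .filterNot(_.getPath.contains("/.hoodie/")) // skip the metadata table's own log files
  .filter(_.getName.contains(".log."))
  .foreach(f => println(f"${f.length}%10d  ${f.getPath}"))
{code}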
 



> Fallback to key-based merging if there is no positions in log header
> --------------------------------------------------------------------
>
>                 Key: HUDI-7269
>                 URL: https://issues.apache.org/jira/browse/HUDI-7269
>             Project: Apache Hudi
>          Issue Type: Improvement
>            Reporter: Ethan Guo
>            Assignee: Jonathan Vexler
>            Priority: Blocker
>             Fix For: 1.0.0
>
>         Attachments: image (6).png, image (7).png, image (8).png, image 
> (9).png
>
>
> When turning on merging with record positions, if there is no position header 
> in the log blocks, the reader should fall back to key-based merging.


