[
https://issues.apache.org/jira/browse/HUDI-7269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835888#comment-17835888
]
Jonathan Vexler edited comment on HUDI-7269 at 4/10/24 9:03 PM:
----------------------------------------------------------------
Did an experiment to learn about position based log blocks:
{code:java}
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._
val tableName = "trips_table"
val basePath = "file:///tmp/trips_table_1"
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
(1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70
,"san_francisco"),
(1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90
,"san_francisco"),
(1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"),
(1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));
var inserts = spark.createDataFrame(data).toDF(columns:_*);
inserts.write.format("hudi").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
option("hoodie.logfile.data.block.format", "parquet").
option("hoodie.datasource.write.record.merger.impls",
"org.apache.hudi.HoodieSparkRecordMerger").
option("hoodie.datasource.read.use.new.parquet.file.format", "true").
option("hoodie.file.group.reader.enabled", "true").
option("hoodie.write.record.positions", "true").
mode(Overwrite).
save(basePath)
val updatesDf = spark.read.
option("hoodie.datasource.write.record.merger.impls",
"org.apache.hudi.HoodieSparkRecordMerger").
option("hoodie.file.group.reader.enabled", "true").
option("hoodie.merge.use.record.positions", "false").
format("hudi").load(basePath).filter($"rider" ===
"rider-D").withColumn("fare", col("fare") * 10)
updatesDf.write.format("hudi").
option(OPERATION_OPT_KEY, "upsert").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
option("hoodie.logfile.data.block.format", "parquet").
option("hoodie.datasource.write.record.merger.impls",
"org.apache.hudi.HoodieSparkRecordMerger").
option("hoodie.file.group.reader.enabled", "true").
option("hoodie.write.record.positions", "true").
mode(Append).
save(basePath)
val updatesDf = spark.read.
option("hoodie.datasource.write.record.merger.impls",
"org.apache.hudi.HoodieSparkRecordMerger").
option("hoodie.file.group.reader.enabled", "true").
option("hoodie.merge.use.record.positions", "false").
format("hudi").load(basePath).filter($"rider" ===
"rider-D").withColumn("fare", col("fare") * 10)
updatesDf.write.format("hudi").
option(OPERATION_OPT_KEY, "upsert").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
option("hoodie.logfile.data.block.format", "parquet").
option("hoodie.datasource.write.record.merger.impls",
"org.apache.hudi.HoodieSparkRecordMerger").
option("hoodie.file.group.reader.enabled", "true").
option("hoodie.write.record.positions", "false").
mode(Append).
save(basePath)
val updatesDf = spark.read.
option("hoodie.datasource.write.record.merger.impls",
"org.apache.hudi.HoodieSparkRecordMerger").
option("hoodie.file.group.reader.enabled", "true").
option("hoodie.merge.use.record.positions", "false").
format("hudi").load(basePath).filter($"rider" === "rider-D").withColumn("fare",
col("fare") * 10)
updatesDf.write.format("hudi").
option(OPERATION_OPT_KEY, "upsert").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
option("hoodie.logfile.data.block.format", "parquet").
option("hoodie.datasource.write.record.merger.impls",
"org.apache.hudi.HoodieSparkRecordMerger").
option("hoodie.file.group.reader.enabled", "true").
option("hoodie.write.record.positions", "true").
mode(Append).
save(basePath) {code}
I set write positions to true on writes 1, 2, and 4, and to false on write 3.
We can see 3 log files here:
!image (6).png|width=624,height=268!
However, when I try to print the footer of one of the log blocks, it says it's
not a valid parquet file:
!image (7).png|width=665,height=133!
In HoodieDataBlock, I added print statements to log whether it is going to add
positions or not:
!image (8).png|width=617,height=438!
In the first write the output was:
24/04/10 16:34:58 WARN HoodieDataBlock: NOT ADDING POSITIONS TO HEADER
In the second write the output was:
24/04/10 16:35:00 WARN HoodieDataBlock: ADDING POSITIONS TO HEADER
24/04/10 16:35:00 WARN HoodieDataBlock: NOT ADDING POSITIONS TO HEADER
In the third write the output was:
24/04/10 16:35:01 WARN HoodieDataBlock: NOT ADDING POSITIONS TO HEADER
24/04/10 16:35:02 WARN HoodieDataBlock: NOT ADDING POSITIONS TO HEADER
In the fourth write the output was:
24/04/10 16:35:03 WARN HoodieDataBlock: ADDING POSITIONS TO HEADER
24/04/10 16:35:03 WARN HoodieDataBlock: NOT ADDING POSITIONS TO HEADER
I tried with Avro data blocks and the logging output was the same.
I tried reading the log blocks, but it says it's an invalid Avro file. Also, the
Avro log files are 1/5 the size of the Parquet log files:
!image (9).png|width=631,height=202!
was (Author: JIRAUSER295101):
Did an experiment to learn about position based log blocks:
{code:java}
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._
val tableName = "trips_table"
val basePath = "file:///tmp/trips_table_1"
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
(1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70
,"san_francisco"),
(1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90
,"san_francisco"),
(1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"),
(1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));
var inserts = spark.createDataFrame(data).toDF(columns:_*);
inserts.write.format("hudi").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
option("hoodie.logfile.data.block.format", "parquet").
option("hoodie.datasource.write.record.merger.impls",
"org.apache.hudi.HoodieSparkRecordMerger").
option("hoodie.datasource.read.use.new.parquet.file.format", "true").
option("hoodie.file.group.reader.enabled", "true").
option("hoodie.write.record.positions", "true").
mode(Overwrite).
save(basePath)
val updatesDf = spark.read.
option("hoodie.datasource.write.record.merger.impls",
"org.apache.hudi.HoodieSparkRecordMerger").
option("hoodie.file.group.reader.enabled", "true").
option("hoodie.merge.use.record.positions", "false").
format("hudi").load(basePath).filter($"rider" ===
"rider-D").withColumn("fare", col("fare") * 10)
updatesDf.write.format("hudi").
option(OPERATION_OPT_KEY, "upsert").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
option("hoodie.logfile.data.block.format", "parquet").
option("hoodie.datasource.write.record.merger.impls",
"org.apache.hudi.HoodieSparkRecordMerger").
option("hoodie.file.group.reader.enabled", "true").
option("hoodie.write.record.positions", "true").
mode(Append).
save(basePath)
val updatesDf = spark.read.
option("hoodie.datasource.write.record.merger.impls",
"org.apache.hudi.HoodieSparkRecordMerger").
option("hoodie.file.group.reader.enabled", "true").
option("hoodie.merge.use.record.positions", "false").
format("hudi").load(basePath).filter($"rider" ===
"rider-D").withColumn("fare", col("fare") * 10)
updatesDf.write.format("hudi").
option(OPERATION_OPT_KEY, "upsert").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
option("hoodie.logfile.data.block.format", "parquet").
option("hoodie.datasource.write.record.merger.impls",
"org.apache.hudi.HoodieSparkRecordMerger").
option("hoodie.file.group.reader.enabled", "true").
option("hoodie.write.record.positions", "false").
mode(Append).
save(basePath)
val updatesDf = spark.read.
option("hoodie.datasource.write.record.merger.impls",
"org.apache.hudi.HoodieSparkRecordMerger").
option("hoodie.file.group.reader.enabled", "true").
option("hoodie.merge.use.record.positions", "false").
format("hudi").load(basePath).filter($"rider" === "rider-D").withColumn("fare",
col("fare") * 10)
updatesDf.write.format("hudi").
option(OPERATION_OPT_KEY, "upsert").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
option("hoodie.logfile.data.block.format", "parquet").
option("hoodie.datasource.write.record.merger.impls",
"org.apache.hudi.HoodieSparkRecordMerger").
option("hoodie.file.group.reader.enabled", "true").
option("hoodie.write.record.positions", "true").
mode(Append).
save(basePath) {code}
I set write positions to true on writes 1,2,4 and false on 3.
We can see 3 log files here:
!image (6).png!
However, when I try to print the footer of one of the log blocks, it says it's
not a valid parquet file:
!image (7).png!
in hoodie datablock, I added print statements if it's going to add positions or
not:
!image (8).png!
In the first write the output was:
24/04/10 16:34:58 WARN HoodieDataBlock: NOT ADDING POSITIONS TO HEADER
In the second write the output was:
24/04/10 16:35:00 WARN HoodieDataBlock: ADDING POSITIONS TO HEADER
24/04/10 16:35:00 WARN HoodieDataBlock: NOT ADDING POSITIONS TO HEADER
In the third write the output was:
24/04/10 16:35:01 WARN HoodieDataBlock: NOT ADDING POSITIONS TO HEADER
24/04/10 16:35:02 WARN HoodieDataBlock: NOT ADDING POSITIONS TO HEADER
In the fourth write the output was:
24/04/10 16:35:03 WARN HoodieDataBlock: ADDING POSITIONS TO HEADER
24/04/10 16:35:03 WARN HoodieDataBlock: NOT ADDING POSITIONS TO HEADER
I tried with avro datablocks and the output to the logging was the same
I tried reading the log blocks but it says it's an invalid avro file. Also the
avro log files are 1/5 the size of the parquet log files:
!image (9).png!
> Fallback to key-based merging if there are no positions in log header
> ---------------------------------------------------------------------
>
> Key: HUDI-7269
> URL: https://issues.apache.org/jira/browse/HUDI-7269
> Project: Apache Hudi
> Issue Type: Improvement
> Reporter: Ethan Guo
> Assignee: Jonathan Vexler
> Priority: Blocker
> Fix For: 1.0.0
>
> Attachments: image (6).png, image (7).png, image (8).png, image
> (9).png
>
>
> When turning on merging with record positions, if there is no position header
> in the log blocks, the reader should fall back to key-based merging.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)