bithw1 opened a new issue #2267:
URL: https://github.com/apache/hudi/issues/2267
Hi,
I am doing a POC with Hudi, but I am quite new to it and I am stuck here. I
would like to ask about the Hudi merge-on-read table: how do I query out only
the base parquet data, without including the delta data?
I am using Hudi 0.6.0 + Spark 2.4.4.
I use the following code to do 3 operations: `1. overwrite 2. upsert 3.
upsert.`
1. After the operations are done, when I look at the HDFS files, there are 3
parquet files there. I had thought that only one parquet file would be there,
and that the two upsert operations would be written to avro log files as delta
commits.
```
[root@host1]hdfs dfs -ls
/data/hudi_demo/hudi_hive_read_write_mor_4/2020-11-19
Found 4 items
-rw-r--r-- 2 root supergroup 93 2020-11-21 14:45
/data/hudi_demo/hudi_hive_read_write_mor_4/2020-11-19/.hoodie_partition_metadata
-rw-r--r-- 2 rootsupergroup 435000 2020-11-21 14:45
/data/hudi_demo/hudi_hive_read_write_mor_4/2020-11-19/3f7b1807-8099-4629-8fb7-c8a74ba9298d-0_0-21-21_20201121144541.parquet
-rw-r--r-- 2 root supergroup 434787 2020-11-21 14:45
/data/hudi_demo/hudi_hive_read_write_mor_4/2020-11-19/3f7b1807-8099-4629-8fb7-c8a74ba9298d-0_0-51-49_20201121144550.parquet
-rw-r--r-- 2 root supergroup 434842 2020-11-21 14:45
/data/hudi_demo/hudi_hive_read_write_mor_4/2020-11-19/3f7b1807-8099-4629-8fb7-c8a74ba9298d-0_0-81-77_20201121144554.parquet
```
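If I read Hudi's base-file naming convention correctly, each name is `<fileId>_<writeToken>_<instantTime>.parquet`, so the three parquet files above are three versions of the same file group (same fileId, different instant times), one per commit. A small sketch that pulls one of the names apart:

```scala
// Split a Hudi base-file name into its parts.
// Naming convention: <fileId>_<writeToken>_<instantTime>.parquet
def parseBaseFileName(name: String): (String, String, String) = {
  // Limit the split to 3 so hyphens inside each part are untouched.
  val Array(fileId, writeToken, rest) = name.split("_", 3)
  (fileId, writeToken, rest.stripSuffix(".parquet"))
}

val (fileId, token, instant) =
  parseBaseFileName("3f7b1807-8099-4629-8fb7-c8a74ba9298d-0_0-21-21_20201121144541.parquet")
```

All three files in the listing share the fileId `3f7b1807-8099-4629-8fb7-c8a74ba9298d-0`, which is why I expected later commits to land in log files instead of new parquet versions.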
2. In the code, I used the following snippet to disable compaction. I am not
sure this is the correct way to disable it; I disabled compaction simply
because I want to see the delta commit files.
```
//disable async compact
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY, "false")
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, 100)
//disable inline compact
.option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false")
```
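For reference, as far as I can tell from the 0.6.x docs, those option constants resolve to the following raw config keys (please correct me if any key name is wrong for this version):

```scala
// Raw config keys behind the constants used above (Hudi 0.6.x, per the docs):
//   DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY          -> hoodie.datasource.compaction.async.enable
//   HoodieCompactionConfig.INLINE_COMPACT_PROP                   -> hoodie.compact.inline
//   HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP -> hoodie.compact.inline.max.delta.commits
val compactionOpts = Map(
  "hoodie.datasource.compaction.async.enable" -> "false", // no async compaction
  "hoodie.compact.inline"                     -> "false", // no inline compaction
  "hoodie.compact.inline.max.delta.commits"   -> "100"    // only relevant if inline were on
)
```

My understanding is that with both async and inline compaction off, upserts should only ever add delta log files until a compaction is scheduled explicitly.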
3. In Hive, I run the following query, expecting to get only the base parquet
data without the delta commit data:
`select * from xyz.hudi_hive_read_write_mor_4_ro;`
However, it returns all the most recent records (the updated records come back
with the largest creation_date), like a snapshot query.
4. In spark-shell, I run the same query:
```
val basepath = "/data/hudi_demo/hudi_hive_read_write_mor_4"
val x = spark.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
    DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
val df = x.load(basepath + "/*")
df.show()
```
The result is the same as the Hive query.
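In case it helps frame the question: as I understand the 0.6.x datasource API, the query type option accepts three values, and I expected read-optimized and snapshot to differ (string values below taken from the docs; treat them as my assumption):

```scala
// Values accepted by DataSourceReadOptions.QUERY_TYPE_OPT_KEY in Hudi 0.6.x:
val queryTypeKey  = "hoodie.datasource.query.type"
val snapshot      = "snapshot"       // merge base parquet with delta log files
val readOptimized = "read_optimized" // base parquet only (what I want here)
val incremental   = "incremental"    // only changes after a given instant

// Usage sketch in spark-shell:
//   spark.read.format("hudi").option(queryTypeKey, readOptimized).load(basepath + "/*")
```

Since every commit in my table produced a new parquet base file (and no log files), I suspect that is why read-optimized and snapshot return the same result.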
The following is the full code that I use for the POC:
```
package org.example.hudi

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{SaveMode, SparkSession}

case class MyOrder(
    name: String,
    price: String,
    creation_date: String,
    dt: String)

object MORTest {

  val overwrite1Data = Seq(
    MyOrder("A", "12.7", "2020-11-18 14:43:32", "2020-11-19"),
    MyOrder("B", "13.2", "2020-11-18 14:42:21", "2020-11-19"),
    MyOrder("C", "11.6", "2020-11-18 14:47:19", "2020-11-19"),
    MyOrder("D", "10.4", "2020-11-18 14:46:50", "2020-11-19")
  )

  val insertUpdate1Data = Seq(
    MyOrder("A", "13.7", "2020-11-18 14:53:32", "2020-11-19"), // update A
    MyOrder("E", "11.6", "2020-11-18 14:47:19", "2020-11-19"), // add E, F
    MyOrder("F", "10.4", "2020-11-18 14:46:50", "2020-11-19")
  )

  val insertUpdate2Data = Seq(
    MyOrder("X", "13.7", "2020-11-18 14:53:32", "2020-11-19"), // add X, Y
    MyOrder("Y", "18.2", "2020-11-19 14:42:21", "2020-11-19"),
    MyOrder("F", "17.4", "2020-11-18 15:46:50", "2020-11-19")  // update F
  )

  val spark = SparkSession.builder.appName("MORTest")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse")
    .enableHiveSupport().getOrCreate()

  val hudi_table = "hudi_hive_read_write_mor_4"
  val base_path = s"/data/hudi_demo/$hudi_table"

  def run(op: Int): Unit = {
    val (data, saveMode) = op match {
      case 1 => (overwrite1Data, SaveMode.Overwrite)
      case 2 => (insertUpdate1Data, SaveMode.Append)
      case 3 => (insertUpdate2Data, SaveMode.Append)
    }
    import spark.implicits._
    val insertData = spark.createDataset(data)
    insertData.write.format("hudi")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "creation_date")
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "xyz")
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hudi_table)
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
      // table type: MOR
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
        DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      // disable async compact
      .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY, "false")
      .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, 100)
      // disable inline compact
      .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://10.41.90.208:10000")
      .option(HoodieWriteConfig.TABLE_NAME, hudi_table)
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
        "org.apache.hudi.hive.MultiPartKeysValueExtractor")
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .mode(saveMode)
      .save(base_path)
  }

  def main(args: Array[String]): Unit = {
    run(1) // overwrite
    run(2) // upsert
    run(3) // upsert
    println("===MOR is done=====")
  }
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]