bithw1 opened a new issue #2267:
URL: https://github.com/apache/hudi/issues/2267

   Hi,
   I am doing a POC on Hudi, but I am quite new to it and I am stuck. I would like to ask about the Hudi merge-on-read table:
   how can I query only the base parquet data, excluding the delta log data?
   
   I am using Hudi 0.6.0 + Spark 2.4.4
   
   
   I use the code below to perform 3 operations: `1. overwrite 2. upsert 3. upsert`.
   
   1. After the operations are done, when I look at the HDFS files, there are 3 parquet files there. I expected only one parquet file, because the two upsert operations should have been written to avro log files as delta commits.
   
   ```
   [root@host1] hdfs dfs -ls /data/hudi_demo/hudi_hive_read_write_mor_4/2020-11-19
   Found 4 items
   -rw-r--r--   2 root supergroup         93 2020-11-21 14:45 /data/hudi_demo/hudi_hive_read_write_mor_4/2020-11-19/.hoodie_partition_metadata
   -rw-r--r--   2 root supergroup     435000 2020-11-21 14:45 /data/hudi_demo/hudi_hive_read_write_mor_4/2020-11-19/3f7b1807-8099-4629-8fb7-c8a74ba9298d-0_0-21-21_20201121144541.parquet
   -rw-r--r--   2 root supergroup     434787 2020-11-21 14:45 /data/hudi_demo/hudi_hive_read_write_mor_4/2020-11-19/3f7b1807-8099-4629-8fb7-c8a74ba9298d-0_0-51-49_20201121144550.parquet
   -rw-r--r--   2 root supergroup     434842 2020-11-21 14:45 /data/hudi_demo/hudi_hive_read_write_mor_4/2020-11-19/3f7b1807-8099-4629-8fb7-c8a74ba9298d-0_0-81-77_20201121144554.parquet
   ```
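
   If it helps interpretation: Hudi base file names appear to follow `<fileId>_<writeToken>_<instantTime>.parquet`, so the three parquet files above share the same fileId and only differ in the instant time, i.e. the same file group was rewritten by each commit. A small Scala sketch of that split (my own helper for illustration, assuming that naming convention; `HudiFileName` is not a Hudi API):

   ```scala
   // Illustration: split a Hudi base file name into fileId and commit instant,
   // assuming the convention <fileId>_<writeToken>_<instantTime>.parquet.
   object HudiFileName {
     def parse(name: String): (String, String) = {
       val base = name.stripSuffix(".parquet")
       val fileId  = base.take(base.indexOf('_'))         // text before the first '_'
       val instant = base.drop(base.lastIndexOf('_') + 1) // text after the last '_'
       (fileId, instant)
     }
   }
   ```

   All three files parse to fileId `3f7b1807-8099-4629-8fb7-c8a74ba9298d-0`; only the instants (`20201121144541`, `20201121144550`, `20201121144554`) differ.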
   
   2. In the code, I used the following snippet to disable compaction (I am not sure this is the correct way to do it); I disabled compaction simply so that I could see the delta commit files:
   
   ```
         //disable async compact
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY, "false")
         .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, 100)
         //disable inline compact
         .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false")
   ```
   3. In Hive,

   I run the following query, wanting to get only the base parquet data, without the delta commit data:
   `select * from xyz.hudi_hive_read_write_mor_4_ro;`
   However, it returns all of the most recent records (the row with the largest creation_date for each updated key), like a snapshot query.
   
   4. In spark shell,
   
   I do the same query:
   
   ```
   val basepath = "/data/hudi_demo/hudi_hive_read_write_mor_4"

   val x = spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)

   val df = x.load(basepath + "/*")

   df.show
   ```
   
   The result is the same as the query in Hive.
   
   
   Following is the code that I use for the POC:
   
   ```
   package org.example.hudi
   
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
   import org.apache.hudi.index.HoodieIndex
   import org.apache.spark.sql.{SaveMode, SparkSession}
   
   case class MyOrder(
                       name: String,
                       price: String,
                       creation_date: String,
                       dt: String)
   
   object MORTest {
     val overwrite1Data = Seq(
       MyOrder("A", "12.7", "2020-11-18 14:43:32", "2020-11-19"),
       MyOrder("B", "13.2", "2020-11-18 14:42:21", "2020-11-19"),
       MyOrder("C", "11.6", "2020-11-18 14:47:19", "2020-11-19"),
       MyOrder("D", "10.4", "2020-11-18 14:46:50", "2020-11-19")
     )
   
     val insertUpdate1Data = Seq(
       MyOrder("A", "13.7", "2020-11-18 14:53:32", "2020-11-19"), //update A
       MyOrder("E", "11.6", "2020-11-18 14:47:19", "2020-11-19"), //add E,F
       MyOrder("F", "10.4", "2020-11-18 14:46:50", "2020-11-19")
     )
   
     val insertUpdate2Data = Seq(
       MyOrder("X", "13.7", "2020-11-18 14:53:32", "2020-11-19"), //add X, Y
       MyOrder("Y", "18.2", "2020-11-19 14:42:21", "2020-11-19"),
       MyOrder("F", "17.4", "2020-11-18 15:46:50", "2020-11-19") //Update F
     )
   
     val spark = SparkSession.builder.appName("MORTest")
       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse")
       .enableHiveSupport().getOrCreate()
   
     val hudi_table = "hudi_hive_read_write_mor_4"
   
     val base_path = s"/data/hudi_demo/$hudi_table"
   
     def run(op: Int) = {
       val (data, saveMode) = op match {
         case 1 => (overwrite1Data, SaveMode.Overwrite)
         case 2 => (insertUpdate1Data, SaveMode.Append)
         case 3 => (insertUpdate2Data, SaveMode.Append)
       }
       import spark.implicits._
       val insertData = spark.createDataset(data)
       insertData.write.format("hudi")
         .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "creation_date")
         .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "xyz")
         .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hudi_table)
         .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
         .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
         .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
   
         //table type: MOR
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
   
         //disable async compact
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY, "false")
      .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, 100)
         //disable inline compact
         .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false")
   
   
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://10.41.90.208:10000")
         .option(HoodieWriteConfig.TABLE_NAME, hudi_table)
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
         .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
         .option("hoodie.insert.shuffle.parallelism", "2")
         .option("hoodie.upsert.shuffle.parallelism", "2")
         .mode(saveMode)
         .save(base_path);
     }
   
   
     def main(args: Array[String]): Unit = {
       //do overwrite
       run(1)
   
       //do upsert
       run(2)
   
       //do upsert
       run(3)
   
       println("===MOR is done=====")
     }
   
   }
   ```
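
   For what it's worth, the result I see matches plain upsert-with-precombine semantics (per record key, the row with the greater `creation_date` wins). A Spark/Hudi-independent Scala sketch of that expectation — the `Row` class and `upsert` helper here are my own illustration, not Hudi APIs:

   ```scala
   // Illustration: simulate key-based upserts where, on a duplicate record key,
   // the row with the larger precombine value wins. creation_date strings in
   // "yyyy-MM-dd HH:mm:ss" form compare correctly as plain strings.
   case class Row(name: String, price: String, creationDate: String)

   object UpsertSketch {
     def upsert(table: Map[String, Row], batch: Seq[Row]): Map[String, Row] =
       batch.foldLeft(table) { (acc, row) =>
         acc.get(row.name) match {
           case Some(old) if old.creationDate > row.creationDate => acc // existing row is newer: keep it
           case _ => acc.updated(row.name, row)                         // insert, or overwrite with newer row
         }
       }
   }
   ```

   Feeding the three batches above through this gives A at 13.7 and F at 17.4, with keys A through F plus X and Y — the same "latest records" picture that both the `_ro` Hive query and the read-optimized Spark query return, which is why they look like snapshot queries to me.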
   
   

