codejoyan opened a new issue, #5231:
URL: https://github.com/apache/hudi/issues/5231

   Hudi - 0.11 (Built using master branch)
   Spark - 2.4.4
   
   I am trying to compare the output of a snapshot query with the output of a query that reads only the files returned by `getLatestBaseFiles` (as below).
   
   What might be the reason for the following two observations?
   
   1. The counts differ between Section A and Section B.
   2. I am using a COW table, but the latest file slice for a file group doesn't have all the records (Section C).
   
   **Files listed by getLatestBaseFiles**
   ```
   d086d323-b8c0-4d34-a664-40f8d9d301fd-0_0-35-36_20220405114604187.parquet
   24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet
   ```
   
   **Section A: Snapshot Query Output (_Expected_)**
   ```
   scala> spark.sql("select date, count(1) from stock_ticks_cow group by date").show(false)
   +----------+--------+
   |date      |count(1)|
   +----------+--------+
   |2019/08/31|197     |
   |2018/08/31|197     |
   +----------+--------+
   ```
   
   **Section B: Query Output Using the List of Files Returned by getLatestBaseFiles**
   ```
   scala> spark.sql("select date, count(1) from stock_ticks_cow where _hoodie_file_name in ('d086d323-b8c0-4d34-a664-40f8d9d301fd-0_0-35-36_20220405114604187.parquet', '24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet') group by date").show(false)
   +----------+--------+
   |date      |count(1)|
   +----------+--------+
   |2019/08/31|197     |
   |2018/08/31|99      |
   +----------+--------+
   ```
   
   **Section C: The latest file slice has only a subset of records in COW (expected - 197, actual - 99)**
   
   ```
   -rw-r--r--   1 root supergroup         96 2022-04-05 11:46 /user/hive/warehouse/stock_ticks_cow/2018/08/31/.hoodie_partition_metadata
   -rw-r--r--   1 root supergroup     443927 2022-04-05 11:51 /user/hive/warehouse/stock_ticks_cow/2018/08/31/24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115122268.parquet
   -rw-r--r--   1 root supergroup     443653 2022-04-05 11:52 /user/hive/warehouse/stock_ticks_cow/2018/08/31/24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet
   -rw-r--r--   1 root supergroup     443919 2022-04-05 11:46 /user/hive/warehouse/stock_ticks_cow/2018/08/31/24cf1c28-e142-41ce-9131-65db1d0a83c3-0_1-35-37_20220405114604187.parquet
   
   scala> spark.sql("select _hoodie_file_name, count(distinct key) from stock_ticks_cow where _hoodie_file_name in ('24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet') group by _hoodie_file_name").show(false)
   +------------------------------------------------------------------------+-------------------+
   |_hoodie_file_name                                                       |count(DISTINCT key)|
   +------------------------------------------------------------------------+-------------------+
   |24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet|99                 |
   +------------------------------------------------------------------------+-------------------+
   ```
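
To make the relationship between the three files in the listing easier to see, here is a minimal Scala sketch that can be pasted into spark-shell. It assumes base file names follow the layout `<fileId>_<writeToken>_<instantTime>.parquet`, which is consistent with the instant times in the names matching the completed commits on the timeline shown in this issue. `BaseFileName`, `parseBaseFileName`, and `latestPerFileGroup` are hypothetical helpers written for this illustration, not Hudi APIs, and the sketch only inspects names: it does not consult the timeline the way the file system view does.

```scala
// Hypothetical helper type, not part of Hudi.
// Assumed name layout: <fileId>_<writeToken>_<instantTime>.parquet
final case class BaseFileName(fileId: String, writeToken: String, instantTime: String)

// Splits a base file name into its three parts; returns None when the
// name does not have exactly three underscore-separated fields.
def parseBaseFileName(name: String): Option[BaseFileName] =
  name.stripSuffix(".parquet").split("_") match {
    case Array(fileId, writeToken, instantTime) =>
      Some(BaseFileName(fileId, writeToken, instantTime))
    case _ => None
  }

// For each file ID, keeps the name with the greatest instant time.
// Instant times here are fixed-width digit strings, so comparing them
// as strings orders them chronologically.
def latestPerFileGroup(names: Seq[String]): Map[String, String] =
  names
    .flatMap(n => parseBaseFileName(n).map(p => (p, n)))
    .groupBy { case (parsed, _) => parsed.fileId }
    .map { case (fileId, entries) =>
      fileId -> entries.maxBy { case (parsed, _) => parsed.instantTime }._2
    }

// The three parquet files from the 2018/08/31 partition listing.
val listed = Seq(
  "24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115122268.parquet",
  "24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet",
  "24cf1c28-e142-41ce-9131-65db1d0a83c3-0_1-35-37_20220405114604187.parquet"
)

val latest = latestPerFileGroup(listed)
latest.foreach { case (fileId, name) => println(s"$fileId -> $name") }
```

Under that naming assumption, all three files share the file ID `24cf1c28-e142-41ce-9131-65db1d0a83c3-0`, so they are successive versions within one file group, and the sketch selects the file ending in `_20220405115234824.parquet`, the same name that `getLatestBaseFiles` returned for partition 2018/08/31.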
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   **List Latest Base Files**
   ```
   scala> val basePath = "/user/hive/warehouse/stock_ticks_cow"
   basePath: String = /user/hive/warehouse/stock_ticks_cow
   
   scala> val conf: SerializableConfiguration = new SerializableConfiguration(new Configuration())
   conf: org.apache.hudi.common.config.SerializableConfiguration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml
   
   scala> val engineContext: HoodieLocalEngineContext = new HoodieLocalEngineContext(conf.get());
   engineContext: org.apache.hudi.common.engine.HoodieLocalEngineContext = org.apache.hudi.common.engine.HoodieLocalEngineContext@b8471c9
   
   scala> val metaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(conf.get()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
   metaClient: org.apache.hudi.common.table.HoodieTableMetaClient = HoodieTableMetaClient{basePath='/user/hive/warehouse/stock_ticks_cow', metaPath='/user/hive/warehouse/stock_ticks_cow/.hoodie', tableType=COPY_ON_WRITE}
   
   scala> val timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
   timeline: org.apache.hudi.common.table.timeline.HoodieTimeline = org.apache.hudi.common.table.timeline.HoodieDefaultTimeline: [20220405114604187__commit__COMPLETED],[20220405115122268__commit__COMPLETED],[20220405115234824__commit__COMPLETED]
   
   scala> val metadataConfig = HoodieInputFormatUtils.buildMetadataConfig(conf.get())
   metadataConfig: org.apache.hudi.common.config.HoodieMetadataConfig = org.apache.hudi.common.config.HoodieMetadataConfig@732b3ec8
   
   scala> val fsView = new HoodieMetadataFileSystemView(engineContext, metaClient, timeline, metadataConfig)
   fsView: org.apache.hudi.metadata.HoodieMetadataFileSystemView = org.apache.hudi.metadata.HoodieMetadataFileSystemView@4a20f6ea
   
   scala> val partitions = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath).iterator().asScala.toList;
   partitions: List[String] = List(2018/08/31, 2019/08/31)
   
   scala> partitions.map(x => {
        | val engContext = new HoodieLocalEngineContext(conf.get());
        | val fsView = new HoodieMetadataFileSystemView(engContext, metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metadataConfig);
        | fsView.getLatestBaseFiles(x).iterator().asScala.toList.map(_.getFileName)
        | })
   res15: List[List[String]] = List(List(24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet), List(d086d323-b8c0-4d34-a664-40f8d9d301fd-0_0-35-36_20220405114604187.parquet))
   ```
   
   **Steps to Reproduce**
   I am following the steps in the Docker demo. There are two JSON files (batch_1.json, batch_2.json) in docker/demo/data. I created an additional JSON file, batch_3.json, by taking batch_1.json and changing the year from 2018 to 2019.
   **Commit 1:**
   **terminal 1:**
   ```
   j0s0j7j@m-c02d25lnmd6n data % cat batch_3.json | kcat -b kafkabroker -t stock_tick -P
   j0s0j7j@m-c02d25lnmd6n data % cat batch_2.json | kcat -b kafkabroker -t stock_tick -P
   j0s0j7j@m-c02d25lnmd6n data % cat batch_1.json | kcat -b kafkabroker -t stock_tick -P
   ```
   **terminal 2:**
   ```
   docker exec -it adhoc-2 /bin/bash
   
   # Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_cow table in HDFS
   spark-submit \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
     --table-type COPY_ON_WRITE \
     --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
     --source-ordering-field ts  \
     --target-base-path /user/hive/warehouse/stock_ticks_cow \
     --target-table stock_ticks_cow --props /var/demo/config/kafka-source.properties \
     --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
   ```
   **Commit 2:**
   **terminal 1:**
   ```
   j0s0j7j@m-c02d25lnmd6n data % cat batch_1.json | kcat -b kafkabroker -t stock_tick -P
   j0s0j7j@m-c02d25lnmd6n data % cat batch_1.json | kcat -b kafkabroker -t stock_tick -P
   ```
   **terminal 2:**
   Run the same DeltaStreamer job as in Commit 1.
   
   **Commit 3:**
   **terminal 1:**
   `j0s0j7j@m-c02d25lnmd6n data % cat batch_2.json | kcat -b kafkabroker -t stock_tick -P`
   **terminal 2:**
   Run the same DeltaStreamer job as in Commit 1.
   
   
   **Environment Description**
   
   * Hudi version : Built using master branch (0.11)
   
   * Spark version : 2.4.4
   
   * Running on Docker? yes
   
   
   
   

