codejoyan opened a new issue, #5231:
URL: https://github.com/apache/hudi/issues/5231
Hudi - 0.11 (Built using master branch)
Spark - 2.4.4
I am trying to compare the query output from a snapshot query VS a query to
fetch data from files returned by GetLatestBaseFiles (as below).
What might be the reason for the below 2 observations:
1. Difference in count between Section A and Section B.
2. I am using a COW table but the latest file slice for a file group doesn't
have all the records. (Section C)
**Files listed by GetLatestBaseFiles**
```
d086d323-b8c0-4d34-a664-40f8d9d301fd-0_0-35-36_20220405114604187.parquet
24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet
```
**Section A: SnapShot Query Output (_Expected_)**
```
scala> spark.sql("select date, count(1) from stock_ticks_cow group by
date").show(false)
+----------+--------+
|date |count(1)|
+----------+--------+
|2019/08/31|197 |
|2018/08/31|197 |
+----------+--------+
```
**Section B: Query Output Using list of files returned by
GetLatestBaseFiles**
```
scala> spark.sql("select date, count(1) from stock_ticks_cow where
_hoodie_file_name in
('d086d323-b8c0-4d34-a664-40f8d9d301fd-0_0-35-36_20220405114604187.parquet',
'24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet')
group by date").show(false)
+----------+--------+
|date |count(1)|
+----------+--------+
|2019/08/31|197 |
|2018/08/31|99 |
+----------+--------+
```
**Section C: The latest file slice has only a subset of records in COW
(expected - 197, actual - 99)**
```
-rw-r--r-- 1 root supergroup 96 2022-04-05 11:46
/user/hive/warehouse/stock_ticks_cow/2018/08/31/.hoodie_partition_metadata
-rw-r--r-- 1 root supergroup 443927 2022-04-05 11:51
/user/hive/warehouse/stock_ticks_cow/2018/08/31/24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115122268.parquet
-rw-r--r-- 1 root supergroup 443653 2022-04-05 11:52
/user/hive/warehouse/stock_ticks_cow/2018/08/31/24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet
-rw-r--r-- 1 root supergroup 443919 2022-04-05 11:46
/user/hive/warehouse/stock_ticks_cow/2018/08/31/24cf1c28-e142-41ce-9131-65db1d0a83c3-0_1-35-37_20220405114604187.parquet
scala> spark.sql("select _hoodie_file_name, count(distinct key) from
stock_ticks_cow where _hoodie_file_name in
('24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet')
group by _hoodie_file_name").show(false)
+------------------------------------------------------------------------+-------------------+
|_hoodie_file_name
|count(DISTINCT key)|
+------------------------------------------------------------------------+-------------------+
|24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet|99
|
+------------------------------------------------------------------------+-------------------+
```
**To Reproduce**
Steps to reproduce the behavior:
**List Latest Base Files**
```
scala> val basePath = "/user/hive/warehouse/stock_ticks_cow"
basePath: String = /user/hive/warehouse/stock_ticks_cow
scala> val conf: SerializableConfiguration = new
SerializableConfiguration(new Configuration())
conf: org.apache.hudi.common.config.SerializableConfiguration =
Configuration: core-default.xml, core-site.xml, mapred-default.xml,
mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml,
hdfs-site.xml
scala> val engineContext: HoodieLocalEngineContext = new
HoodieLocalEngineContext(conf.get());
engineContext: org.apache.hudi.common.engine.HoodieLocalEngineContext =
org.apache.hudi.common.engine.HoodieLocalEngineContext@b8471c9
scala> val metaClient: HoodieTableMetaClient =
HoodieTableMetaClient.builder().setConf(conf.get()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
metaClient: org.apache.hudi.common.table.HoodieTableMetaClient =
HoodieTableMetaClient{basePath='/user/hive/warehouse/stock_ticks_cow',
metaPath='/user/hive/warehouse/stock_ticks_cow/.hoodie',
tableType=COPY_ON_WRITE}
scala> val timeline =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
timeline: org.apache.hudi.common.table.timeline.HoodieTimeline =
org.apache.hudi.common.table.timeline.HoodieDefaultTimeline:
[20220405114604187__commit__COMPLETED],[20220405115122268__commit__COMPLETED],[20220405115234824__commit__COMPLETED]
scala> val metadataConfig =
HoodieInputFormatUtils.buildMetadataConfig(conf.get())
metadataConfig: org.apache.hudi.common.config.HoodieMetadataConfig =
org.apache.hudi.common.config.HoodieMetadataConfig@732b3ec8
scala> val fsView = new HoodieMetadataFileSystemView(engineContext,
metaClient, timeline, metadataConfig)
fsView: org.apache.hudi.metadata.HoodieMetadataFileSystemView =
org.apache.hudi.metadata.HoodieMetadataFileSystemView@4a20f6ea
scala> val partitions = FSUtils.getAllPartitionPaths(engineContext,
metadataConfig, basePath).iterator().asScala.toList;
partitions: List[String] = List(2018/08/31, 2019/08/31)
scala> partitions.map(x => {
| val engContext = new HoodieLocalEngineContext(conf.get());
| val fsView = new HoodieMetadataFileSystemView(engContext, metaClient,
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
metadataConfig);
|
fsView.getLatestBaseFiles(x).iterator().asScala.toList.map(_.getFileName)
| })
res15: List[List[String]] =
List(List(24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet),
List(d086d323-b8c0-4d34-a664-40f8d9d301fd-0_0-35-36_20220405114604187.parquet))
```
**Steps to Reproduce**
I am following the steps in the docker demo. There are 2 json files
(batch_1.json, batch_2.json) in docker/demo/data. I created an additional json
file batch_3.json. Just changed the year from 2018 to 2019 from the
batch_1.json file.
**Commit 1:**
**terminal 1:**
```
j0s0j7j@m-c02d25lnmd6n data % cat batch_3.json | kcat -b kafkabroker -t
stock_tick -P
j0s0j7j@m-c02d25lnmd6n data % cat batch_2.json | kcat -b kafkabroker -t
stock_tick -P
j0s0j7j@m-c02d25lnmd6n data % cat batch_1.json | kcat -b kafkabroker -t
stock_tick -P
```
**terminal 2:**
```
docker exec -it adhoc-2 /bin/bash
# Run the following spark-submit command to execute the delta-streamer and
ingest to stock_ticks_cow table in HDFS
spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
$HUDI_UTILITIES_BUNDLE \
--table-type COPY_ON_WRITE \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path /user/hive/warehouse/stock_ticks_cow \
--target-table stock_ticks_cow --props
/var/demo/config/kafka-source.properties \
--schemaprovider-class
org.apache.hudi.utilities.schema.FilebasedSchemaProvider
```
**Commit 2:**
**terminal 1:**
```
j0s0j7j@m-c02d25lnmd6n data % cat batch_1.json | kcat -b kafkabroker -t
stock_tick -P
j0s0j7j@m-c02d25lnmd6n data % cat batch_1.json | kcat -b kafkabroker -t
stock_tick -P
```
**terminal 2:**
Execute deltastreamer job as Commit 1
**terminal 2:**
**Commit 3:**
**terminal 1:**
`j0s0j7j@m-c02d25lnmd6n data % cat batch_2.json | kcat -b kafkabroker -t
stock_tick -P`
**terminal 2:**
Execute deltastreamer job as Commit 1
**Environment Description**
* Hudi version : Built using master branch (0.11)
* Spark version : 2.4.4
* Running on Docker? yes
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]