stayrascal opened a new issue, #8571:
URL: https://github.com/apache/hudi/issues/8571
**Describe the problem you faced**
I'm using Spark 3.2.1 and Hudi 0.11.1, and I have a MOR table that is continuously written by a Flink 1.15 streaming job.
When I query the RT table via spark-sql, the result does not refresh when I execute the same query SQL multiple times; it seems the current Spark session is bound to a fixed snapshot/commit. If I open a new spark-sql session and run the same query SQL, I see the latest result, but then hit the same problem again later. Since I only keep the latest 5 delta commits, a few moments later I get a FileNotFoundException because the old parquet files have already been removed.
The table definition:
```
CREATE TABLE flink_hudi_mor_streaming_tbl_1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3)
)
WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ',
'read.tasks' = '1',
'write.tasks' = '4',
'compaction.trigger.strategy' = 'num_or_time',
'compaction.delta_commits' = '5',
'compaction.delta_seconds' = '600',
'compaction.tasks' = '4',
'write.parquet.max.file.size' = '128',
'write.batch.size' = '128',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'jdbc',
'hive_sync.skip_ro_suffix' = 'true',
'hive_sync.metastore.uris' = 'thrift://xxxxxx:9083',
'hive_sync.jdbc_url' = 'jdbc:hive2:/xxxxxxx:10000',
'hive_sync.table' = 'flink_hudi_mor_streaming_tbl_1',
'hive_sync.db' = 'xxxx',
'hive_sync.username' = 'xxxx',
'hive_sync.password' = 'xxxxxx',
'path' = 'xxxxxxxxxxxxxxx',
'hoodie.datasource.write.recordkey.field' = 'uuid',
'precombine.field' = 'ts'
);
```
```
spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
20 8
21 4
22 7
23 8
24 12
Time taken: 0.905 seconds, Fetched 5 row(s)
spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
20 8
21 4
22 7
23 8
24 12
Time taken: 0.422 seconds, Fetched 5 row(s)
spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
20 8
21 4
22 7
23 8
24 12
Time taken: 0.381 seconds, Fetched 5 row(s)
spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
20 8
21 4
22 7
23 8
24 12
Time taken: 0.38 seconds, Fetched 5 row(s)
spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
20 8
21 4
22 7
23 8
24 12
Time taken: 0.365 seconds, Fetched 5 row(s)
spark-sql> call show_commits(table => 'flink_hudi_mor_streaming_tbl_1_rt');
20230425150737787 8147 0 1 1 60 60 0
20230425150707878 8166 0 1 1 60 60 0
20230425150707502 440544 1 0 1 281 0 0
20230425150637592 8164 0 1 1 60 60 0
20230425150607557 8166 0 1 1 60 60 0
20230425150537662 8163 0 1 1 60 60 0
20230425150508373 8086 0 1 1 60 60 0
20230425150447858 5885 0 1 1 42 0 0
Time taken: 0.436 seconds, Fetched 8 row(s)
spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
20 8
21 4
22 7
23 8
24 12
Time taken: 0.464 seconds, Fetched 5 row(s)
spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt timestamp as of '20230425150737787' group by age order by age limit 5;
20 14
21 15
22 17
23 16
24 18
Time taken: 1.137 seconds, Fetched 5 row(s)
spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
20 8
21 4
22 7
23 8
24 12
Time taken: 0.346 seconds, Fetched 5 row(s)
spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt timestamp as of '20230425150447858' group by age order by age limit 5;
20 5
21 1
22 2
23 3
24 4
Time taken: 0.713 seconds, Fetched 5 row(s)
spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt timestamp as of '20230425150508373' group by age order by age limit 5;
20 7
21 2
22 4
23 4
24 10
Time taken: 0.742 seconds, Fetched 5 row(s)
spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt timestamp as of '20230425150537662' group by age order by age limit 5;
20 8
21 4
22 7
23 8
24 12
Time taken: 0.706 seconds, Fetched 5 row(s)
spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
20 8
21 4
22 7
23 8
24 12
Time taken: 0.41 seconds, Fetched 5 row(s)
```
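Note that the time-travel queries above do return different counts per commit (and the counts at '20230425150537662' match the stale plain-query result exactly), while the plain query never moves forward, which makes me suspect the session has cached the table relation at that commit. A workaround sketch I have not verified yet, based only on that guess, is to explicitly refresh the table before re-running the query:
```
-- Untested guess: drop any cached relation/file listing for the table
-- so that the next query re-resolves the latest snapshot.
REFRESH TABLE flink_hudi_mor_streaming_tbl_1_rt;
select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
```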
**To Reproduce**
Steps to reproduce the behavior:
1. Create the table and insert data via Flink SQL:
```
CREATE TABLE flink_hudi_mor_streaming_tbl_1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3)
)
WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ',
'read.tasks' = '1',
'write.tasks' = '4',
'compaction.trigger.strategy' = 'num_or_time',
'compaction.delta_commits' = '5',
'compaction.delta_seconds' = '600',
'compaction.tasks' = '4',
'write.parquet.max.file.size' = '128',
'write.batch.size' = '128',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'jdbc',
'hive_sync.skip_ro_suffix' = 'true',
'hive_sync.metastore.uris' = 'thrift://xxxxxx:9083',
'hive_sync.jdbc_url' = 'jdbc:hive2:/xxxxxxx:10000',
'hive_sync.table' = 'flink_hudi_mor_streaming_tbl_1',
'hive_sync.db' = 'xxxx',
'hive_sync.username' = 'xxxx',
'hive_sync.password' = 'xxxxxx',
'path' = 'xxxxxxxxxxxxxxx',
'hoodie.datasource.write.recordkey.field' = 'uuid',
'precombine.field' = 'ts'
);
CREATE TABLE `fake_datasource` (
`uuid` STRING,
`name` STRING,
`age` INT,
`ts` AS PROCTIME()
) WITH (
'connector' = 'faker',
'rows-per-second' = '2',
'fields.uuid.expression' = '#{numerify ''id####''}',
'fields.name.expression' = '#{superhero.name}',
'fields.age.expression' = '#{number.numberBetween ''20'',''50''}',
'fields.ts.expression' = '#{date.past ''45'',''10'',''SECONDS''}'
);
insert into flink_hudi_mor_streaming_tbl_1 select * from fake_datasource;
set execution.checkpointing.interval=30sec;
```
2. Start a spark-sql session and query the table multiple times:
```
spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
```
```
select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
```
**Expected behavior**
I'm not sure whether a configuration key is wrong or something is missing, but I think each query SQL should read the latest RT view of the table.
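If the cause turns out to be Spark's file-source relation cache rather than a missing Hudi option, a sketch of what I would try (an assumption on my side, not something I have confirmed) is to disable that cache at launch, since spark.sql.filesourceTableRelationCacheSize is a static conf and cannot be changed with SET inside the session:
```
# Untested assumption: a cache size of 0 makes Spark re-resolve the table relation on every query.
spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.filesourceTableRelationCacheSize=0'
```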
**Environment Description**
* Hudi version : 0.11.1
* Spark version : 3.2.1
* Hive version :
* Hadoop version :
* Storage (HDFS/S3/GCS..) :
* Running on Docker? (yes/no) :