weitianpei opened a new issue, #11090:
URL: https://github.com/apache/hudi/issues/11090
A Hudi table `ods_itp_tss_hudi` is written by a Flink SQL program A (version flink1.16.0-hudi13.0).
Program A's SQL is:
"""
create table tss_odp_kafka_source
(
data string,
ts timestamp(3) metadata from 'timestamp',
`partition` int metadata from 'partition',
`offset` bigint metadata from 'offset'
) with (
'connector' = 'kafka',
'topic' = 'ods_vgc_itp_tss',
'value.format' = 'raw',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest',
'properties.bootstrap.servers' = '${kafka.servers}',
'properties.group.id' = 'g-tss-ods-hudi'
);
create table ods_itp_tss_hudi (
type string,
country string,
source string,
brand string,
version string,
vin string,
updated_date string,
etl_date string,
data string,
recv_time timestamp(3),
kafka_partition int,
kafka_offset bigint,
kafka_ts timestamp(3),
sync_date string
) partitioned by (sync_date, type)
with (
'connector' = 'hudi',
'table.type' = 'COPY_ON_WRITE',
'path' = '/hudi/ods_vgc/ods_vgc_itp_tss_rli',
'write.tasks' = '1',
'write.operation' = 'insert',
'write.task.max.size' = '512'
'clustering.schedule.enabled' = 'true',
'clustering.async.enabled' = 'true',
'clustering.delta_commits' = '6',
'clean.async.enabled'='false',
'clean.retain_commits'='1'
);
insert into ods_itp_tss_hudi
(type, country, source, brand, version, vin, updated_date, etl_date, data,
recv_time, kafka_partition, kafka_offset, kafka_ts, sync_date)
select
tss.type,
tss.country,
tss.source,
tss.brand,
tss.version,
tss.vin,
tss.updated_date,
tss.etl_date,
tss.data,
to_timestamp(convert_tz(date_format(LOCALTIMESTAMP, 'yyyy-MM-dd
HH:mm:ss'), 'GMT+08:00', 'UTC')) as recv_time,
kafka_partition,
kafka_offset,
kafka_ts,
sync_date
from (
select
decompress_tss(data) as tss,
to_timestamp(convert_tz(date_format(ts, 'yyyy-MM-dd HH:mm:ss'),
'GMT+08:00', 'UTC')) as kafka_ts,
`partition` as kafka_partition,
`offset` as kafka_offset,
date_format(ts, 'yyyyMMdd') as sync_date
from tss_odp_kafka_source
);
Then I started a new Flink program B to stream-read the table
`ods_itp_tss_hudi` created by program A, but it failed with the following error:
```
2024-04-19 03:45:27,925 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TSS[1] -> Calc[2] -> ConstraintEnforcer[3] -> TableToDataSteam -> Filter -> Map -> Filter -> Map -> Sink: StdStreamToClickHouse (2/10) (87754f6aa528b72165577ce077304447_cbc357ccb763df2852fee8c4fc7d55f2_1_1) switched from RUNNING to FAILED on container_e70_1702372395802_74102_01_000010 @ 10.91.144.87 (dataPort=37053).
java.io.FileNotFoundException: File does not exist: hdfs://HDFS8000151/hudi/ods_vgc/ods_vgc_itp_tss_rli/20240418/combi/7be53036-cdd9-4388-bd53-ce35edd303c3-0_0-1-0_20240419025522427.parquet
	at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1587) ~[hadoop-hdfs-client-3.1.2.jar:?]
	at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1580) ~[hadoop-hdfs-client-3.1.2.jar:?]
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[hadoop-common-3.1.2.jar:?]
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1595) ~[hadoop-hdfs-client-3.1.2.jar:?]
	at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:39) ~[hudi-flink1.16-bundle-0.13.1.jar:0.13.1]
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:469) ~[hudi-flink1.16-bundle-0.13.1.jar:0.13.1]
	at org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.<init>(ParquetColumnarRowSplitReader.java:130) ~[hudi-flink1.16-bundle-0.13.1.jar:0.13.1]
	at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:148) ~[hudi-flink1.16-bundle-0.13.1.jar:0.13.1]
	at org.apache.hudi.table.format.RecordIterators.getParquetRecordIterator(RecordIterators.java:56) ~[hudi-flink1.16-bundle-0.13.1.jar:0.13.1]
	at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.open(CopyOnWriteInputFormat.java:132) ~[hudi-flink1.16-bundle-0.13.1.jar:0.13.1]
	at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.open(CopyOnWriteInputFormat.java:66) ~[hudi-flink1.16-bundle-0.13.1.jar:0.13.1]
	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84) ~[flink-dist-1.16.0.jar:1.16.0]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist-1.16.0.jar:1.16.0]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) ~[flink-dist-1.16.0.jar:1.16.0]
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) ~[flink-dist-1.16.0.jar:1.16.0]
```
Program A has configured `'clustering.schedule.enabled' = 'true'` to cluster
the parquet files asynchronously, and files are clustered until they reach
about 600 MB. When program B reads a parquet file that has just been replaced
by clustering, I hit the FileNotFoundException above.
Can anyone help with this?
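For context, the failure mode described above is a race between the streaming reader and clustering plus cleaning: with `'clean.retain_commits' = '1'`, base files superseded by a clustering replace-commit are deleted almost immediately, so a reader still holding splits against the old file slice gets FileNotFoundException. A sketch of reader-side options that may help, assuming the option names from Hudi's Flink configuration (in particular, `read.streaming.skip_clustering` may only exist in newer Hudi releases, so verify it against your version before relying on it):

```sql
-- Hypothetical reader table for program B: same schema and path as
-- ods_itp_tss_hudi, with streaming-read options that tolerate async
-- clustering. Columns elided here for brevity.
create table ods_itp_tss_hudi_src (
  -- ... same columns as ods_itp_tss_hudi ...
  data string,
  sync_date string
) with (
  'connector' = 'hudi',
  'path' = '/hudi/ods_vgc/ods_vgc_itp_tss_rli',
  'read.streaming.enabled' = 'true',
  -- Skip instants produced by clustering so the reader does not plan
  -- splits against base files that a replace-commit has superseded.
  -- (Assumed option name; check your Hudi version's Flink options.)
  'read.streaming.skip_clustering' = 'true'
);
```

On the writer side, raising `'clean.retain_commits'` well above 1 would also give in-flight readers time to finish before replaced file slices are physically deleted.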