Yao Zhang created HUDI-7303:
-------------------------------
Summary: Date field type unexpectedly converted to Long when using a
date comparison operator
Key: HUDI-7303
URL: https://issues.apache.org/jira/browse/HUDI-7303
Project: Apache Hudi
Issue Type: Bug
Components: flink
Affects Versions: 0.14.1, 0.14.0
Environment: Flink 1.15.4 Hudi 0.14.0
Flink 1.17.1 Hudi 0.14.0
Flink 1.17.1 Hudi 0.14.1rc1
Reporter: Yao Zhang
Assignee: Yao Zhang
Given the table date_dim from TPCDS as an example:
{code:sql}
CREATE TABLE date_dim (
  d_date_sk int,
  d_date_id varchar(16) NOT NULL,
  d_date date,
  d_month_seq int,
  d_week_seq int,
  d_quarter_seq int,
  d_year int,
  d_dow int,
  d_moy int,
  d_dom int,
  d_qoy int,
  d_fy_year int,
  d_fy_quarter_seq int,
  d_fy_week_seq int,
  d_day_name varchar(9),
  d_quarter_name varchar(6),
  d_holiday char(1),
  d_weekend char(1),
  d_following_holiday char(1),
  d_first_dom int,
  d_last_dom int,
  d_same_day_ly int,
  d_same_day_lq int,
  d_current_day char(1),
  d_current_week char(1),
  d_current_month char(1),
  d_current_quarter char(1),
  d_current_year char(1)
) with (
  'connector' = 'hudi',
  'path' = 'hdfs:///table_path/date_dim',
  'table.type' = 'COPY_ON_WRITE');
{code}
Executing the following SELECT statement throws an exception:
{code:sql}
select * from date_dim
where d_date between cast('1999-02-22' as date)
  and (cast('1999-02-22' as date) + INTERVAL '30' day);
{code}
The exception is:
{code:java}
java.lang.IllegalArgumentException: FilterPredicate column: d_date's declared type (java.lang.Long) does not match the schema found in file metadata. Column d_date is of type: INT32
Valid types for this column are: [class java.lang.Integer]
    at org.apache.parquet.filter2.predicate.ValidTypeMap.assertTypeValid(ValidTypeMap.java:125) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:179) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:149) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:113) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.parquet.filter2.predicate.Operators$GtEq.accept(Operators.java:246) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:119) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.parquet.filter2.predicate.Operators$And.accept(Operators.java:306) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:61) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:67) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.<init>(ParquetColumnarRowSplitReader.java:142) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:153) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.hudi.table.format.RecordIterators.getParquetRecordIterator(RecordIterators.java:78) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.open(CopyOnWriteInputFormat.java:130) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.open(CopyOnWriteInputFormat.java:66) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) ~[flink-dist-1.17.1.jar:1.17.1]
{code}
I tested several combinations of Flink and Hudi; the results are as follows:
Flink 1.15.4 Hudi 0.13.1: OK
Flink 1.15.4 Hudi 0.14.0: Fail
Flink 1.17.1 Hudi 0.14.0: Fail
Flink 1.17.1 Hudi 0.14.1rc1: Fail
This issue only occurs with Hudi 0.14.x.
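For context on why the validator rejects the predicate: Parquet stores the DATE logical type as INT32 (days since the Unix epoch), so a valid filter literal for d_date must be an Integer, while the error message shows the pushed-down predicate was built with a java.lang.Long (suggesting a millisecond-style representation). This is a hedged illustration of the two representations using only the JDK, not Hudi's actual conversion code:

{code:java}
import java.time.LocalDate;

public class DateRepr {

    // Parquet DATE logical type: INT32 days since 1970-01-01.
    // This is the value type the schema validator expects for d_date.
    static int asEpochDays(LocalDate d) {
        return (int) d.toEpochDay();
    }

    // A Long-based value (e.g. epoch milliseconds, a common timestamp
    // encoding) does not match the INT32 physical type, which is the
    // kind of mismatch the IllegalArgumentException above reports.
    static long asEpochMillis(LocalDate d) {
        return d.toEpochDay() * 86_400_000L;
    }

    public static void main(String[] args) {
        LocalDate d = LocalDate.parse("1999-02-22");
        System.out.println(asEpochDays(d));   // INT32 value Parquet expects
        System.out.println(asEpochMillis(d)); // Long value, rejected for INT32
    }
}
{code}

So a likely root cause is that the Flink filter-pushdown path in Hudi 0.14.x builds the date literal as a Long instead of an Integer; where exactly that conversion happens still needs to be confirmed in the code.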
--
This message was sent by Atlassian Jira
(v8.20.10#820010)