[ https://issues.apache.org/jira/browse/FLINK-35698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ruan Hang updated FLINK-35698:
------------------------------
Fix Version/s: 2.3.0
(was: 2.2.0)
> Parquet connector fails to load ROW<x decimal(5, 2)> after save
> ---------------------------------------------------------------
>
> Key: FLINK-35698
> URL: https://issues.apache.org/jira/browse/FLINK-35698
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / API
> Affects Versions: 2.1.0
> Reporter: Andrey Gaskov
> Priority: Critical
> Fix For: 2.3.0
>
> Attachments: flink-parquet-testing.png
>
>
> The bug can be reproduced by the following test added to ParquetFileSystemITCase.java:
> {code:java}
> @TestTemplate
> void testRowColumnType() throws IOException, ExecutionException, InterruptedException {
>     String path = TempDirUtils.newFolder(super.fileTempFolder()).toURI().getPath();
>     // Source: 10 generated rows with a DECIMAL(5, 2) field nested inside a ROW.
>     super.tableEnv()
>             .executeSql(
>                     "create table t_in("
>                             + "grp ROW<x decimal(5, 2)>"
>                             + ") with ("
>                             + "'connector' = 'datagen',"
>                             + "'number-of-rows' = '10'"
>                             + ")");
>     // Sink: Parquet files written to a temporary directory.
>     super.tableEnv()
>             .executeSql(
>                     "create table t_out("
>                             + "grp ROW<x decimal(5, 2)>"
>                             + ") with ("
>                             + "'connector' = 'filesystem',"
>                             + "'path' = '"
>                             + path
>                             + "',"
>                             + "'format' = 'parquet'"
>                             + ")");
>     // Writing succeeds; reading the data back is what fails.
>     super.tableEnv().executeSql("insert into t_out select * from t_in").await();
>     List<Row> rows =
>             CollectionUtil.iteratorToList(
>                     super.tableEnv().executeSql("select * from t_out limit 10").collect());
>     assertThat(rows).hasSize(10);
> }
> {code}
> The test hangs for 40 seconds and then fails with the following root exception:
> {code:java}
> Caused by: java.lang.ClassCastException: org.apache.flink.table.data.columnar.vector.heap.HeapIntVector cannot be cast to org.apache.flink.table.data.columnar.vector.DecimalColumnVector
>     at org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch.getDecimal(VectorizedColumnBatch.java:118)
>     at org.apache.flink.table.data.columnar.ColumnarRowData.getDecimal(ColumnarRowData.java:128)
>     at org.apache.flink.table.data.RowData.lambda$createFieldGetter$89bd9445$1(RowData.java:233)
>     at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:207)
>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeRow(AbstractBinaryWriter.java:147)
>     at org.apache.flink.table.data.writer.BinaryRowWriter.writeRow(BinaryRowWriter.java:27)
>     at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:155)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:173)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:44)
>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:152)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:108)
>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:140)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:120)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:101)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:53)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:60)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:32)
>     at org.apache.flink.table.runtime.operators.sort.LimitOperator.processElement(LimitOperator.java:47)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:109)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:78)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:40)
>     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
>     at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:103)
>     at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:97)
>     at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
>     at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>     at java.lang.Thread.run(Thread.java:750)
> {code}
> If "grp ROW<x decimal(5, 2)>" is changed to "grp ROW<x int>", the test runs
> successuflly in few seconds.
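> For reference, the same failure can also be reproduced outside the test harness. The sketch below is a minimal standalone program, assuming a local batch setup with flink-parquet and the datagen/filesystem connectors on the classpath; the class name and the /tmp output path are illustrative only and not taken from the Flink code base.
>
> {code:java}
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.CloseableIterator;
>
> // Illustrative standalone reproduction sketch (hypothetical class, not part of Flink).
> public class ParquetNestedDecimalRepro {
>     public static void main(String[] args) throws Exception {
>         TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
>
>         // Source: 10 generated rows with a DECIMAL(5, 2) field nested inside a ROW.
>         tEnv.executeSql(
>                 "create table t_in(grp ROW<x decimal(5, 2)>) with ("
>                         + "'connector' = 'datagen', 'number-of-rows' = '10')");
>
>         // Sink: Parquet files under an illustrative local directory.
>         tEnv.executeSql(
>                 "create table t_out(grp ROW<x decimal(5, 2)>) with ("
>                         + "'connector' = 'filesystem',"
>                         + "'path' = '/tmp/parquet-nested-decimal',"
>                         + "'format' = 'parquet')");
>
>         // Writing succeeds; reading the files back is what raises the ClassCastException.
>         tEnv.executeSql("insert into t_out select * from t_in").await();
>         try (CloseableIterator<Row> it = tEnv.executeSql("select * from t_out").collect()) {
>             it.forEachRemaining(System.out::println);
>         }
>     }
> }
> {code}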
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)