Zongwen Li created FLINK-24726:
----------------------------------
Summary: Hive Lookup Join SQL: decimal type UnsupportedOperationException
Key: FLINK-24726
URL: https://issues.apache.org/jira/browse/FLINK-24726
Project: Flink
Issue Type: Bug
Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Runtime
Affects Versions: 1.13.3, 1.14.0
Reporter: Zongwen Li
exception trace:
{code:java}
java.lang.UnsupportedOperationException: org.apache.flink.hive.shaded.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary
    at org.apache.flink.hive.shaded.parquet.column.Dictionary.decodeToLong(Dictionary.java:49)
    at org.apache.flink.hive.shaded.formats.parquet.vector.ParquetDictionary.decodeToLong(ParquetDictionary.java:42)
    at org.apache.flink.table.data.vector.heap.HeapLongVector.getLong(HeapLongVector.java:47)
    at org.apache.flink.hive.shaded.formats.parquet.vector.ParquetDecimalVector.getDecimal(ParquetDecimalVector.java:48)
    at org.apache.flink.table.data.vector.VectorizedColumnBatch.getDecimal(VectorizedColumnBatch.java:116)
    at org.apache.flink.table.data.ColumnarRowData.getDecimal(ColumnarRowData.java:119)
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$89bd9445$1(RowData.java:233)
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
    at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:138)
    at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105)
    at LookupFunction$159.flatMap(Unknown Source)
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81)
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at StreamExecCalc$913.processElement_split139(Unknown Source)
    at StreamExecCalc$913.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:334)
    at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:219)
    at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:124)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:86)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)
{code}
Hive primitive type: fixed_len_byte_array(5) list_price (DECIMAL(10,2)), i.e. a DECIMAL(10,2) column stored in Parquet as a fixed-length byte array. A lookup join that reads this column fails with the exception above.
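For context, a minimal reproduction sketch. All table, column, and path names below are hypothetical, and an ??orders?? stream with a processing-time attribute ??proc_time?? is assumed to be registered already:
{code:java}
// Hypothetical reproduction sketch; table, column, and path names are made up.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveLookupJoinRepro {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Hive catalog whose Parquet table has a DECIMAL(10,2) column stored as
        // fixed_len_byte_array(5); the conf dir path is a placeholder.
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/path/to/hive-conf");
        tEnv.registerCatalog("myhive", hive);

        // Lookup join against the Hive table; reading the dictionary-encoded
        // decimal column (list_price) triggers the UnsupportedOperationException.
        tEnv.executeSql(
                "SELECT o.id, p.list_price "
                        + "FROM orders AS o "
                        + "JOIN myhive.default.products FOR SYSTEM_TIME AS OF o.proc_time AS p "
                        + "ON o.product_id = p.id");
    }
}
{code}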
Root cause:
{code:java}
// org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader#readToVector
public final void readToVector(int readNumber, VECTOR vector) throws IOException {
    int rowId = 0;
    WritableIntVector dictionaryIds = null;
    if (dictionary != null) {
        dictionaryIds = vector.reserveDictionaryIds(readNumber);
    }
    while (readNumber > 0) {
        // ......
        if (isCurrentPageDictionaryEncoded) {
            runLenDecoder.readDictionaryIds(
                    num, dictionaryIds, vector, rowId, maxDefLevel, this.dictionaryIdsDecoder);
            if (vector.hasDictionary() || (rowId == 0 && supportLazyDecode())) {
                // The problem is here:
                // the PlainBinaryDictionary is attached to the HeapLongVector
                vector.setDictionary(new ParquetDictionary(dictionary));
            } else {
                readBatchFromDictionaryIds(rowId, num, vector, dictionaryIds);
            }
        } else {
            if (vector.hasDictionary() && rowId != 0) {
                readBatchFromDictionaryIds(0, rowId, vector, vector.getDictionaryIds());
            }
            vector.setDictionary(null);
            readBatch(rowId, num, vector);
        }
        valuesRead += num;
        rowId += num;
        readNumber -= num;
    }
}
{code}
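To make the mechanism concrete: once ??readToVector?? attaches the dictionary, every ??HeapLongVector.getLong?? routes through ??Dictionary#decodeToLong??, which ??PlainBinaryDictionary?? does not override, so the base class throws. A simplified, self-contained sketch of this mismatch (class names mirror the real ones, but this is paraphrased, not the shaded source):
{code:java}
// Simplified sketch of the failing decode path; NOT the actual shaded classes.
public class DecodeToLongSketch {

    // parquet-column style base class: every typed decode throws unless the
    // concrete dictionary overrides it.
    abstract static class Dictionary {
        long decodeToLong(int id) {
            throw new UnsupportedOperationException(getClass().getName());
        }

        byte[] decodeToBinary(int id) {
            throw new UnsupportedOperationException(getClass().getName());
        }
    }

    // A binary dictionary only knows how to decode to bytes, which is what a
    // fixed_len_byte_array(5) DECIMAL(10,2) dictionary actually contains.
    static class PlainBinaryDictionary extends Dictionary {
        @Override
        byte[] decodeToBinary(int id) {
            // Stand-in for the 5-byte unscaled value of a DECIMAL(10,2).
            return new byte[] {0, 0, 0, 0, 1};
        }
    }

    public static void main(String[] args) {
        Dictionary dictionary = new PlainBinaryDictionary();
        // HeapLongVector.getLong does (in effect) this once a dictionary is set:
        long value = dictionary.decodeToLong(0); // throws UnsupportedOperationException
        System.out.println(value);
    }
}
{code}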
The exception is thrown because, on this code path, ??#readBatch?? and ??#readBatchFromDictionaryIds?? of ??org.apache.flink.formats.parquet.vector.reader.FixedLenBytesColumnReader?? are never called; the binary dictionary is attached to the long vector instead, and decoding it to a long later fails.
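One possible direction, purely an assumption on my side and not a confirmed fix: have ??FixedLenBytesColumnReader?? opt out of the lazy-decode path via ??AbstractColumnReader#supportLazyDecode?? (visible in the condition above), so that dictionary ids are eagerly materialized through ??readBatchFromDictionaryIds?? instead of attaching the binary dictionary to the long vector:
{code:java}
// Hypothetical mitigation sketch, NOT a confirmed fix: with supportLazyDecode()
// returning false, the condition in readToVector falls through to
// readBatchFromDictionaryIds, so the binary dictionary is never attached to the
// long vector. Constructors and the existing readBatch/readBatchFromDictionaryIds
// bodies are elided.
public class FixedLenBytesColumnReader<VECTOR extends WritableColumnVector>
        extends AbstractColumnReader<VECTOR> {

    @Override
    protected boolean supportLazyDecode() {
        return false; // force eager decoding for fixed_len_byte_array decimals
    }

    // ... readBatch(...) and readBatchFromDictionaryIds(...) unchanged (elided) ...
}
{code}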
--
This message was sent by Atlassian Jira
(v8.3.4#803005)