[
https://issues.apache.org/jira/browse/FLINK-28552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jane Chan updated FLINK-28552:
------------------------------
Description:
Currently, changelog mode does not support MAP and MULTISET as field types (a minimal reproduction sketch follows the stack traces below). More specifically,
* MULTISET is not supported at all, not even in append-only mode. (
Caused by: java.lang.UnsupportedOperationException: Unsupported type: MULTISET<TINYINT>
    at org.apache.flink.table.store.shaded.org.apache.flink.orc.OrcSplitReaderUtil.logicalTypeToOrcType(OrcSplitReaderUtil.java:214)
    at org.apache.flink.table.store.shaded.org.apache.flink.orc.OrcSplitReaderUtil.logicalTypeToOrcType(OrcSplitReaderUtil.java:210)
    at org.apache.flink.table.store.format.orc.OrcFileFormat.createWriterFactory(OrcFileFormat.java:94)
    at org.apache.flink.table.store.file.data.AppendOnlyWriter$RowRollingWriter.lambda$createRollingRowWriter$0(AppendOnlyWriter.java:229)
    at org.apache.flink.table.store.file.writer.RollingFileWriter.openCurrentWriter(RollingFileWriter.java:73)
    at org.apache.flink.table.store.file.writer.RollingFileWriter.write(RollingFileWriter.java:61)
    at org.apache.flink.table.store.file.data.AppendOnlyWriter.write(AppendOnlyWriter.java:108)
    at org.apache.flink.table.store.file.data.AppendOnlyWriter.write(AppendOnlyWriter.java:56)
    at org.apache.flink.table.store.table.AppendOnlyFileStoreTable$3.writeSinkRecord(AppendOnlyFileStoreTable.java:119)
    at org.apache.flink.table.store.table.sink.AbstractTableWrite.write(AbstractTableWrite.java:76)
    at org.apache.flink.table.store.connector.sink.StoreWriteOperator.processElement(StoreWriteOperator.java:124)
    ... 13 more)
* MAP cannot be used as a primary key in key-value mode, nor as a field in value-count mode.
Stacktrace:
java.lang.UnsupportedOperationException
    at org.apache.flink.table.store.codegen.GenerateUtils$.generateCompare(GenerateUtils.scala:139)
    at org.apache.flink.table.store.codegen.GenerateUtils$.$anonfun$generateRowCompare$1(GenerateUtils.scala:289)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194)
    at org.apache.flink.table.store.codegen.GenerateUtils$.generateRowCompare(GenerateUtils.scala:263)
    at org.apache.flink.table.store.codegen.ComparatorCodeGenerator$.gen(ComparatorCodeGenerator.scala:45)
    at org.apache.flink.table.store.codegen.ComparatorCodeGenerator.gen(ComparatorCodeGenerator.scala)
    at org.apache.flink.table.store.codegen.CodeGeneratorImpl.generateRecordComparator(CodeGeneratorImpl.java:53)
    at org.apache.flink.table.store.codegen.CodeGenUtils.generateRecordComparator(CodeGenUtils.java:66)
    at org.apache.flink.table.store.file.utils.KeyComparatorSupplier.<init>(KeyComparatorSupplier.java:40)
    at org.apache.flink.table.store.file.KeyValueFileStore.<init>(KeyValueFileStore.java:59)
    at org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable.<init>(ChangelogValueCountFileStoreTable.java:73)
    at org.apache.flink.table.store.table.FileStoreTableFactory.create(FileStoreTableFactory.java:70)
    at org.apache.flink.table.store.table.FileStoreTableFactory.create(FileStoreTableFactory.java:50)
    at org.apache.flink.table.store.spark.SimpleTableTestHelper.<init>(SimpleTableTestHelper.java:58)
    at org.apache.flink.table.store.spark.SparkReadITCase.startMetastoreAndSpark(SparkReadITCase.java:93)
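For reference, a minimal reproduction sketch (not taken from the issue; the class name, warehouse path, and the COLLECT-based insert are illustrative, and the catalog/table options 'type'='table-store', 'warehouse', and 'write-mode'='append-only' are assumed to match the table-store-0.2 docs, with the default ORC file.format):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MapMultisetRepro {
    public static void main(String[] args) throws Exception {
        // Batch mode is enough; both failures happen when the sink is created or written to.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Table Store catalog, option names as in the table-store-0.2 docs.
        tEnv.executeSql(
                "CREATE CATALOG ts WITH ("
                        + "'type' = 'table-store', "
                        + "'warehouse' = 'file:///tmp/table_store')");
        tEnv.executeSql("USE CATALOG ts");

        // (1) MULTISET field in an append-only table: the write fails with
        // "Unsupported type: MULTISET<TINYINT>" in OrcSplitReaderUtil,
        // assuming the default file.format (orc).
        tEnv.executeSql(
                "CREATE TABLE t_multiset (id INT, ms MULTISET<TINYINT>) "
                        + "WITH ('write-mode' = 'append-only')");
        tEnv.executeSql(
                "INSERT INTO t_multiset "
                        + "SELECT 1, COLLECT(v) FROM (VALUES (CAST(1 AS TINYINT))) AS t(v)")
                .await();

        // (2) MAP field in a table without a primary key (value-count mode):
        // FileStoreTableFactory.create fails because the generated key comparator
        // (GenerateUtils#generateCompare) rejects MAP.
        tEnv.executeSql("CREATE TABLE t_map (id INT, m MAP<INT, STRING>)");
        tEnv.executeSql("INSERT INTO t_map VALUES (1, MAP[1, CAST('a' AS STRING)])").await();
    }
}
{code}
Both inserts are expected to fail with the stack traces above until the ORC type mapping and GenerateUtils#generateCompare handle MULTISET and MAP.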
was:
Currently, changelog mode does not support MAP and MULTISET as field types.
More specifically, they cannot be used as a primary key in key-value mode, nor as fields in value-count mode.
Stacktrace:
java.lang.UnsupportedOperationException
    at org.apache.flink.table.store.codegen.GenerateUtils$.generateCompare(GenerateUtils.scala:139)
    at org.apache.flink.table.store.codegen.GenerateUtils$.$anonfun$generateRowCompare$1(GenerateUtils.scala:289)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194)
    at org.apache.flink.table.store.codegen.GenerateUtils$.generateRowCompare(GenerateUtils.scala:263)
    at org.apache.flink.table.store.codegen.ComparatorCodeGenerator$.gen(ComparatorCodeGenerator.scala:45)
    at org.apache.flink.table.store.codegen.ComparatorCodeGenerator.gen(ComparatorCodeGenerator.scala)
    at org.apache.flink.table.store.codegen.CodeGeneratorImpl.generateRecordComparator(CodeGeneratorImpl.java:53)
    at org.apache.flink.table.store.codegen.CodeGenUtils.generateRecordComparator(CodeGenUtils.java:66)
    at org.apache.flink.table.store.file.utils.KeyComparatorSupplier.<init>(KeyComparatorSupplier.java:40)
    at org.apache.flink.table.store.file.KeyValueFileStore.<init>(KeyValueFileStore.java:59)
    at org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable.<init>(ChangelogValueCountFileStoreTable.java:73)
    at org.apache.flink.table.store.table.FileStoreTableFactory.create(FileStoreTableFactory.java:70)
    at org.apache.flink.table.store.table.FileStoreTableFactory.create(FileStoreTableFactory.java:50)
    at org.apache.flink.table.store.spark.SimpleTableTestHelper.<init>(SimpleTableTestHelper.java:58)
    at org.apache.flink.table.store.spark.SparkReadITCase.startMetastoreAndSpark(SparkReadITCase.java:93)
> GenerateUtils#generateCompare supports MULTISET and MAP
> -------------------------------------------------------
>
> Key: FLINK-28552
> URL: https://issues.apache.org/jira/browse/FLINK-28552
> Project: Flink
> Issue Type: Improvement
> Components: Table Store
> Affects Versions: table-store-0.2.0
> Reporter: Jane Chan
> Priority: Minor
> Fix For: table-store-0.2.0
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)