[
https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yun Tang reassigned FLINK-33934:
--------------------------------
Assignee: Yuan Kui
> Flink SQL Source use raw format maybe lead to data lost
> -------------------------------------------------------
>
> Key: FLINK-33934
> URL: https://issues.apache.org/jira/browse/FLINK-33934
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table
> SQL / Runtime
> Affects Versions: 1.12.0, 1.13.0, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0,
> 1.19.0
> Reporter: Cai Liuyang
> Assignee: Yuan Kui
> Priority: Major
>
> In our production environment we encountered a case that led to data loss. The
> job setup:
> 1. a Flink SQL job reads data from a message queue (our internal MQ) and
> writes to Hive (it selects only the value field, no metadata fields)
> 2. the source table uses the raw format
>
> However, if we select the value field and a metadata field at the same time,
> the data loss does not occur.
>
> After reviewing the code, we found that the cause is the object reuse in the
> raw format (see code
> [RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
> Why object reuse leads to this problem (taking Kafka as an example):
> 1. RawFormatDeserializationSchema is used in the fetcher thread of the
> SourceOperator; the fetcher thread reads and deserializes data from the Kafka
> partition, then puts the data into the elementQueue (see code [SourceOperator
> FetchTask
> |https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64])
> 2. The SourceOperator's main thread pulls data from the
> elementQueue (which is shared with the fetcher thread) and processes it (see code
> [SourceOperator main
> thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188])
> 3. RawFormatDeserializationSchema's deserialize function always
> returns the same object ([reused rowData
> object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62])
> 4. So, if the elementQueue still holds elements that have not yet been
> consumed, the fetcher thread can overwrite the fields of the reused rowData
> that RawFormatDeserializationSchema::deserialize returned; this leads to data
> loss.
>
> The reason that selecting the value and metadata fields at the same time does
> not lose data is:
> if we select a metadata field, a new RowData object is created per record, see
> code: [DynamicKafkaDeserializationSchema deserialize with metadata field
> |https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]
> and if we only select the value field, the RowData object returned by the
> format's deserialization schema is forwarded as-is, see code
> [DynamicKafkaDeserializationSchema deserialize only with value
> field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]
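> The two emit paths can be sketched as follows (hypothetical names, not the
> connector's actual API; `Row` stands in for RowData):

```java
import java.util.function.Consumer;

// With metadata, a new row is assembled per record; without metadata, the
// format's (possibly reused) row is forwarded unchanged.
public class EmitPathsDemo {

    static final class Row {
        final Object[] fields;
        Row(Object[] fields) { this.fields = fields; }
    }

    // Value-only path: forwards the format's row directly -- if the format
    // reuses one instance, every emitted reference aliases that instance.
    static void emitValueOnly(Row formatRow, Consumer<Row> out) {
        out.accept(formatRow);
    }

    // Value + metadata path: copies the value fields into a NEW row and
    // appends the metadata, so each emitted row is an independent object.
    static void emitWithMetadata(Row formatRow, Object metadata, Consumer<Row> out) {
        Object[] joined = new Object[formatRow.fields.length + 1];
        System.arraycopy(formatRow.fields, 0, joined, 0, formatRow.fields.length);
        joined[formatRow.fields.length] = metadata;
        out.accept(new Row(joined)); // fresh instance per record
    }
}
```

> This is why adding a metadata column masks the bug: the per-record copy in
> the metadata path breaks the aliasing, even though the format still reuses
> its own row internally.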
>
> To solve this problem, I think we should remove the object reuse in
> RawFormatDeserializationSchema.
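> A minimal sketch of that direction (hypothetical class, not the actual
> patch): deserialize() allocates a fresh row per record instead of mutating
> one cached instance, so references already sitting in the elementQueue can
> no longer be overwritten by the fetcher thread.

```java
import java.util.Arrays;

public class NoReuseDeserializer {

    static final class Row {
        final byte[] value;
        Row(byte[] value) { this.value = value; }
    }

    // Before (buggy): a `private final Row reuse` field is mutated and
    // returned on every call.
    // After (safe): one new Row per call, with its own copy of the bytes.
    Row deserialize(byte[] message) {
        return new Row(Arrays.copyOf(message, message.length));
    }
}
```

> The trade-off is one extra allocation per record; given that the reuse here
> is unsafe across threads, correctness should win.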
--
This message was sent by Atlassian Jira
(v8.20.10#820010)