[ 
https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-33934:
--------------------------------
    Description: 
In our product we encounter a case that lead to data lost, the job info: 
   1. using flinkSQL that read data from messageQueue and write to hive (only 
select value field, doesn't contain metadata field)
   2. the format of source table is raw format
 
But if we select value field and metadata field at the same time, than the data 
lost will not appear
 
After we review the code, we found that the reason is the object reuse of 
Raw-format(see code 
[RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]),
 why object reuse will lead to this problem is below (take kafka as example):
    1. RawFormatDeserializationSchema will be used in the Fetcher-Thread of 
SourceOperator, Fetcher-Thread will read and deserialize data from kafka 
partition, than put data to ElementQueue (see code [SourceOperator FetcherTask 
|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64])
    2. SourceOperator's main thread will pull data from the ElementQueue(which 
is shared with the FetcherThread) and process it (see code [SourceOperator main 
thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188])
    3. For RawFormatDeserializationSchema, its deserialize function will return 
the same object([reuse rowData 
object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62])
    4. So, if elementQueue have element that not be consumed, than the 
fetcherThread can change the filed of the reused rawData that 
RawFormatDeserializationSchema::deserialize returned, this will lead to data 
lost;
 
The reason that we select value and metadata field at the same time will not 
encounter data lost is:
   if we select metadata field there will return a new RowData object see code: 
[DynamicKafkaDeserializationSchema deserialize with metadata field 
|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]
 and if we only select value filed, it will reuse the RowData object that 
formatDeserializationSchema returned see code 
[DynamicKafkaDeserializationSchema deserialize only with value 
field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]
 
To solve this problem, i think we should diable reuse object of 
RawFormatDeserializationSchema.

  was:
In our product we encounter a case that lead to data lost, the job info: 
   1. using flinkSQL that read data from messageQueue and write to hive (only 
select value field, doesn't contain metadata field)
   2. the format of source table is raw format
 
But if we select value field and metadata field at the same time, than the data 
lost will not appear
 
After we review the code, we found that the reason is the object reuse of 
Raw-format(see code 
[RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]),
 why object reuse will lead to this problem is below (take kafka as example):
    1. RawFormatDeserializationSchema will be used in the Fetcher-Thread of 
SourceOperator, Fetcher-Thread will read and deserialize data from kafka 
partition, than put data to ElementQueue (see code [SourceOperator FetcherTask 
|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64])
    2. SourceOperator's main thread will pull data from the ElementQueue(which 
is shared with the FetcherThread) and process it (see code [SourceOperator main 
thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188])
    3. For RawFormatDeserializationSchema, its deserialize function will return 
the same object([reuse 
object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62])
    4. So, if elementQueue have element that not be consumed, than the 
fetcherThread can change the filed of the reused rawData that 
RawFormatDeserializationSchema::deserialize returned, this will lead to data 
lost;
 
The reason that we select value and metadata field at the same time will not 
encounter data lost is:
   if we select metadata field there will return a new RowData object see code: 
[DynamicKafkaDeserializationSchema deserialize with metadata field 
|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]
 and if we only select value filed, it will reuse the RowData object that 
formatDeserializationSchema returned see code 
[DynamicKafkaDeserializationSchema deserialize only with value 
field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]
 
To solve this problem, i think we should diable reuse object of 
RawFormatDeserializationSchema.


> Flink SQL Source use raw format maybe lead to data lost
> -------------------------------------------------------
>
>                 Key: FLINK-33934
>                 URL: https://issues.apache.org/jira/browse/FLINK-33934
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Runtime
>            Reporter: Cai Liuyang
>            Priority: Major
>
> In our product we encounter a case that lead to data lost, the job info: 
>    1. using flinkSQL that read data from messageQueue and write to hive (only 
> select value field, doesn't contain metadata field)
>    2. the format of source table is raw format
>  
> But if we select value field and metadata field at the same time, than the 
> data lost will not appear
>  
> After we review the code, we found that the reason is the object reuse of 
> Raw-format(see code 
> [RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]),
>  why object reuse will lead to this problem is below (take kafka as example):
>     1. RawFormatDeserializationSchema will be used in the Fetcher-Thread of 
> SourceOperator, Fetcher-Thread will read and deserialize data from kafka 
> partition, than put data to ElementQueue (see code [SourceOperator 
> FetcherTask 
> |https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64])
>     2. SourceOperator's main thread will pull data from the 
> ElementQueue(which is shared with the FetcherThread) and process it (see code 
> [SourceOperator main 
> thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188])
>     3. For RawFormatDeserializationSchema, its deserialize function will 
> return the same object([reuse rowData 
> object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62])
>     4. So, if elementQueue have element that not be consumed, than the 
> fetcherThread can change the filed of the reused rawData that 
> RawFormatDeserializationSchema::deserialize returned, this will lead to data 
> lost;
>  
> The reason that we select value and metadata field at the same time will not 
> encounter data lost is:
>    if we select metadata field there will return a new RowData object see 
> code: [DynamicKafkaDeserializationSchema deserialize with metadata field 
> |https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]
>  and if we only select value filed, it will reuse the RowData object that 
> formatDeserializationSchema returned see code 
> [DynamicKafkaDeserializationSchema deserialize only with value 
> field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]
>  
> To solve this problem, i think we should diable reuse object of 
> RawFormatDeserializationSchema.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to