Wenning Ding created HUDI-2146:
----------------------------------

             Summary: Concurrent writes lose data 
                 Key: HUDI-2146
                 URL: https://issues.apache.org/jira/browse/HUDI-2146
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: Wenning Ding
         Attachments: image-2021-07-08-00-49-30-730.png

Reproduction steps:

Create a Hudi table:
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode
import org.apache.hudi.AvroConversionUtils
import spark.implicits._ // for .toDF (pre-imported when running in spark-shell)

val df = Seq(
  (100, "event_name_16", "2015-01-01T13:51:39.340396Z", "type1"),
  (101, "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
  (104, "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
  (105, "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
  ).toDF("event_id", "event_name", "event_ts", "event_type")

var tableName = "hudi_test"
var tablePath = "s3://.../" + tableName

// write hudi dataset
df.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, 
DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, 
"org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .mode(SaveMode.Overwrite)
  .save(tablePath)
{code}
Perform two insert operations at almost the same time; each insert contains 
different data:

Insert 1:
{code:java}
val df3 = Seq(
  (400, "event_name_111111", "2125-02-01T13:51:39.340396Z", "type1"),
  (401, "event_name_222222", "2125-02-01T12:14:58.597216Z", "type2"),
  (404, "event_name_333433", "2126-01-01T12:15:00.512679Z", "type1"),
  (405, "event_name_666378", "2125-07-01T13:51:42.248818Z", "type2")
  ).toDF("event_id", "event_name", "event_ts", "event_type")

// update hudi dataset
df3.write.format("org.apache.hudi")
   .option(HoodieWriteConfig.TABLE_NAME, tableName)
   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, 
DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
   .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, 
"org.apache.hudi.hive.MultiPartKeysValueExtractor")
   .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
   .option("hoodie.cleaner.policy.failed.writes", "LAZY")
   .option("hoodie.write.lock.provider", 
"org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
   .option("hoodie.write.lock.zookeeper.url", "ip-***.ec2.internal")
   .option("hoodie.write.lock.zookeeper.port", "2181")
   .option("hoodie.write.lock.zookeeper.lock_key", tableName)
   .option("hoodie.write.lock.zookeeper.base_path", "/occ_lock")
   .mode(SaveMode.Append)
   .save(tablePath)
{code}
Insert 2:
{code:java}
val df3 = Seq(
  (300, "event_name_11111", "2035-02-01T13:51:39.340396Z", "type1"),
  (301, "event_name_22222", "2035-02-01T12:14:58.597216Z", "type2"),
  (304, "event_name_33333", "2036-01-01T12:15:00.512679Z", "type1"),
  (305, "event_name_66678", "2035-07-01T13:51:42.248818Z", "type2")
  ).toDF("event_id", "event_name", "event_ts", "event_type")

// update hudi dataset
df3.write.format("org.apache.hudi")
   .option(HoodieWriteConfig.TABLE_NAME, tableName)
   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, 
DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
   .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, 
"org.apache.hudi.hive.MultiPartKeysValueExtractor")
   .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
   .option("hoodie.cleaner.policy.failed.writes", "LAZY")
   .option("hoodie.write.lock.provider", 
"org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
   .option("hoodie.write.lock.zookeeper.url", "ip-***.ec2.internal")
   .option("hoodie.write.lock.zookeeper.port", "2181")
   .option("hoodie.write.lock.zookeeper.lock_key", tableName)
   .option("hoodie.write.lock.zookeeper.base_path", "/occ_lock")
   .mode(SaveMode.Append)
   .save(tablePath)
{code}
There is no exception or rollback during the insertions. However, when I check 
the Hudi table data, it contains only one of the inserts; the other is missing.

Here is the timeline:

!image-2021-07-08-00-49-30-730.png|width=840,height=322!

I checked the parquet file at 20210706171250: it contains all of the inserted 
data. However, the parquet file at 20210706171252 contains the data from only 
one insert. You can also see that, although 20210706171252 is the later 
timestamp, its commit completed before 20210706171250's.

So my guess at the sequence of events is:

# insert 1 gets timestamp 20210706171250
# insert 2 gets timestamp 20210706171252
# insert 2 acquires the lock; insert 1 is blocked
# insert 2 commits its data to the Hudi table (20210706171252)
# insert 1 commits its data to the Hudi table (20210706171250)

However, a query on the Hudi table returns the data as of the latest 
timestamp, which is 20210706171252, so it returns only the data from insert 2.
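The observed loss can be sketched with a toy model of file-slice resolution (plain Scala, not actual Hudi internals; the record sets and the idea that the reader picks the slice with the latest instant timestamp are illustrative, matched to the parquet contents described above):

```scala
// Toy model: each write produces a new slice of a file group, tagged
// with the instant timestamp assigned when the write STARTED.
case class Slice(instantTs: String, records: Set[Int])

// Base rows from the initial table creation (event_ids 100..105).
val base = Set(100, 101, 104, 105)

// Insert 2 (ts ...252) commits first; its file holds base + its rows.
val sliceInsert2 = Slice("20210706171252", base ++ Set(300, 301, 304, 305))

// Insert 1 (ts ...250) commits second, after insert 2's data landed,
// so its file holds everything -- but it keeps the OLDER timestamp.
val sliceInsert1 =
  Slice("20210706171250", base ++ Set(300, 301, 304, 305) ++ Set(400, 401, 404, 405))

// A snapshot read resolves to the slice with the latest instant ts:
val visible = List(sliceInsert1, sliceInsert2).maxBy(_.instantTs).records

// Insert 1's rows (400..405) are shadowed even though its commit
// succeeded -- matching the data loss described above.
```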

We need to check/update the instant timestamp somewhere before committing, so 
that the timeline order matches the actual commit order.
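One possible shape of such a check, as a standalone sketch (plain Scala; the function name and the re-stamping strategy are hypothetical, not Hudi's actual implementation):

```scala
// Hypothetical pre-commit check: if the requested instant timestamp is
// not newer than the latest already-completed instant, re-stamp the
// commit (while holding the lock) so late commits sort last on the
// timeline instead of being shadowed.
def resolveCommitTs(requestedTs: String,
                    completed: List[String],
                    newTs: () => String): String = {
  completed.maxOption match {
    case Some(latest) if requestedTs <= latest => newTs() // re-stamp
    case _                                     => requestedTs
  }
}

// Insert 1 requested ...250, but ...252 has already completed,
// so it gets a fresh timestamp that sorts after ...252:
val ts = resolveCommitTs(
  "20210706171250",
  List("20210706171252"),
  () => "20210706171253")
```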


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
