Wenning Ding created HUDI-2146:
----------------------------------
Summary: Concurrent writes lose data
Key: HUDI-2146
URL: https://issues.apache.org/jira/browse/HUDI-2146
Project: Apache Hudi
Issue Type: Bug
Reporter: Wenning Ding
Attachments: image-2021-07-08-00-49-30-730.png
Reproduction steps:
Create a Hudi table:
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode
import org.apache.hudi.AvroConversionUtils

val df = Seq(
  (100, "event_name_16", "2015-01-01T13:51:39.340396Z", "type1"),
  (101, "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
  (104, "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
  (105, "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
).toDF("event_id", "event_name", "event_ts", "event_type")

val tableName = "hudi_test"
val tablePath = "s3://.../" + tableName

// write hudi dataset
df.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
    DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY,
    DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
    "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .mode(SaveMode.Overwrite)
  .save(tablePath)
{code}
Perform two insert operations at almost the same time, each inserting
different data:
Insert 1:
{code:java}
val df3 = Seq(
  (400, "event_name_111111", "2125-02-01T13:51:39.340396Z", "type1"),
  (401, "event_name_222222", "2125-02-01T12:14:58.597216Z", "type2"),
  (404, "event_name_333433", "2126-01-01T12:15:00.512679Z", "type1"),
  (405, "event_name_666378", "2125-07-01T13:51:42.248818Z", "type2")
).toDF("event_id", "event_name", "event_ts", "event_type")

// update hudi dataset
df3.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
    DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY,
    DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
    "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  // enable optimistic concurrency control with a ZooKeeper-based lock
  .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
  .option("hoodie.cleaner.policy.failed.writes", "LAZY")
  .option("hoodie.write.lock.provider",
    "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
  .option("hoodie.write.lock.zookeeper.url", "ip-***.ec2.internal")
  .option("hoodie.write.lock.zookeeper.port", "2181")
  .option("hoodie.write.lock.zookeeper.lock_key", tableName)
  .option("hoodie.write.lock.zookeeper.base_path", "/occ_lock")
  .mode(SaveMode.Append)
  .save(tablePath)
{code}
Insert 2:
{code:java}
val df3 = Seq(
  (300, "event_name_11111", "2035-02-01T13:51:39.340396Z", "type1"),
  (301, "event_name_22222", "2035-02-01T12:14:58.597216Z", "type2"),
  (304, "event_name_33333", "2036-01-01T12:15:00.512679Z", "type1"),
  (305, "event_name_66678", "2035-07-01T13:51:42.248818Z", "type2")
).toDF("event_id", "event_name", "event_ts", "event_type")

// update hudi dataset
df3.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
    DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY,
    DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
    "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  // enable optimistic concurrency control with a ZooKeeper-based lock
  .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
  .option("hoodie.cleaner.policy.failed.writes", "LAZY")
  .option("hoodie.write.lock.provider",
    "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
  .option("hoodie.write.lock.zookeeper.url", "ip-***.ec2.internal")
  .option("hoodie.write.lock.zookeeper.port", "2181")
  .option("hoodie.write.lock.zookeeper.lock_key", tableName)
  .option("hoodie.write.lock.zookeeper.base_path", "/occ_lock")
  .mode(SaveMode.Append)
  .save(tablePath)
{code}
There is no exception or rollback during the insertions; however, when I check
the Hudi table data, it contains only one of the two inserts and the other is
missing.
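For reference, the table contents can be checked with a plain snapshot read
like the following (a minimal sketch, assuming the tableName/tablePath defined
above and a spark-shell session):
{code:java}
// Snapshot-read the table and check which of the two batches survived.
// (Newer Hudi versions can load the bare base path; older ones may need a
// partition-level glob such as tablePath + "/*/*".)
val snapshot = spark.read.format("org.apache.hudi").load(tablePath)
snapshot.select("event_id", "event_name", "event_ts", "event_type")
  .orderBy("event_id")
  .show(false)
// Expected: event_ids 300-305 and 400-405 both present.
// Observed: only the batch from one of the two commits shows up.
{code}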
Here is the timeline:
!image-2021-07-08-00-49-30-730.png|width=840,height=322!
I checked the parquet file of commit 20210706171250: it contains all of the
inserted data. However, the parquet file of commit 20210706171252 contains the
data of only one insert. You can also see that, although 20210706171252 is the
latest timestamp, that commit actually completes before 20210706171250.
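To see which rows belong to which commit, the data files can also be read as
plain parquet and grouped on Hudi's _hoodie_commit_time metadata column (a
rough check; the glob below assumes the single-level event_type partitioning
used above):
{code:java}
// Read the raw parquet files (all file versions, not just the latest file
// slice) and count rows per commit to see which records each commit wrote.
val files = spark.read.parquet(tablePath + "/*/*.parquet")
files.groupBy("_hoodie_commit_time")
  .count()
  .orderBy("_hoodie_commit_time")
  .show(false)
{code}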
So I guess the process is:
insert 1 gets timestamp 20210706171250 -> insert 2 gets timestamp
20210706171252 -> insert 2 acquires the lock while insert 1 is blocked ->
insert 2's data is committed to the Hudi table (20210706171252) -> insert 1's
data is committed to the Hudi table (20210706171250)
However, when querying the Hudi table, it returns data as of the latest
timestamp, which is 20210706171252, so only the data from insert 2 is
returned.
We need to check or update the instant timestamp somewhere before committing.
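To make the suspected interleaving and the proposed check concrete, here is a
minimal sketch; Lock, latestCompletedInstant and commit are hypothetical
stand-ins, not Hudi's actual internals:
{code:java}
// Hypothetical sketch of the race and of a guard that would prevent it.
//
// Race: both writers generate their instant time BEFORE taking the lock:
//   writer1: instant = "20210706171250"   (earlier)
//   writer2: instant = "20210706171252"   (later)
//   writer2: acquires lock, commits 20210706171252, releases lock
//   writer1: acquires lock, commits 20210706171250, releases lock
// The timeline then ends at 20210706171252, so snapshot queries resolve the
// files as of that instant and never see the data committed last under
// 20210706171250.
trait Lock { def acquire(): Unit; def release(): Unit }

def commitUnderLock(lock: Lock,
                    myInstant: String,
                    latestCompletedInstant: () => String,
                    commit: String => Unit): Unit = {
  lock.acquire()
  try {
    // Instant times are lexicographically ordered yyyyMMddHHmmss strings,
    // so a plain string comparison is enough here.
    val last = latestCompletedInstant()
    require(myInstant > last,
      s"instant $myInstant is not newer than completed instant $last")
    commit(myInstant)
  } finally {
    lock.release()
  }
}
{code}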